From 43bd23e5556ba947758cd23af0364ce808add059 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Tue, 27 Mar 2018 18:36:08 -0700
Subject: [PATCH] Add support for connecting to InfluxDB over a unix domain
 socket (#3942)

---
 plugins/outputs/influxdb/http.go      | 47 +++++++++++++---
 plugins/outputs/influxdb/http_test.go | 80 +++++++++++++++++++++++++++
 plugins/outputs/influxdb/influxdb.go  |  3 +-
 3 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go
index 852784f0..fdbb5bd8 100644
--- a/plugins/outputs/influxdb/http.go
+++ b/plugins/outputs/influxdb/http.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"net"
 	"net/http"
 	"net/url"
 	"path"
@@ -164,14 +165,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
 		config.Consistency)
 	queryURL := makeQueryURL(config.URL)
 
+	var transport *http.Transport
+	switch config.URL.Scheme {
+	case "http", "https":
+		transport = &http.Transport{
+			Proxy:           proxy,
+			TLSClientConfig: config.TLSConfig,
+		}
+	case "unix":
+		transport = &http.Transport{
+			Dial: func(_, _ string) (net.Conn, error) {
+				return net.DialTimeout(
+					config.URL.Scheme,
+					config.URL.Path,
+					defaultRequestTimeout,
+				)
+			},
+		}
+	default:
+		return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
+	}
+
 	client := &httpClient{
 		serializer: serializer,
 		client: &http.Client{
-			Timeout: timeout,
-			Transport: &http.Transport{
-				Proxy:           proxy,
-				TLSClientConfig: config.TLSConfig,
-			},
+			Timeout:   timeout,
+			Transport: transport,
 		},
 		database:        database,
 		url:             config.URL,
@@ -392,13 +411,27 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) string {
 	}
 
 	u := *loc
-	u.Path = path.Join(u.Path, "write")
+	switch u.Scheme {
+	case "unix":
+		u.Scheme = "http"
+		u.Host = "127.0.0.1"
+		u.Path = "/write"
+	case "http":
+		u.Path = path.Join(u.Path, "write")
+	}
 	u.RawQuery = params.Encode()
 	return u.String()
 }
 
 func makeQueryURL(loc *url.URL) string {
 	u := *loc
-	u.Path = path.Join(u.Path, "query")
+	switch u.Scheme {
+	case "unix":
+		u.Scheme = "http"
+		u.Host = "127.0.0.1"
+		u.Path = "/query"
+	case "http":
+		u.Path = path.Join(u.Path, "query")
+	}
 	return u.String()
 }
diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go
index d6463c05..e84957fc 100644
--- a/plugins/outputs/influxdb/http_test.go
+++ b/plugins/outputs/influxdb/http_test.go
@@ -7,9 +7,12 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"os"
+	"path"
 	"strings"
 	"testing"
 	"time"
@@ -556,3 +559,80 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) {
 	err = client.Write(ctx, metrics)
 	require.NoError(t, err)
 }
+
+func TestHTTP_UnixSocket(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "telegraf-test")
+	if err != nil {
+		require.NoError(t, err)
+	}
+	defer os.RemoveAll(tmpdir)
+
+	sock := path.Join(tmpdir, "test.sock")
+	listener, err := net.Listen("unix", sock)
+	require.NoError(t, err)
+
+	ts := httptest.NewUnstartedServer(http.NotFoundHandler())
+	ts.Listener = listener
+	ts.Start()
+	defer ts.Close()
+
+	x, _ := url.Parse("unix://" + sock)
+	fmt.Println(x)
+
+	successResponse := []byte(`{"results": [{"statement_id": 0}]}`)
+
+	tests := []struct {
+		name             string
+		config           *influxdb.HTTPConfig
+		database         string
+		queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
+		writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
+		errFunc          func(t *testing.T, err error)
+	}{
+		{
+			name: "success",
+			config: &influxdb.HTTPConfig{
+				URL:      &url.URL{Scheme: "unix", Path: sock},
+				Database: "xyzzy",
+			},
+			queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				require.Equal(t, `CREATE DATABASE "xyzzy"`, r.FormValue("q"))
+				w.WriteHeader(http.StatusOK)
+				w.Write(successResponse)
+			},
+			writeHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusNoContent)
+				w.Write(successResponse)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				switch r.URL.Path {
+				case "/query":
+					tt.queryHandlerFunc(t, w, r)
+					return
+				case "/write":
+					tt.queryHandlerFunc(t, w, r)
+					return
+				default:
+					w.WriteHeader(http.StatusNotFound)
+					return
+				}
+			})
+
+			ctx := context.Background()
+
+			client, err := influxdb.NewHTTPClient(tt.config)
+			require.NoError(t, err)
+			err = client.CreateDatabase(ctx)
+			if tt.errFunc != nil {
+				tt.errFunc(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go
index 5f38263b..2e6e0d9c 100644
--- a/plugins/outputs/influxdb/influxdb.go
+++ b/plugins/outputs/influxdb/influxdb.go
@@ -70,6 +70,7 @@ var sampleConfig = `
   ##
   ## Multiple URLs can be specified for a single cluster, only ONE of the
   ## urls will be written to each interval.
+  # urls = ["unix:///var/run/influxdb.sock"]
   # urls = ["udp://127.0.0.1:8089"]
   # urls = ["http://127.0.0.1:8086"]
 
@@ -157,7 +158,7 @@ func (i *InfluxDB) Connect() error {
 			}
 
 			i.clients = append(i.clients, c)
-		case "http", "https":
+		case "http", "https", "unix":
 			c, err := i.httpClient(ctx, u, proxy)
 			if err != nil {
 				return err
-- 
GitLab