From 4a5d3136934bf6d25ea2c6fb0b8d0aedc13baa18 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Sun, 4 Dec 2016 20:18:13 +0000
Subject: [PATCH] Improve InfluxDB output throughput

This changes the InfluxDB output to use a baked-in client built on the
fasthttp library instead of the external InfluxDB client.

This allows for significantly smaller allocations, the reuse of HTTP body
buffers, and the reuse of the actual line-protocol bytes of the metric
representations.
---
 Godeps                                        |   2 +
 metric.go                                     |   9 +-
 metric/metric.go                              |  42 ++
 metric/reader.go                              | 155 ++++++
 metric/reader_test.go                         | 487 ++++++++++++++++++
 plugins/inputs/http_listener/http_listener.go |   3 +
 plugins/outputs/influxdb/client/client.go     |  22 +
 plugins/outputs/influxdb/client/http.go       | 258 ++++++++++
 plugins/outputs/influxdb/client/http_test.go  | 343 ++++++++++++
 plugins/outputs/influxdb/client/udp.go        |  99 ++++
 plugins/outputs/influxdb/client/udp_test.go   | 163 ++++++
 plugins/outputs/influxdb/influxdb.go          | 118 ++---
 plugins/outputs/influxdb/influxdb_test.go     | 111 +++-
 13 files changed, 1731 insertions(+), 81 deletions(-)
 create mode 100644 metric/reader.go
 create mode 100644 metric/reader_test.go
 create mode 100644 plugins/outputs/influxdb/client/client.go
 create mode 100644 plugins/outputs/influxdb/client/http.go
 create mode 100644 plugins/outputs/influxdb/client/http_test.go
 create mode 100644 plugins/outputs/influxdb/client/udp.go
 create mode 100644 plugins/outputs/influxdb/client/udp_test.go
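
Reviewer note (not part of the commit itself): below is a minimal, hypothetical
sketch of how the pieces added in this patch are meant to fit together. The
output computes the total serialized size of a metric batch, wraps the batch in
a metric.Reader, and streams it straight into the request body of the baked-in
client. It assumes a local InfluxDB at http://localhost:8086; all identifiers
come from the files added below.

package main

import (
	"log"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
)

func main() {
	// Build the baked-in HTTP client with its default write parameters.
	c, err := client.NewHTTP(
		client.HTTPConfig{URL: "http://localhost:8086", UserAgent: "telegraf"},
		client.WriteParams{Database: "telegraf"},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Build a small batch and compute its total serialized size, which
	// becomes the Content-Length of the streamed write.
	m, _ := metric.New("cpu", map[string]string{"host": "a"},
		map[string]interface{}{"value": int64(99)}, time.Now())
	metrics := []telegraf.Metric{m}

	size := 0
	for _, mm := range metrics {
		size += mm.Len()
	}

	// metric.NewReader streams each metric's line-protocol bytes directly
	// into the request body, with no intermediate batch buffer.
	if _, err := c.WriteStream(metric.NewReader(metrics), size); err != nil {
		log.Fatal(err)
	}
}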

diff --git a/Godeps b/Godeps
index 99606414..83b9e456 100644
--- a/Godeps
+++ b/Godeps
@@ -50,6 +50,8 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8
 github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
+github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7
+github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074
 github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
 github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
diff --git a/metric.go b/metric.go
index cb230512..b1ab1b29 100644
--- a/metric.go
+++ b/metric.go
@@ -19,8 +19,15 @@ const (
 )
 
 type Metric interface {
+	// Serialize serializes the metric into a line-protocol byte buffer,
+	// including a newline at the end.
 	Serialize() []byte
-	String() string // convenience function for string(Serialize())
+	// SerializeTo is the same as Serialize, but avoids an allocation by
+	// serializing directly into dst. It returns the number of bytes copied.
+	SerializeTo(dst []byte) int
+	// String is the same as Serialize, but returns a string.
+	String() string
+	// Copy deep-copies the metric.
 	Copy() Metric
 	// Split will attempt to return multiple metrics with the same timestamp
 	// whose string representations are no longer than maxSize.
diff --git a/metric/metric.go b/metric/metric.go
index 8a18c0f2..4fbee0ad 100644
--- a/metric/metric.go
+++ b/metric/metric.go
@@ -178,6 +178,48 @@ func (m *metric) Serialize() []byte {
 	return tmp
 }
 
+func (m *metric) SerializeTo(dst []byte) int {
+	i := 0
+	if i >= len(dst) {
+		return i
+	}
+
+	i += copy(dst[i:], m.name)
+	if i >= len(dst) {
+		return i
+	}
+
+	i += copy(dst[i:], m.tags)
+	if i >= len(dst) {
+		return i
+	}
+
+	dst[i] = ' '
+	i++
+	if i >= len(dst) {
+		return i
+	}
+
+	i += copy(dst[i:], m.fields)
+	if i >= len(dst) {
+		return i
+	}
+
+	dst[i] = ' '
+	i++
+	if i >= len(dst) {
+		return i
+	}
+
+	i += copy(dst[i:], m.t)
+	if i >= len(dst) {
+		return i
+	}
+	dst[i] = '\n'
+
+	return i + 1
+}
+
 func (m *metric) Split(maxSize int) []telegraf.Metric {
 	if m.Len() < maxSize {
 		return []telegraf.Metric{m}
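
A reviewer-oriented note (not part of the patch): SerializeTo copies as much of
the metric as fits into dst and returns the number of bytes written, so callers
can detect truncation by comparing the return value against Len(). The
metric.Reader added in the next file checks Len() against the remaining buffer
before calling SerializeTo, so it only serializes metrics that fit. A minimal,
hypothetical sketch of the truncation behaviour itself:

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
)

func main() {
	m, _ := metric.New("cpu", map[string]string{},
		map[string]interface{}{"value": int64(99)}, time.Unix(1481032190, 0))

	// dst is smaller than the full line-protocol representation, so
	// SerializeTo fills it with a prefix and reports how much was copied.
	dst := make([]byte, 10)
	n := m.SerializeTo(dst)
	fmt.Printf("copied %d of %d bytes: %q\n", n, m.Len(), dst[:n])
}
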
diff --git a/metric/reader.go b/metric/reader.go
new file mode 100644
index 00000000..df072996
--- /dev/null
+++ b/metric/reader.go
@@ -0,0 +1,155 @@
+package metric
+
+import (
+	"io"
+
+	"github.com/influxdata/telegraf"
+)
+
+type state int
+
+const (
+	_ state = iota
+	// normal state copies whole metrics into the given buffer until we can't
+	// fit the next metric.
+	normal
+	// split state means that we have split a metric into multiple smaller
+	// metrics, which are written out over subsequent calls to Read.
+	split
+	// overflow state means that we have a metric that didn't fit into a single
+	// buffer, and needs to be split across multiple calls to Read.
+	overflow
+	// splitOverflow state means that a split metric didn't fit into a single
+	// buffer, and needs to be split across multiple calls to Read.
+	splitOverflow
+	// done means we're done reading metrics, and now always return (0, io.EOF)
+	done
+)
+
+type reader struct {
+	metrics      []telegraf.Metric
+	splitMetrics []telegraf.Metric
+	buf          []byte
+	state        state
+
+	// metric index
+	iM int
+	// split metric index
+	iSM int
+	// buffer index
+	iB int
+}
+
+func NewReader(metrics []telegraf.Metric) io.Reader {
+	return &reader{
+		metrics: metrics,
+		state:   normal,
+	}
+}
+
+func (r *reader) Read(p []byte) (n int, err error) {
+	var i int
+	switch r.state {
+	case done:
+		return 0, io.EOF
+	case normal:
+		for {
+			// this for-loop is the sunny-day scenario, where we are given a
+			// buffer that is large enough to hold at least a single metric.
+			// all of the cases below it are edge-cases.
+			if r.metrics[r.iM].Len() < len(p[i:]) {
+				i += r.metrics[r.iM].SerializeTo(p[i:])
+			} else {
+				break
+			}
+			r.iM++
+			if r.iM == len(r.metrics) {
+				r.state = done
+				return i, io.EOF
+			}
+		}
+
+		// if we haven't written any bytes, check if we can split the current
+		// metric into multiple full metrics at a smaller size.
+		if i == 0 {
+			tmp := r.metrics[r.iM].Split(len(p))
+			if len(tmp) > 1 {
+				r.splitMetrics = tmp
+				r.state = split
+				if r.splitMetrics[0].Len() < len(p) {
+					i += r.splitMetrics[0].SerializeTo(p)
+					r.iSM = 1
+				} else {
+					// splitting didn't quite work, so we'll drop down and
+					// overflow the metric.
+					r.state = normal
+					r.iSM = 0
+				}
+			}
+		}
+
+		// if we haven't written any bytes and we're not at the end of the metrics
+		// slice, then it means we have a single metric that is larger than the
+		// provided buffer.
+		if i == 0 {
+			r.buf = r.metrics[r.iM].Serialize()
+			i += copy(p, r.buf[r.iB:])
+			r.iB += i
+			r.state = overflow
+		}
+
+	case split:
+		if r.splitMetrics[r.iSM].Len() < len(p) {
+			// write the current split metric
+			i += r.splitMetrics[r.iSM].SerializeTo(p)
+			r.iSM++
+			if r.iSM >= len(r.splitMetrics) {
+				// done writing the current split metrics
+				r.iSM = 0
+				r.iM++
+				if r.iM == len(r.metrics) {
+					r.state = done
+					return i, io.EOF
+				}
+				r.state = normal
+			}
+		} else {
+			// This would only happen if we split the metric, and then a
+			// subsequent buffer was smaller than the initial one given,
+			// so that our split metric no longer fits.
+			r.buf = r.splitMetrics[r.iSM].Serialize()
+			i += copy(p, r.buf[r.iB:])
+			r.iB += i
+			r.state = splitOverflow
+		}
+
+	case splitOverflow:
+		i = copy(p, r.buf[r.iB:])
+		r.iB += i
+		if r.iB >= len(r.buf) {
+			r.iB = 0
+			r.iSM++
+			if r.iSM == len(r.splitMetrics) {
+				r.iM++
+				r.state = normal
+			} else {
+				r.state = split
+			}
+		}
+
+	case overflow:
+		i = copy(p, r.buf[r.iB:])
+		r.iB += i
+		if r.iB >= len(r.buf) {
+			r.iB = 0
+			r.iM++
+			if r.iM == len(r.metrics) {
+				r.state = done
+				return i, io.EOF
+			}
+			r.state = normal
+		}
+	}
+
+	return i, nil
+}
diff --git a/metric/reader_test.go b/metric/reader_test.go
new file mode 100644
index 00000000..a1c864ad
--- /dev/null
+++ b/metric/reader_test.go
@@ -0,0 +1,487 @@
+package metric
+
+import (
+	"io"
+	"io/ioutil"
+	"regexp"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func BenchmarkMetricReader(b *testing.B) {
+	metrics := make([]telegraf.Metric, 10)
+	for i := 0; i < 10; i++ {
+		metrics[i], _ = New("foo", map[string]string{},
+			map[string]interface{}{"value": int64(1)}, time.Now())
+	}
+	for n := 0; n < b.N; n++ {
+		r := NewReader(metrics)
+		io.Copy(ioutil.Discard, r)
+	}
+}
+
+func TestMetricReader(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	metrics := make([]telegraf.Metric, 10)
+	for i := 0; i < 10; i++ {
+		metrics[i], _ = New("foo", map[string]string{},
+			map[string]interface{}{"value": int64(1)}, ts)
+	}
+
+	r := NewReader(metrics)
+
+	buf := make([]byte, 35)
+	for i := 0; i < 10; i++ {
+		n, err := r.Read(buf)
+		if err != nil {
+			assert.True(t, err == io.EOF, err.Error())
+		}
+		assert.Equal(t, 33, n)
+		assert.Equal(t, "foo value=1i 1481032190000000000\n", string(buf[0:n]))
+	}
+
+	// reader should now be done, and always return 0, io.EOF
+	for i := 0; i < 10; i++ {
+		n, err := r.Read(buf)
+		assert.True(t, err == io.EOF, err.Error())
+		assert.Equal(t, 0, n)
+	}
+}
+
+func TestMetricReader_OverflowMetric(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m, _ := New("foo", map[string]string{},
+		map[string]interface{}{"value": int64(10)}, ts)
+	metrics := []telegraf.Metric{m}
+
+	r := NewReader(metrics)
+	buf := make([]byte, 5)
+
+	tests := []struct {
+		exp string
+		err error
+		n   int
+	}{
+		{
+			"foo v",
+			nil,
+			5,
+		},
+		{
+			"alue=",
+			nil,
+			5,
+		},
+		{
+			"10i 1",
+			nil,
+			5,
+		},
+		{
+			"48103",
+			nil,
+			5,
+		},
+		{
+			"21900",
+			nil,
+			5,
+		},
+		{
+			"00000",
+			nil,
+			5,
+		},
+		{
+			"000\n",
+			io.EOF,
+			4,
+		},
+		{
+			"",
+			io.EOF,
+			0,
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(buf)
+		assert.Equal(t, test.n, n)
+		assert.Equal(t, test.exp, string(buf[0:n]))
+		assert.Equal(t, test.err, err)
+	}
+}
+
+func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m, _ := New("foo", map[string]string{},
+		map[string]interface{}{"value": int64(10)}, ts)
+	metrics := []telegraf.Metric{m, m.Copy()}
+
+	r := NewReader(metrics)
+	buf := make([]byte, 10)
+
+	tests := []struct {
+		exp string
+		err error
+		n   int
+	}{
+		{
+			"foo value=",
+			nil,
+			10,
+		},
+		{
+			"10i 148103",
+			nil,
+			10,
+		},
+		{
+			"2190000000",
+			nil,
+			10,
+		},
+		{
+			"000\n",
+			nil,
+			4,
+		},
+		{
+			"foo value=",
+			nil,
+			10,
+		},
+		{
+			"10i 148103",
+			nil,
+			10,
+		},
+		{
+			"2190000000",
+			nil,
+			10,
+		},
+		{
+			"000\n",
+			io.EOF,
+			4,
+		},
+		{
+			"",
+			io.EOF,
+			0,
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(buf)
+		assert.Equal(t, test.n, n)
+		assert.Equal(t, test.exp, string(buf[0:n]))
+		assert.Equal(t, test.err, err)
+	}
+}
+
+// test splitting a metric
+func TestMetricReader_SplitMetric(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m1, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+			"value2": int64(10),
+			"value3": int64(10),
+			"value4": int64(10),
+			"value5": int64(10),
+			"value6": int64(10),
+		},
+		ts,
+	)
+	metrics := []telegraf.Metric{m1}
+
+	r := NewReader(metrics)
+	buf := make([]byte, 60)
+
+	tests := []struct {
+		expRegex string
+		err      error
+		n        int
+	}{
+		{
+			`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
+			nil,
+			57,
+		},
+		{
+			`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
+			io.EOF,
+			57,
+		},
+		{
+			"",
+			io.EOF,
+			0,
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(buf)
+		assert.Equal(t, test.n, n)
+		re := regexp.MustCompile(test.expRegex)
+		assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n]))
+		assert.Equal(t, test.err, err)
+	}
+}
+
+// test an array with one split metric and one unsplit
+func TestMetricReader_SplitMetric2(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m1, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+			"value2": int64(10),
+			"value3": int64(10),
+			"value4": int64(10),
+			"value5": int64(10),
+			"value6": int64(10),
+		},
+		ts,
+	)
+	m2, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+		},
+		ts,
+	)
+	metrics := []telegraf.Metric{m1, m2}
+
+	r := NewReader(metrics)
+	buf := make([]byte, 60)
+
+	tests := []struct {
+		expRegex string
+		err      error
+		n        int
+	}{
+		{
+			`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
+			nil,
+			57,
+		},
+		{
+			`foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`,
+			nil,
+			57,
+		},
+		{
+			`foo value1=10i 1481032190000000000\n`,
+			io.EOF,
+			35,
+		},
+		{
+			"",
+			io.EOF,
+			0,
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(buf)
+		assert.Equal(t, test.n, n)
+		re := regexp.MustCompile(test.expRegex)
+		assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n]))
+		assert.Equal(t, test.err, err)
+	}
+}
+
+// test a split that results in metrics that are still too long, causing the
+// reader to fall back to regular overflow handling.
+func TestMetricReader_SplitMetricTooLong(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m1, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+			"value2": int64(10),
+		},
+		ts,
+	)
+	metrics := []telegraf.Metric{m1}
+
+	r := NewReader(metrics)
+	buf := make([]byte, 30)
+
+	tests := []struct {
+		expRegex string
+		err      error
+		n        int
+	}{
+		{
+			`foo value\d=10i,value\d=10i 1481`,
+			nil,
+			30,
+		},
+		{
+			`032190000000000\n`,
+			io.EOF,
+			16,
+		},
+		{
+			"",
+			io.EOF,
+			0,
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(buf)
+		assert.Equal(t, test.n, n)
+		re := regexp.MustCompile(test.expRegex)
+		assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n]))
+		assert.Equal(t, test.err, err)
+	}
+}
+
+// test split with a changing buffer size in the middle of subsequent calls
+// to Read
+func TestMetricReader_SplitMetricChangingBuffer(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m1, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+			"value2": int64(10),
+			"value3": int64(10),
+		},
+		ts,
+	)
+	m2, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+		},
+		ts,
+	)
+	metrics := []telegraf.Metric{m1, m2}
+
+	r := NewReader(metrics)
+
+	tests := []struct {
+		expRegex string
+		err      error
+		n        int
+		buf      []byte
+	}{
+		{
+			`foo value\d=10i 1481032190000000000\n`,
+			nil,
+			35,
+			make([]byte, 36),
+		},
+		{
+			`foo value\d=10i 148103219000000`,
+			nil,
+			30,
+			make([]byte, 30),
+		},
+		{
+			`0000\n`,
+			nil,
+			5,
+			make([]byte, 30),
+		},
+		{
+			`foo value\d=10i 1481032190000000000\n`,
+			nil,
+			35,
+			make([]byte, 36),
+		},
+		{
+			`foo value1=10i 1481032190000000000\n`,
+			io.EOF,
+			35,
+			make([]byte, 36),
+		},
+		{
+			"",
+			io.EOF,
+			0,
+			make([]byte, 36),
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(test.buf)
+		assert.Equal(t, test.n, n, test.expRegex)
+		re := regexp.MustCompile(test.expRegex)
+		assert.True(t, re.MatchString(string(test.buf[0:n])), string(test.buf[0:n]))
+		assert.Equal(t, test.err, err, test.expRegex)
+	}
+}
+
+// test split with a changing buffer size in the middle of subsequent calls
+// to Read
+func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
+	ts := time.Unix(1481032190, 0)
+	m1, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+			"value2": int64(10),
+		},
+		ts,
+	)
+	m2, _ := New("foo", map[string]string{},
+		map[string]interface{}{
+			"value1": int64(10),
+		},
+		ts,
+	)
+	metrics := []telegraf.Metric{m1, m2}
+
+	r := NewReader(metrics)
+
+	tests := []struct {
+		expRegex string
+		err      error
+		n        int
+		buf      []byte
+	}{
+		{
+			`foo value\d=10i 1481032190000000000\n`,
+			nil,
+			35,
+			make([]byte, 36),
+		},
+		{
+			`foo value\d=10i 148103219000000`,
+			nil,
+			30,
+			make([]byte, 30),
+		},
+		{
+			`0000\n`,
+			nil,
+			5,
+			make([]byte, 30),
+		},
+		{
+			`foo value1=10i 1481032190000000000\n`,
+			io.EOF,
+			35,
+			make([]byte, 36),
+		},
+		{
+			"",
+			io.EOF,
+			0,
+			make([]byte, 36),
+		},
+	}
+
+	for _, test := range tests {
+		n, err := r.Read(test.buf)
+		assert.Equal(t, test.n, n, test.expRegex)
+		re := regexp.MustCompile(test.expRegex)
+		assert.True(t, re.MatchString(string(test.buf[0:n])), string(test.buf[0:n]))
+		assert.Equal(t, test.err, err, test.expRegex)
+	}
+}
diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go
index 0f426f80..05551a96 100644
--- a/plugins/inputs/http_listener/http_listener.go
+++ b/plugins/inputs/http_listener/http_listener.go
@@ -300,6 +300,9 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
 }
 
 func (h *HTTPListener) parse(b []byte, t time.Time) error {
+	if !bytes.HasSuffix(b, []byte("\n")) {
+		b = append(b, '\n')
+	}
 	metrics, err := h.parser.ParseWithDefaultTime(b, t)
 
 	for _, m := range metrics {
diff --git a/plugins/outputs/influxdb/client/client.go b/plugins/outputs/influxdb/client/client.go
new file mode 100644
index 00000000..3f52752a
--- /dev/null
+++ b/plugins/outputs/influxdb/client/client.go
@@ -0,0 +1,22 @@
+package client
+
+import "io"
+
+type Client interface {
+	Query(command string) error
+
+	Write(b []byte) (int, error)
+	WriteWithParams(b []byte, params WriteParams) (int, error)
+
+	WriteStream(b io.Reader, contentLength int) (int, error)
+	WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
+
+	Close() error
+}
+
+type WriteParams struct {
+	Database        string
+	RetentionPolicy string
+	Precision       string
+	Consistency     string
+}
diff --git a/plugins/outputs/influxdb/client/http.go b/plugins/outputs/influxdb/client/http.go
new file mode 100644
index 00000000..68cc3e09
--- /dev/null
+++ b/plugins/outputs/influxdb/client/http.go
@@ -0,0 +1,258 @@
+package client
+
+import (
+	"crypto/tls"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/url"
+	"time"
+
+	"github.com/valyala/fasthttp"
+)
+
+var (
+	defaultRequestTimeout = time.Second * 5
+)
+
+// NewHTTP creates a Client for writing to the InfluxDB server at config.URL.
+func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
+	// validate required parameters:
+	if len(config.URL) == 0 {
+		return nil, fmt.Errorf("config.URL is required to create an HTTP client")
+	}
+	if len(defaultWP.Database) == 0 {
+		return nil, fmt.Errorf("A default database is required to create an HTTP client")
+	}
+
+	// set defaults:
+	if config.Timeout == 0 {
+		config.Timeout = defaultRequestTimeout
+	}
+
+	// parse URL:
+	u, err := url.Parse(config.URL)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing config.URL: %s", err)
+	}
+	if u.Scheme != "http" && u.Scheme != "https" {
+		return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
+	}
+
+	wu := writeURL(u, defaultWP)
+	return &httpClient{
+		writeURL: []byte(wu),
+		config:   config,
+		url:      u,
+		client: &fasthttp.Client{
+			TLSConfig: config.TLSConfig,
+		},
+	}, nil
+}
+
+type HTTPConfig struct {
+	// URL should be of the form "http://host:port" (REQUIRED)
+	URL string
+
+	// UserAgent sets the User-Agent header.
+	UserAgent string
+
+	// Timeout is the time to wait for a response to each HTTP request (writes
+	// and queries).
+	Timeout time.Duration
+
+	// Username is the basic auth username for the server.
+	Username string
+	// Password is the basic auth password for the server.
+	Password string
+
+	// TLSConfig is the tls auth settings to use for each request.
+	TLSConfig *tls.Config
+
+	// Gzip, if true, compresses each payload using gzip.
+	// TODO
+	// Gzip bool
+}
+
+// Response represents a list of statement results.
+type Response struct {
+	// ignore Results:
+	Results []interface{} `json:"-"`
+	Err     string        `json:"error,omitempty"`
+}
+
+// Error returns the first error from any statement.
+// Returns nil if no errors occurred on any statements.
+func (r *Response) Error() error {
+	if r.Err != "" {
+		return fmt.Errorf(r.Err)
+	}
+	return nil
+}
+
+type httpClient struct {
+	writeURL []byte
+	config   HTTPConfig
+	client   *fasthttp.Client
+	url      *url.URL
+}
+
+func (c *httpClient) Query(command string) error {
+	req := c.makeRequest()
+	req.Header.SetRequestURI(queryURL(c.url, command))
+
+	return c.doRequest(req, fasthttp.StatusOK)
+}
+
+func (c *httpClient) Write(b []byte) (int, error) {
+	req := c.makeWriteRequest(len(b), c.writeURL)
+	req.SetBody(b)
+
+	err := c.doRequest(req, fasthttp.StatusNoContent)
+	if err == nil {
+		return len(b), nil
+	}
+	return 0, err
+}
+
+func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
+	req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp)))
+	req.SetBody(b)
+
+	err := c.doRequest(req, fasthttp.StatusNoContent)
+	if err == nil {
+		return len(b), nil
+	}
+	return 0, err
+}
+
+func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
+	req := c.makeWriteRequest(contentLength, c.writeURL)
+	req.SetBodyStream(r, contentLength)
+
+	err := c.doRequest(req, fasthttp.StatusNoContent)
+	if err == nil {
+		return contentLength, nil
+	}
+	return 0, err
+}
+
+func (c *httpClient) WriteStreamWithParams(
+	r io.Reader,
+	contentLength int,
+	wp WriteParams,
+) (int, error) {
+	req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp)))
+	req.SetBodyStream(r, contentLength)
+
+	err := c.doRequest(req, fasthttp.StatusNoContent)
+	if err == nil {
+		return contentLength, nil
+	}
+	return 0, err
+}
+
+func (c *httpClient) doRequest(
+	req *fasthttp.Request,
+	expectedCode int,
+) error {
+	resp := fasthttp.AcquireResponse()
+
+	err := c.client.DoTimeout(req, resp, c.config.Timeout)
+
+	code := resp.StatusCode()
+	// If it's a "no content" response, then release and return nil
+	if code == fasthttp.StatusNoContent {
+		fasthttp.ReleaseResponse(resp)
+		fasthttp.ReleaseRequest(req)
+		return nil
+	}
+
+	// not a "no content" response, so parse the result:
+	var response Response
+	decErr := json.Unmarshal(resp.Body(), &response)
+
+	// If we got a JSON decode error, send that back
+	if decErr != nil {
+		err = fmt.Errorf("Unable to decode json: received status code %d err: %s", code, decErr)
+	}
+	// Unexpected response code OR error in JSON response body overrides
+	// a JSON decode error:
+	if code != expectedCode || response.Error() != nil {
+		err = fmt.Errorf("Response Error: Status Code [%d], expected [%d], [%v]",
+			code, expectedCode, response.Error())
+	}
+
+	fasthttp.ReleaseResponse(resp)
+	fasthttp.ReleaseRequest(req)
+
+	return err
+}
+
+func (c *httpClient) makeWriteRequest(
+	contentLength int,
+	writeURL []byte,
+) *fasthttp.Request {
+	req := c.makeRequest()
+	req.Header.SetContentLength(contentLength)
+	req.Header.SetRequestURIBytes(writeURL)
+	// TODO
+	// if gzip {
+	// 	req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
+	// }
+	return req
+}
+
+func (c *httpClient) makeRequest() *fasthttp.Request {
+	req := fasthttp.AcquireRequest()
+	req.Header.SetContentTypeBytes([]byte("text/plain"))
+	req.Header.SetMethodBytes([]byte("POST"))
+	req.Header.SetUserAgent(c.config.UserAgent)
+	if c.config.Username != "" && c.config.Password != "" {
+		req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password))
+	}
+	return req
+}
+
+func (c *httpClient) Close() error {
+	// Nothing to do.
+	return nil
+}
+
+func writeURL(u *url.URL, wp WriteParams) string {
+	params := url.Values{}
+	params.Set("db", wp.Database)
+	if wp.RetentionPolicy != "" {
+		params.Set("rp", wp.RetentionPolicy)
+	}
+	if wp.Precision != "n" && wp.Precision != "" {
+		params.Set("precision", wp.Precision)
+	}
+	if wp.Consistency != "one" && wp.Consistency != "" {
+		params.Set("consistency", wp.Consistency)
+	}
+
+	u.RawQuery = params.Encode()
+	u.Path = "write"
+	return u.String()
+}
+
+func queryURL(u *url.URL, command string) string {
+	params := url.Values{}
+	params.Set("q", command)
+
+	u.RawQuery = params.Encode()
+	u.Path = "query"
+	return u.String()
+}
+
+// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt
+// "To receive authorization, the httpClient sends the userid and password,
+// separated by a single colon (":") character, within a base64
+// encoded string in the credentials."
+// It is not meant to be urlencoded.
+func basicAuth(username, password string) string {
+	auth := username + ":" + password
+	return base64.StdEncoding.EncodeToString([]byte(auth))
+}
diff --git a/plugins/outputs/influxdb/client/http_test.go b/plugins/outputs/influxdb/client/http_test.go
new file mode 100644
index 00000000..8fa02dd2
--- /dev/null
+++ b/plugins/outputs/influxdb/client/http_test.go
@@ -0,0 +1,343 @@
+package client
+
+import (
+	"bytes"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHTTPClient_Write(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			// test form values:
+			if r.FormValue("db") != "test" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
+			}
+			if r.FormValue("rp") != "policy" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"wrong rp name"}`)
+			}
+			if r.FormValue("precision") != "ns" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"wrong precision"}`)
+			}
+			if r.FormValue("consistency") != "all" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"wrong consistency"}`)
+			}
+			// test that user agent is set properly
+			if r.UserAgent() != "test-agent" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"wrong agent name"}`)
+			}
+			// test basic auth params
+			user, pass, ok := r.BasicAuth()
+			if !ok {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"basic auth not set"}`)
+			}
+			if user != "test-user" || pass != "test-password" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"basic auth incorrect"}`)
+			}
+
+			// Validate Content-Length Header
+			if r.ContentLength != 13 {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				msg := fmt.Sprintf(`{"results":[{}],"error":"Content-Length: expected [13], got [%d]"}`, r.ContentLength)
+				fmt.Fprintln(w, msg)
+			}
+
+			// Validate the request body:
+			buf := make([]byte, 100)
+			n, _ := r.Body.Read(buf)
+			expected := "cpu value=99"
+			got := string(buf[0 : n-1])
+			if expected != got {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
+				fmt.Fprintln(w, msg)
+			}
+
+			w.WriteHeader(http.StatusNoContent)
+			w.Header().Set("Content-Type", "application/json")
+		case "/query":
+			w.WriteHeader(http.StatusOK)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}]}`)
+		}
+	}))
+	defer ts.Close()
+
+	config := HTTPConfig{
+		URL:       ts.URL,
+		UserAgent: "test-agent",
+		Username:  "test-user",
+		Password:  "test-password",
+	}
+	wp := WriteParams{
+		Database:        "test",
+		RetentionPolicy: "policy",
+		Precision:       "ns",
+		Consistency:     "all",
+	}
+	client, err := NewHTTP(config, wp)
+	defer client.Close()
+	assert.NoError(t, err)
+	n, err := client.Write([]byte("cpu value=99\n"))
+	assert.Equal(t, 13, n)
+	assert.NoError(t, err)
+
+	_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
+	assert.NoError(t, err)
+}
+
+func TestHTTPClient_WriteParamsOverride(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			// test that database is set properly
+			if r.FormValue("db") != "override" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
+			}
+
+			// Validate the request body:
+			buf := make([]byte, 100)
+			n, _ := r.Body.Read(buf)
+			expected := "cpu value=99"
+			got := string(buf[0 : n-1])
+			if expected != got {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
+				fmt.Fprintln(w, msg)
+			}
+
+			w.WriteHeader(http.StatusNoContent)
+			w.Header().Set("Content-Type", "application/json")
+		case "/query":
+			w.WriteHeader(http.StatusOK)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}]}`)
+		}
+	}))
+	defer ts.Close()
+
+	config := HTTPConfig{
+		URL: ts.URL,
+	}
+	defaultWP := WriteParams{
+		Database: "test",
+	}
+	client, err := NewHTTP(config, defaultWP)
+	defer client.Close()
+	assert.NoError(t, err)
+
+	// test that WriteWithParams overrides the default write params
+	wp := WriteParams{
+		Database: "override",
+	}
+	n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
+	assert.Equal(t, 13, n)
+	assert.NoError(t, err)
+
+	_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
+	assert.NoError(t, err)
+}
+
+func TestHTTPClient_Write_Errors(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			w.WriteHeader(http.StatusTeapot)
+		case "/query":
+			w.WriteHeader(http.StatusOK)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}]}`)
+		}
+	}))
+	defer ts.Close()
+
+	config := HTTPConfig{
+		URL: ts.URL,
+	}
+	defaultWP := WriteParams{
+		Database: "test",
+	}
+	client, err := NewHTTP(config, defaultWP)
+	defer client.Close()
+	assert.NoError(t, err)
+
+	lp := []byte("cpu value=99\n")
+	n, err := client.Write(lp)
+	assert.Equal(t, 0, n)
+	assert.Error(t, err)
+
+	n, err = client.WriteStream(bytes.NewReader(lp), 13)
+	assert.Equal(t, 0, n)
+	assert.Error(t, err)
+
+	wp := WriteParams{
+		Database: "override",
+	}
+	n, err = client.WriteWithParams(lp, wp)
+	assert.Equal(t, 0, n)
+	assert.Error(t, err)
+
+	n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
+	assert.Equal(t, 0, n)
+	assert.Error(t, err)
+}
+
+func TestNewHTTPErrors(t *testing.T) {
+	// No URL:
+	config := HTTPConfig{}
+	defaultWP := WriteParams{
+		Database: "test",
+	}
+	client, err := NewHTTP(config, defaultWP)
+	assert.Error(t, err)
+	assert.Nil(t, client)
+
+	// No Database:
+	config = HTTPConfig{
+		URL: "http://localhost:8086",
+	}
+	defaultWP = WriteParams{}
+	client, err = NewHTTP(config, defaultWP)
+	assert.Nil(t, client)
+	assert.Error(t, err)
+
+	// Invalid URL:
+	config = HTTPConfig{
+		URL: "http://192.168.0.%31:8080/",
+	}
+	defaultWP = WriteParams{
+		Database: "test",
+	}
+	client, err = NewHTTP(config, defaultWP)
+	assert.Nil(t, client)
+	assert.Error(t, err)
+
+	// Invalid URL scheme:
+	config = HTTPConfig{
+		URL: "mailto://localhost:8086",
+	}
+	defaultWP = WriteParams{
+		Database: "test",
+	}
+	client, err = NewHTTP(config, defaultWP)
+	assert.Nil(t, client)
+	assert.Error(t, err)
+}
+
+func TestHTTPClient_Query(t *testing.T) {
+	command := "CREATE DATABASE test"
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			w.WriteHeader(http.StatusNoContent)
+		case "/query":
+			// validate the create database command is correct
+			got := r.FormValue("q")
+			if got != command {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+				msg := fmt.Sprintf(`{"results":[{}],"error":"got %s, expected %s"}`, got, command)
+				fmt.Fprintln(w, msg)
+			}
+
+			w.WriteHeader(http.StatusOK)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}]}`)
+		}
+	}))
+	defer ts.Close()
+
+	config := HTTPConfig{
+		URL: ts.URL,
+	}
+	defaultWP := WriteParams{
+		Database: "test",
+	}
+	client, err := NewHTTP(config, defaultWP)
+	defer client.Close()
+	assert.NoError(t, err)
+	err = client.Query(command)
+	assert.NoError(t, err)
+}
+
+func TestHTTPClient_Query_ResponseError(t *testing.T) {
+	command := "CREATE DATABASE test"
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			w.WriteHeader(http.StatusNoContent)
+		case "/query":
+			w.WriteHeader(http.StatusTeapot)
+			w.Header().Set("Content-Type", "application/json")
+			msg := `{"results":[{}],"error":"couldnt create database"}`
+			fmt.Fprintln(w, msg)
+		}
+	}))
+	defer ts.Close()
+
+	config := HTTPConfig{
+		URL: ts.URL,
+	}
+	defaultWP := WriteParams{
+		Database: "test",
+	}
+	client, err := NewHTTP(config, defaultWP)
+	defer client.Close()
+	assert.NoError(t, err)
+	err = client.Query(command)
+	assert.Error(t, err)
+}
+
+func TestHTTPClient_Query_JSONDecodeError(t *testing.T) {
+	command := "CREATE DATABASE test"
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			w.WriteHeader(http.StatusNoContent)
+		case "/query":
+			w.WriteHeader(http.StatusOK)
+			w.Header().Set("Content-Type", "application/json")
+			// write JSON missing a ']'
+			msg := `{"results":[{}}`
+			fmt.Fprintln(w, msg)
+		}
+	}))
+	defer ts.Close()
+
+	config := HTTPConfig{
+		URL: ts.URL,
+	}
+	defaultWP := WriteParams{
+		Database: "test",
+	}
+	client, err := NewHTTP(config, defaultWP)
+	defer client.Close()
+	assert.NoError(t, err)
+	err = client.Query(command)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "json")
+}
diff --git a/plugins/outputs/influxdb/client/udp.go b/plugins/outputs/influxdb/client/udp.go
new file mode 100644
index 00000000..d542ecf6
--- /dev/null
+++ b/plugins/outputs/influxdb/client/udp.go
@@ -0,0 +1,99 @@
+package client
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net"
+	"net/url"
+)
+
+const (
+	// UDPPayloadSize is a reasonable default payload size for UDP packets that
+	// could be travelling over the internet.
+	UDPPayloadSize = 512
+)
+
+// UDPConfig is the config data needed to create a UDP Client
+type UDPConfig struct {
+	// URL should be of the form "udp://host:port"
+	// or "udp://[ipv6-host%zone]:port".
+	URL string
+
+	// PayloadSize is the maximum size of a UDP client message, optional
+	// Tune this based on your network. Defaults to UDPPayloadSize.
+	PayloadSize int
+}
+
+func NewUDP(config UDPConfig) (Client, error) {
+	p, err := url.Parse(config.URL)
+	if err != nil {
+		return nil, fmt.Errorf("Error parsing UDP url [%s]: %s", config.URL, err)
+	}
+
+	udpAddr, err := net.ResolveUDPAddr("udp", p.Host)
+	if err != nil {
+		return nil, fmt.Errorf("Error resolving UDP Address [%s]: %s", p.Host, err)
+	}
+
+	conn, err := net.DialUDP("udp", nil, udpAddr)
+	if err != nil {
+		return nil, fmt.Errorf("Error dialing UDP address [%s]: %s",
+			udpAddr.String(), err)
+	}
+
+	size := config.PayloadSize
+	if size == 0 {
+		size = UDPPayloadSize
+	}
+	buf := make([]byte, size)
+	return &udpClient{conn: conn, buffer: buf}, nil
+}
+
+type udpClient struct {
+	conn   *net.UDPConn
+	buffer []byte
+}
+
+func (c *udpClient) Query(command string) error {
+	return nil
+}
+
+func (c *udpClient) Write(b []byte) (int, error) {
+	return c.WriteStream(bytes.NewReader(b), -1)
+}
+
+// write params are ignored by the UDP client
+func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
+	return c.WriteStream(bytes.NewReader(b), -1)
+}
+
+// contentLength is ignored by the UDP client.
+func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
+	var totaln int
+	for {
+		nR, err := r.Read(c.buffer)
+		if nR == 0 {
+			break
+		}
+		if err != io.EOF && err != nil {
+			return totaln, err
+		}
+		nW, err := c.conn.Write(c.buffer[0:nR])
+		totaln += nW
+		if err != nil {
+			return totaln, err
+		}
+	}
+	return totaln, nil
+}
+
+// contentLength is ignored by the UDP client.
+// write params are ignored by the UDP client
+func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
+	return c.WriteStream(r, -1)
+}
+
+func (c *udpClient) Close() error {
+	return c.conn.Close()
+}
diff --git a/plugins/outputs/influxdb/client/udp_test.go b/plugins/outputs/influxdb/client/udp_test.go
new file mode 100644
index 00000000..31196ddc
--- /dev/null
+++ b/plugins/outputs/influxdb/client/udp_test.go
@@ -0,0 +1,163 @@
+package client
+
+import (
+	"bytes"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestUDPClient(t *testing.T) {
+	config := UDPConfig{
+		URL: "udp://localhost:8089",
+	}
+	client, err := NewUDP(config)
+	assert.NoError(t, err)
+
+	err = client.Query("ANY QUERY RETURNS NIL")
+	assert.NoError(t, err)
+
+	assert.NoError(t, client.Close())
+}
+
+func TestNewUDPClient_Errors(t *testing.T) {
+	// url.Parse Error
+	config := UDPConfig{
+		URL: "udp://localhost%35:8089",
+	}
+	_, err := NewUDP(config)
+	assert.Error(t, err)
+
+	// ResolveUDPAddr Error
+	config = UDPConfig{
+		URL: "udp://localhost:999999",
+	}
+	_, err = NewUDP(config)
+	assert.Error(t, err)
+}
+
+func TestUDPClient_Write(t *testing.T) {
+	config := UDPConfig{
+		URL: "udp://localhost:8199",
+	}
+	client, err := NewUDP(config)
+	assert.NoError(t, err)
+
+	packets := make(chan string, 100)
+	address, err := net.ResolveUDPAddr("udp", "localhost:8199")
+	assert.NoError(t, err)
+	listener, err := net.ListenUDP("udp", address)
+	defer listener.Close()
+	assert.NoError(t, err)
+	go func() {
+		buf := make([]byte, 200)
+		for {
+			n, _, err := listener.ReadFromUDP(buf)
+			if err != nil {
+				packets <- err.Error()
+			}
+			packets <- string(buf[0:n])
+		}
+	}()
+
+	// test sending simple metric
+	time.Sleep(time.Second)
+	n, err := client.Write([]byte("cpu value=99\n"))
+	assert.Equal(t, n, 13)
+	assert.NoError(t, err)
+	pkt := <-packets
+	assert.Equal(t, "cpu value=99\n", pkt)
+
+	metrics := `cpu value=99
+cpu value=55
+cpu value=44
+cpu value=101
+cpu value=91
+cpu value=92
+`
+	// test sending packet with 6 metrics in a stream.
+	reader := bytes.NewReader([]byte(metrics))
+	// contentLength is ignored:
+	n, err = client.WriteStream(reader, 10)
+	assert.Equal(t, n, len(metrics))
+	assert.NoError(t, err)
+	pkt = <-packets
+	assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
+
+	//
+	// Test that UDP packets get broken up properly:
+	config2 := UDPConfig{
+		URL:         "udp://localhost:8199",
+		PayloadSize: 25,
+	}
+	client2, err := NewUDP(config2)
+	assert.NoError(t, err)
+
+	wp := WriteParams{}
+
+	//
+	// Using Write():
+	buf := []byte(metrics)
+	n, err = client2.WriteWithParams(buf, wp)
+	assert.Equal(t, n, len(metrics))
+	assert.NoError(t, err)
+	pkt = <-packets
+	assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
+	pkt = <-packets
+	assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
+	pkt = <-packets
+	assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
+	pkt = <-packets
+	assert.Equal(t, "=92\n", pkt)
+
+	//
+	// Using WriteStream():
+	reader = bytes.NewReader([]byte(metrics))
+	n, err = client2.WriteStreamWithParams(reader, 10, wp)
+	assert.Equal(t, n, len(metrics))
+	assert.NoError(t, err)
+	pkt = <-packets
+	assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
+	pkt = <-packets
+	assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
+	pkt = <-packets
+	assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
+	pkt = <-packets
+	assert.Equal(t, "=92\n", pkt)
+
+	//
+	// Using WriteStream() & a metric.Reader:
+	config3 := UDPConfig{
+		URL:         "udp://localhost:8199",
+		PayloadSize: 40,
+	}
+	client3, err := NewUDP(config3)
+	assert.NoError(t, err)
+
+	now := time.Unix(1484142942, 0)
+	m1, _ := metric.New("test", map[string]string{},
+		map[string]interface{}{"value": 1.1}, now)
+	m2, _ := metric.New("test", map[string]string{},
+		map[string]interface{}{"value": 1.1}, now)
+	m3, _ := metric.New("test", map[string]string{},
+		map[string]interface{}{"value": 1.1}, now)
+	ms := []telegraf.Metric{m1, m2, m3}
+	mReader := metric.NewReader(ms)
+	n, err = client3.WriteStreamWithParams(mReader, 10, wp)
+	// 3 metrics at 35 bytes each (including the newline)
+	assert.Equal(t, 105, n)
+	assert.NoError(t, err)
+	pkt = <-packets
+	assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
+	pkt = <-packets
+	assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
+	pkt = <-packets
+	assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
+
+	assert.NoError(t, client.Close())
+}
diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go
index 999e1bc6..06d8bd04 100644
--- a/plugins/outputs/influxdb/influxdb.go
+++ b/plugins/outputs/influxdb/influxdb.go
@@ -1,19 +1,18 @@
 package influxdb
 
 import (
-	"errors"
 	"fmt"
 	"log"
 	"math/rand"
-	"net/url"
 	"strings"
 	"time"
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/metric"
 	"github.com/influxdata/telegraf/plugins/outputs"
 
-	"github.com/influxdata/influxdb/client/v2"
+	"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
 )
 
 type InfluxDB struct {
@@ -41,7 +40,7 @@ type InfluxDB struct {
 	// Precision is only here for legacy support. It will be ignored.
 	Precision string
 
-	conns []client.Client
+	clients []client.Client
 }
 
 var sampleConfig = `
@@ -88,79 +87,56 @@ func (i *InfluxDB) Connect() error {
 		urls = append(urls, i.URL)
 	}
 
-	tlsCfg, err := internal.GetTLSConfig(
+	tlsConfig, err := internal.GetTLSConfig(
 		i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
 	if err != nil {
 		return err
 	}
 
-	var conns []client.Client
 	for _, u := range urls {
 		switch {
 		case strings.HasPrefix(u, "udp"):
-			parsed_url, err := url.Parse(u)
-			if err != nil {
-				return err
-			}
-
-			if i.UDPPayload == 0 {
-				i.UDPPayload = client.UDPPayloadSize
-			}
-			c, err := client.NewUDPClient(client.UDPConfig{
-				Addr:        parsed_url.Host,
+			config := client.UDPConfig{
+				URL:         u,
 				PayloadSize: i.UDPPayload,
-			})
+			}
+			c, err := client.NewUDP(config)
 			if err != nil {
-				return err
+				return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
 			}
-			conns = append(conns, c)
+			i.clients = append(i.clients, c)
 		default:
 			// If URL doesn't start with "udp", assume HTTP client
-			c, err := client.NewHTTPClient(client.HTTPConfig{
-				Addr:      u,
-				Username:  i.Username,
-				Password:  i.Password,
-				UserAgent: i.UserAgent,
+			config := client.HTTPConfig{
+				URL:       u,
 				Timeout:   i.Timeout.Duration,
-				TLSConfig: tlsCfg,
-			})
+				TLSConfig: tlsConfig,
+				UserAgent: i.UserAgent,
+			}
+			wp := client.WriteParams{
+				Database:        i.Database,
+				RetentionPolicy: i.RetentionPolicy,
+				Consistency:     i.WriteConsistency,
+			}
+			c, err := client.NewHTTP(config, wp)
 			if err != nil {
-				return err
+				return fmt.Errorf("Error creating HTTP Client [%s]: %s", u, err)
 			}
+			i.clients = append(i.clients, c)
 
-			err = createDatabase(c, i.Database)
+			err = c.Query("CREATE DATABASE " + i.Database)
 			if err != nil {
 				log.Println("E! Database creation failed: " + err.Error())
 				continue
 			}
-
-			conns = append(conns, c)
 		}
 	}
 
-	i.conns = conns
 	rand.Seed(time.Now().UnixNano())
 	return nil
 }
 
-func createDatabase(c client.Client, database string) error {
-	// Create Database if it doesn't exist
-	_, err := c.Query(client.Query{
-		Command: fmt.Sprintf("CREATE DATABASE \"%s\"", database),
-	})
-	return err
-}
-
 func (i *InfluxDB) Close() error {
-	var errS string
-	for j, _ := range i.conns {
-		if err := i.conns[j].Close(); err != nil {
-			errS += err.Error()
-		}
-	}
-	if errS != "" {
-		return fmt.Errorf("output influxdb close failed: %s", errS)
-	}
 	return nil
 }
 
@@ -175,34 +151,24 @@ func (i *InfluxDB) Description() string {
 // Choose a random server in the cluster to write to until a successful write
 // occurs, logging each unsuccessful. If all servers fail, return error.
 func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
-	if len(i.conns) == 0 {
-		err := i.Connect()
-		if err != nil {
-			return err
-		}
-	}
-	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
-		Database:         i.Database,
-		RetentionPolicy:  i.RetentionPolicy,
-		WriteConsistency: i.WriteConsistency,
-	})
-	if err != nil {
-		return err
-	}
-
-	for _, metric := range metrics {
-		bp.AddPoint(metric.Point())
+	bufsize := 0
+	for _, m := range metrics {
+		bufsize += m.Len()
 	}
+	r := metric.NewReader(metrics)
 
 	// This will get set to nil if a successful write occurs
-	err = errors.New("Could not write to any InfluxDB server in cluster")
+	err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
 
-	p := rand.Perm(len(i.conns))
+	p := rand.Perm(len(i.clients))
 	for _, n := range p {
-		if e := i.conns[n].Write(bp); e != nil {
-			// If the database was not found, try to recreate it
+		if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
+			// Log write failure:
+			log.Printf("E! InfluxDB Output Error: %s", e)
+
+			// If the database was not found, try to recreate it:
 			if strings.Contains(e.Error(), "database not found") {
-				if errc := createDatabase(i.conns[n], i.Database); errc != nil {
+				if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
 					log.Printf("E! Error: Database %s not found and failed to recreate\n",
 						i.Database)
 				}
@@ -225,10 +191,12 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
 	return err
 }
 
+func newInflux() *InfluxDB {
+	return &InfluxDB{
+		Timeout: internal.Duration{Duration: time.Second * 5},
+	}
+}
+
 func init() {
-	outputs.Add("influxdb", func() telegraf.Output {
-		return &InfluxDB{
-			Timeout: internal.Duration{Duration: time.Second * 5},
-		}
-	})
+	outputs.Add("influxdb", func() telegraf.Output { return newInflux() })
 }
diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go
index 1414fa83..db2cd5ec 100644
--- a/plugins/outputs/influxdb/influxdb_test.go
+++ b/plugins/outputs/influxdb/influxdb_test.go
@@ -20,22 +20,123 @@ func TestUDPInflux(t *testing.T) {
 	require.NoError(t, err)
 	err = i.Write(testutil.MockMetrics())
 	require.NoError(t, err)
+	require.NoError(t, i.Close())
 }
 
 func TestHTTPInflux(t *testing.T) {
 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusOK)
-		w.Header().Set("Content-Type", "application/json")
-		fmt.Fprintln(w, `{"results":[{}]}`)
+		switch r.URL.Path {
+		case "/write":
+			// test that database is set properly
+			if r.FormValue("db") != "test" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+			}
+			// test that user agent is set properly
+			if r.UserAgent() != "telegraf" {
+				w.WriteHeader(http.StatusTeapot)
+				w.Header().Set("Content-Type", "application/json")
+			}
+			w.WriteHeader(http.StatusNoContent)
+			w.Header().Set("Content-Type", "application/json")
+		case "/query":
+			w.WriteHeader(http.StatusOK)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}]}`)
+		}
 	}))
 	defer ts.Close()
 
+	i := newInflux()
+	i.URLs = []string{ts.URL}
+	i.Database = "test"
+	i.UserAgent = "telegraf"
+
+	err := i.Connect()
+	require.NoError(t, err)
+	err = i.Write(testutil.MockMetrics())
+	require.NoError(t, err)
+	require.NoError(t, i.Close())
+}
+
+func TestUDPConnectError(t *testing.T) {
 	i := InfluxDB{
-		URLs: []string{ts.URL},
+		URLs: []string{"udp://foobar:8089"},
 	}
 
+	err := i.Connect()
+	require.Error(t, err)
+
+	i = InfluxDB{
+		URLs: []string{"udp://localhost:9999999"},
+	}
+
+	err = i.Connect()
+	require.Error(t, err)
+}
+
+func TestHTTPConnectError_InvalidURL(t *testing.T) {
+	i := InfluxDB{
+		URLs: []string{"http://foobar:8089"},
+	}
+
+	err := i.Connect()
+	require.Error(t, err)
+
+	i = InfluxDB{
+		URLs: []string{"http://localhost:9999999"},
+	}
+
+	err = i.Connect()
+	require.Error(t, err)
+}
+
+func TestHTTPConnectError_DatabaseCreateFail(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/query":
+			w.WriteHeader(http.StatusNotFound)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}],"error":"test error"}`)
+		}
+	}))
+	defer ts.Close()
+
+	i := InfluxDB{
+		URLs:     []string{ts.URL},
+		Database: "test",
+	}
+
+	// Database creation errors do not return an error from Connect;
+	// they are only logged.
 	err := i.Connect()
 	require.NoError(t, err)
-	err = i.Write(testutil.MockMetrics())
+	require.NoError(t, i.Close())
+}
+
+func TestHTTPError_DatabaseNotFound(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		case "/write":
+			w.WriteHeader(http.StatusNotFound)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
+		case "/query":
+			w.WriteHeader(http.StatusNotFound)
+			w.Header().Set("Content-Type", "application/json")
+			fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
+		}
+	}))
+	defer ts.Close()
+
+	i := InfluxDB{
+		URLs:     []string{ts.URL},
+		Database: "test",
+	}
+
+	err := i.Connect()
 	require.NoError(t, err)
+	err = i.Write(testutil.MockMetrics())
+	require.Error(t, err)
+	require.NoError(t, i.Close())
 }
-- 
GitLab