From 4de75ce62175cf9af6c9da6d837676cf9f57d7b0 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Mon, 25 Apr 2016 17:49:06 -0600
Subject: [PATCH] Performance refactor of running_output buffers

closes #914
closes #967
---
 CHANGELOG.md                           |  16 ++
 etc/telegraf.conf                      |  85 ++++++-
 internal/buffer/buffer.go              |  77 ++++++
 internal/buffer/buffer_test.go         |  94 ++++++++
 internal/config/config.go              |  20 +-
 internal/models/running_output.go      | 157 ++++++------
 internal/models/running_output_test.go | 316 ++++++++++++++++++-------
 7 files changed, 573 insertions(+), 192 deletions(-)
 create mode 100644 internal/buffer/buffer.go
 create mode 100644 internal/buffer/buffer_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d1f00ad0..306083da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,9 +2,18 @@
 
 ### Release Notes
 
+- New [agent] configuration option: `metric_batch_size`. This option tells
+telegraf the maximum batch size to allow to accumulate before sending a flush
+to the configured outputs. `metric_buffer_limit` now refers to the absolute
+maximum number of metrics that will accumulate before metrics are dropped.
+
+- There is no longer an option to
+`flush_buffer_when_full`, this is now the default and only behavior of telegraf.
+
 - **Breaking Change**: docker plugin tags. The cont_id tag no longer exists, it
 will now be a field, and be called container_id. Additionally, cont_image and
 cont_name are being renamed to container_image and container_name.
+
 - **Breaking Change**: docker plugin measurements. The `docker_cpu`, `docker_mem`,
 `docker_blkio` and `docker_net` measurements are being renamed to
 `docker_container_cpu`, `docker_container_mem`, `docker_container_blkio` and
@@ -16,15 +25,19 @@ So adding "container" to each metric will:
 (1) make it more clear that these metrics are per-container, and
 (2) allow users to easily drop per-container metrics if cardinality is an
 issue (`namedrop = ["docker_container_*"]`)
+
 - `tagexclude` and `taginclude` are now available, which can be used to remove
 tags from measurements on inputs and outputs. See
 [the configuration doc](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md)
 for more details.
+
 - **Measurement filtering:** All measurement filters now match based on glob
 only. Previously there was an undocumented behavior where filters would match
 based on _prefix_ in addition to globs. This means that a filter like
 `fielddrop = ["time_"]` will need to be changed to `fielddrop = ["time_*"]`
+
 - **datadog**: measurement and field names will no longer have `_` replaced by `.`
+
 - The following plugins have changed their tags to _not_ overwrite the host tag:
   - cassandra: `host -> cassandra_host`
   - disque: `host -> disque_host`
@@ -42,6 +55,8 @@ based on _prefix_ in addition to globs. This means that a filter like
 - [#1072](https://github.com/influxdata/telegraf/pull/1072): New Input Plugin: filestat.
 - [#1066](https://github.com/influxdata/telegraf/pull/1066): Replication lag metrics for MongoDB input plugin
 - [#1086](https://github.com/influxdata/telegraf/pull/1086): Ability to specify AWS keys in config file. Thanks @johnrengleman!
+- [#1096](https://github.com/influxdata/telegraf/pull/1096): Performance refactor of running output buffers.
+- [#967](https://github.com/influxdata/telegraf/issues/967): Buffer logging improvements.
 
 ### Bugfixes
 
@@ -55,6 +70,7 @@ based on _prefix_ in addition to globs. This means that a filter like
 - [#1078](https://github.com/influxdata/telegraf/issues/1078): Use default AWS credential chain.
 - [#1070](https://github.com/influxdata/telegraf/issues/1070): SQL Server input. Fix datatype conversion.
 - [#1089](https://github.com/influxdata/telegraf/issues/1089): Fix leaky TCP connections in phpfpm plugin.
+- [#914](https://github.com/influxdata/telegraf/issues/914): Telegraf can drop metrics on full buffers.
 
 ## v0.12.1 [2016-04-14]
 
diff --git a/etc/telegraf.conf b/etc/telegraf.conf
index 46b422ff..c855b94f 100644
--- a/etc/telegraf.conf
+++ b/etc/telegraf.conf
@@ -30,15 +30,13 @@
   ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
   round_interval = true
 
-  ## Telegraf will send metrics to output in batch of at
+  ## Telegraf will send metrics to outputs in batches of at
   ## most metric_batch_size metrics.
   metric_batch_size = 1000
-  ## Telegraf will cache metric_buffer_limit metrics for each output, and will
-  ## flush this buffer on a successful write. This should be a multiple of
-  ## metric_batch_size and could not be less than 2 times metric_batch_size
+  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
+  ## output, and will flush this buffer on a successful write. Oldest metrics
+  ## are dropped first when this buffer fills.
   metric_buffer_limit = 10000
-  ## Flush the buffer whenever full, regardless of flush_interval.
-  flush_buffer_when_full = true
 
   ## Collection jitter is used to jitter the collection by a random amount.
   ## Each plugin will sleep for a random time within jitter before collecting.
@@ -151,6 +149,15 @@
 #   ## Amazon REGION
 #   region = 'us-east-1'
 # 
+#   ## Amazon Credentials
+#   ## Credentials are loaded in the following order
+#   ## 1) explicit credentials from 'access_key' and 'secret_key'
+#   ## 2) environment variables
+#   ## 3) shared credentials file
+#   ## 4) EC2 Instance Profile
+#   #access_key = ""
+#   #secret_key = ""
+# 
 #   ## Namespace for the CloudWatch MetricDatums
 #   namespace = 'InfluxData/Telegraf'
 
@@ -243,6 +250,16 @@
 # [[outputs.kinesis]]
 #   ## Amazon REGION of kinesis endpoint.
 #   region = "ap-southeast-2"
+# 
+#   ## Amazon Credentials
+#   ## Credentials are loaded in the following order
+#   ## 1) explicit credentials from 'access_key' and 'secret_key'
+#   ## 2) environment variables
+#   ## 3) shared credentials file
+#   ## 4) EC2 Instance Profile
+#   #access_key = ""
+#   #secret_key = ""
+# 
 #   ## Kinesis StreamName must exist prior to starting telegraf.
 #   streamname = "StreamName"
 #   ## PartitionKey as used for sharding data.
@@ -457,6 +474,15 @@
 #   ## Amazon Region
 #   region = 'us-east-1'
 # 
+#   ## Amazon Credentials
+#   ## Credentials are loaded in the following order
+#   ## 1) explicit credentials from 'access_key' and 'secret_key'
+#   ## 2) environment variables
+#   ## 3) shared credentials file
+#   ## 4) EC2 Instance Profile
+#   #access_key = ""
+#   #secret_key = ""
+# 
 #   ## Requested CloudWatch aggregation Period (required - must be a multiple of 60s)
 #   period = '1m'
 # 
@@ -588,8 +614,14 @@
 # [[inputs.filestat]]
 #   ## Files to gather stats about.
 #   ## These accept standard unix glob matching rules, but with the addition of
-#   ## ** as a "super asterisk". See https://github.com/gobwas/glob.
-#   files = ["/etc/telegraf/telegraf.conf", "/var/log/**.log"]
+#   ## ** as a "super asterisk". ie:
+#   ##   "/var/log/**.log"  -> recursively find all .log files in /var/log
+#   ##   "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
+#   ##   "/var/log/apache.log" -> just tail the apache log file
+#   ##
+#   ## See https://github.com/gobwas/glob for more examples
+#   ##
+#   files = ["/var/log/**.log"]
 #   ## If true, read the entire file and calculate an md5 checksum.
 #   md5 = false
 
@@ -980,6 +1012,11 @@
 #   ## databases are gathered.
 #   ## databases = ["app_production", "testing"]
 #   #
+#   # outputaddress = "db01"
+#   ## A custom name for the database that will be used as the "server" tag in the
+#   ## measurement output. If not specified, a default one generated from
+#   ## the connection address is used.
+#   #
 #   ## Define the toml config where the sql queries are stored
 #   ## New queries can be added, if the withdbname is set to true and there is no
 #   ## databases defined in the 'databases field', the sql query is ended by a
@@ -990,24 +1027,28 @@
 #   ## because the databases variable was set to ['postgres', 'pgbench' ] and the
 #   ## withdbname was true. Be careful that if the withdbname is set to false you
 #   ## don't have to define the where clause (aka with the dbname) the tagvalue
-#   ## field is used to define custom tags (separated by comas)
+#   ## field is used to define custom tags (separated by commas)
+#   ## The optional "measurement" value can be used to override the default
+#   ## output measurement name ("postgresql").
 #   #
 #   ## Structure :
 #   ## [[inputs.postgresql_extensible.query]]
 #   ##   sqlquery string
 #   ##   version string
 #   ##   withdbname boolean
-#   ##   tagvalue string (coma separated)
+#   ##   tagvalue string (comma separated)
+#   ##   measurement string
 #   [[inputs.postgresql_extensible.query]]
 #     sqlquery="SELECT * FROM pg_stat_database"
 #     version=901
 #     withdbname=false
 #     tagvalue=""
+#     measurement=""
 #   [[inputs.postgresql_extensible.query]]
 #     sqlquery="SELECT * FROM pg_stat_bgwriter"
 #     version=901
 #     withdbname=false
-#     tagvalue=""
+#     tagvalue="postgresql.stats"
 
 
 # # Read metrics from one or many PowerDNS servers
@@ -1379,6 +1420,28 @@
 #   percentile_limit = 1000
 
 
+# # Stream a log file, like the tail -f command
+# [[inputs.tail]]
+#   ## files to tail.
+#   ## These accept standard unix glob matching rules, but with the addition of
+#   ## ** as a "super asterisk". ie:
+#   ##   "/var/log/**.log"  -> recursively find all .log files in /var/log
+#   ##   "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
+#   ##   "/var/log/apache.log" -> just tail the apache log file
+#   ##
+#   ## See https://github.com/gobwas/glob for more examples
+#   ##
+#   files = ["/var/mymetrics.out"]
+#   ## Read file from beginning.
+#   from_beginning = false
+# 
+#   ## Data format to consume.
+#   ## Each data format has it's own unique set of configuration options, read
+#   ## more about them here:
+#   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+#   data_format = "influx"
+
+
 # # Generic TCP listener
 # [[inputs.tcp_listener]]
 #   ## Address and port to host TCP listener on
diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go
new file mode 100644
index 00000000..b7a05bf0
--- /dev/null
+++ b/internal/buffer/buffer.go
@@ -0,0 +1,77 @@
+package buffer
+
+import (
+	"github.com/influxdata/telegraf"
+)
+
+// Buffer is an object for storing metrics in a circular buffer.
+type Buffer struct {
+	buf chan telegraf.Metric
+	// total dropped metrics
+	drops int
+	// total metrics added
+	total int
+}
+
+// NewBuffer returns a Buffer
+//   size is the maximum number of metrics that Buffer will cache. If Add is
+//   called when the buffer is full, then the oldest metric(s) will be dropped.
+func NewBuffer(size int) *Buffer {
+	return &Buffer{
+		buf: make(chan telegraf.Metric, size),
+	}
+}
+
+// IsEmpty returns true if Buffer is empty.
+func (b *Buffer) IsEmpty() bool {
+	return len(b.buf) == 0
+}
+
+// Len returns the current length of the buffer.
+func (b *Buffer) Len() int {
+	return len(b.buf)
+}
+
+// Drops returns the total number of dropped metrics that have occured in this
+// buffer since instantiation.
+func (b *Buffer) Drops() int {
+	return b.drops
+}
+
+// Total returns the total number of metrics that have been added to this buffer.
+func (b *Buffer) Total() int {
+	return b.total
+}
+
+// Add adds metrics to the buffer.
+func (b *Buffer) Add(metrics ...telegraf.Metric) {
+	for i, _ := range metrics {
+		b.total++
+		select {
+		case b.buf <- metrics[i]:
+		default:
+			b.drops++
+			<-b.buf
+			b.buf <- metrics[i]
+		}
+	}
+}
+
+// Batch returns a batch of metrics of size batchSize.
+// the batch will be of maximum length batchSize. It can be less than batchSize,
+// if the length of Buffer is less than batchSize.
+func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
+	n := min(len(b.buf), batchSize)
+	out := make([]telegraf.Metric, n)
+	for i := 0; i < n; i++ {
+		out[i] = <-b.buf
+	}
+	return out
+}
+
+func min(a, b int) int {
+	if b < a {
+		return b
+	}
+	return a
+}
diff --git a/internal/buffer/buffer_test.go b/internal/buffer/buffer_test.go
new file mode 100644
index 00000000..9a36f4d8
--- /dev/null
+++ b/internal/buffer/buffer_test.go
@@ -0,0 +1,94 @@
+package buffer
+
+import (
+	"testing"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/testutil"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var metricList = []telegraf.Metric{
+	testutil.TestMetric(2, "mymetric1"),
+	testutil.TestMetric(1, "mymetric2"),
+	testutil.TestMetric(11, "mymetric3"),
+	testutil.TestMetric(15, "mymetric4"),
+	testutil.TestMetric(8, "mymetric5"),
+}
+
+func BenchmarkAddMetrics(b *testing.B) {
+	buf := NewBuffer(10000)
+	m := testutil.TestMetric(1, "mymetric")
+	for n := 0; n < b.N; n++ {
+		buf.Add(m)
+	}
+}
+
+func TestNewBufferBasicFuncs(t *testing.T) {
+	b := NewBuffer(10)
+
+	assert.True(t, b.IsEmpty())
+	assert.Zero(t, b.Len())
+	assert.Zero(t, b.Drops())
+	assert.Zero(t, b.Total())
+
+	m := testutil.TestMetric(1, "mymetric")
+	b.Add(m)
+	assert.False(t, b.IsEmpty())
+	assert.Equal(t, b.Len(), 1)
+	assert.Equal(t, b.Drops(), 0)
+	assert.Equal(t, b.Total(), 1)
+
+	b.Add(metricList...)
+	assert.False(t, b.IsEmpty())
+	assert.Equal(t, b.Len(), 6)
+	assert.Equal(t, b.Drops(), 0)
+	assert.Equal(t, b.Total(), 6)
+}
+
+func TestDroppingMetrics(t *testing.T) {
+	b := NewBuffer(10)
+
+	// Add up to the size of the buffer
+	b.Add(metricList...)
+	b.Add(metricList...)
+	assert.False(t, b.IsEmpty())
+	assert.Equal(t, b.Len(), 10)
+	assert.Equal(t, b.Drops(), 0)
+	assert.Equal(t, b.Total(), 10)
+
+	// Add 5 more and verify they were dropped
+	b.Add(metricList...)
+	assert.False(t, b.IsEmpty())
+	assert.Equal(t, b.Len(), 10)
+	assert.Equal(t, b.Drops(), 5)
+	assert.Equal(t, b.Total(), 15)
+}
+
+func TestGettingBatches(t *testing.T) {
+	b := NewBuffer(20)
+
+	// Verify that the buffer returned is smaller than requested when there are
+	// not as many items as requested.
+	b.Add(metricList...)
+	batch := b.Batch(10)
+	assert.Len(t, batch, 5)
+
+	// Verify that the buffer is now empty
+	assert.True(t, b.IsEmpty())
+	assert.Zero(t, b.Len())
+	assert.Zero(t, b.Drops())
+	assert.Equal(t, b.Total(), 5)
+
+	// Verify that the buffer returned is not more than the size requested
+	b.Add(metricList...)
+	batch = b.Batch(3)
+	assert.Len(t, batch, 3)
+
+	// Verify that buffer is not empty
+	assert.False(t, b.IsEmpty())
+	assert.Equal(t, b.Len(), 2)
+	assert.Equal(t, b.Drops(), 0)
+	assert.Equal(t, b.Total(), 10)
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index fcebd24e..2a34493f 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -188,15 +188,13 @@ var header = `# Telegraf Configuration
   ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
   round_interval = true
 
-  ## Telegraf will send metrics to output in batch of at
+  ## Telegraf will send metrics to outputs in batches of at
   ## most metric_batch_size metrics.
   metric_batch_size = 1000
-  ## Telegraf will cache metric_buffer_limit metrics for each output, and will
-  ## flush this buffer on a successful write. This should be a multiple of
-  ## metric_batch_size and could not be less than 2 times metric_batch_size
+  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
+  ## output, and will flush this buffer on a successful write. Oldest metrics
+  ## are dropped first when this buffer fills.
   metric_buffer_limit = 10000
-  ## Flush the buffer whenever full, regardless of flush_interval.
-  flush_buffer_when_full = true
 
   ## Collection jitter is used to jitter the collection by a random amount.
   ## Each plugin will sleep for a random time within jitter before collecting.
@@ -535,14 +533,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
 		return err
 	}
 
-	ro := internal_models.NewRunningOutput(name, output, outputConfig)
-	if c.Agent.MetricBatchSize > 0 {
-		ro.MetricBatchSize = c.Agent.MetricBatchSize
-	}
-	if c.Agent.MetricBufferLimit > 0 {
-		ro.MetricBufferLimit = c.Agent.MetricBufferLimit
-	}
-	ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
+	ro := internal_models.NewRunningOutput(name, output, outputConfig,
+		c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
 	c.Outputs = append(c.Outputs, ro)
 	return nil
 }
diff --git a/internal/models/running_output.go b/internal/models/running_output.go
index cab5035f..d0d2abbc 100644
--- a/internal/models/running_output.go
+++ b/internal/models/running_output.go
@@ -2,14 +2,13 @@ package internal_models
 
 import (
 	"log"
-	"sync"
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/buffer"
 )
 
 const (
-
 	// Default size of metrics batch size.
 	DEFAULT_METRIC_BATCH_SIZE = 1000
 
@@ -17,40 +16,40 @@ const (
 	DEFAULT_METRIC_BUFFER_LIMIT = 10000
 )
 
-// tmpmetrics point to batch of metrics ready to be wrote to output.
-// readI point to the oldest batch of metrics (the first to sent to output). It
-// may point to nil value if tmpmetrics is empty.
-// writeI point to the next slot to buffer a batch of metrics is output fail to
-// write.
+// RunningOutput contains the output configuration
 type RunningOutput struct {
-	Name                string
-	Output              telegraf.Output
-	Config              *OutputConfig
-	Quiet               bool
-	MetricBufferLimit   int
-	MetricBatchSize     int
-	FlushBufferWhenFull bool
-
-	metrics    []telegraf.Metric
-	tmpmetrics []([]telegraf.Metric)
-	writeI     int
-	readI      int
-
-	sync.Mutex
+	Name              string
+	Output            telegraf.Output
+	Config            *OutputConfig
+	Quiet             bool
+	MetricBufferLimit int
+	MetricBatchSize   int
+
+	metrics     *buffer.Buffer
+	failMetrics *buffer.Buffer
 }
 
 func NewRunningOutput(
 	name string,
 	output telegraf.Output,
 	conf *OutputConfig,
+	batchSize int,
+	bufferLimit int,
 ) *RunningOutput {
+	if bufferLimit == 0 {
+		bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT
+	}
+	if batchSize == 0 {
+		batchSize = DEFAULT_METRIC_BATCH_SIZE
+	}
 	ro := &RunningOutput{
 		Name:              name,
-		metrics:           make([]telegraf.Metric, 0),
+		metrics:           buffer.NewBuffer(batchSize),
+		failMetrics:       buffer.NewBuffer(bufferLimit),
 		Output:            output,
 		Config:            conf,
-		MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
-		MetricBatchSize:   DEFAULT_METRIC_BATCH_SIZE,
+		MetricBufferLimit: bufferLimit,
+		MetricBatchSize:   batchSize,
 	}
 	return ro
 }
@@ -63,19 +62,6 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
 			return
 		}
 	}
-	ro.Lock()
-	defer ro.Unlock()
-
-	if ro.tmpmetrics == nil {
-		size := ro.MetricBufferLimit / ro.MetricBatchSize
-		// ro.metrics already contains one batch
-		size = size - 1
-
-		if size < 1 {
-			size = 1
-		}
-		ro.tmpmetrics = make([]([]telegraf.Metric), size)
-	}
 
 	// Filter any tagexclude/taginclude parameters before adding metric
 	if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 {
@@ -90,69 +76,64 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
 		metric, _ = telegraf.NewMetric(name, tags, fields, t)
 	}
 
-	if len(ro.metrics) < ro.MetricBatchSize {
-		ro.metrics = append(ro.metrics, metric)
-	} else {
-		flushSuccess := true
-		if ro.FlushBufferWhenFull {
-			err := ro.write(ro.metrics)
-			if err != nil {
-				log.Printf("ERROR writing full metric buffer to output %s, %s",
-					ro.Name, err)
-				flushSuccess = false
-			}
-		} else {
-			flushSuccess = false
-		}
-		if !flushSuccess {
-			if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI {
-				log.Printf("WARNING: overwriting cached metrics, you may want to " +
-					"increase the metric_buffer_limit setting in your [agent] " +
-					"config if you do not wish to overwrite metrics.\n")
-				ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
-			}
-			ro.tmpmetrics[ro.writeI] = ro.metrics
-			ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics)
+	ro.metrics.Add(metric)
+	if ro.metrics.Len() == ro.MetricBatchSize {
+		batch := ro.metrics.Batch(ro.MetricBatchSize)
+		err := ro.write(batch)
+		if err != nil {
+			ro.failMetrics.Add(batch...)
 		}
-		ro.metrics = make([]telegraf.Metric, 0)
-		ro.metrics = append(ro.metrics, metric)
 	}
 }
 
 // Write writes all cached points to this output.
 func (ro *RunningOutput) Write() error {
-	ro.Lock()
-	defer ro.Unlock()
-
-	if ro.tmpmetrics == nil {
-		size := ro.MetricBufferLimit / ro.MetricBatchSize
-		// ro.metrics already contains one batch
-		size = size - 1
-
-		if size < 1 {
-			size = 1
-		}
-		ro.tmpmetrics = make([]([]telegraf.Metric), size)
+	if !ro.Quiet {
+		log.Printf("Output [%s] buffer fullness: %d / %d metrics. "+
+			"Total gathered metrics: %d. Total dropped metrics: %d.",
+			ro.Name,
+			ro.failMetrics.Len()+ro.metrics.Len(),
+			ro.MetricBufferLimit,
+			ro.metrics.Total(),
+			ro.metrics.Drops()+ro.failMetrics.Drops())
 	}
 
-	// Write any cached metric buffers before, as those metrics are the
-	// oldest
-	for ro.tmpmetrics[ro.readI] != nil {
-		if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil {
-			return err
-		} else {
-			ro.tmpmetrics[ro.readI] = nil
-			ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
+	var err error
+	if !ro.failMetrics.IsEmpty() {
+		bufLen := ro.failMetrics.Len()
+		// how many batches of failed writes we need to write.
+		nBatches := bufLen/ro.MetricBatchSize + 1
+		batchSize := ro.MetricBatchSize
+
+		for i := 0; i < nBatches; i++ {
+			// If it's the last batch, only grab the metrics that have not had
+			// a write attempt already (this is primarily to preserve order).
+			if i == nBatches-1 {
+				batchSize = bufLen % ro.MetricBatchSize
+			}
+			batch := ro.failMetrics.Batch(batchSize)
+			// If we've already failed previous writes, don't bother trying to
+			// write to this output again. We are not exiting the loop just so
+			// that we can rotate the metrics to preserve order.
+			if err == nil {
+				err = ro.write(batch)
+			}
+			if err != nil {
+				ro.failMetrics.Add(batch...)
+			}
 		}
 	}
 
-	err := ro.write(ro.metrics)
+	batch := ro.metrics.Batch(ro.MetricBatchSize)
+	// see comment above about not trying to write to an already failed output.
+	// if ro.failMetrics is empty then err will always be nil at this point.
+	if err == nil {
+		err = ro.write(batch)
+	}
 	if err != nil {
+		ro.failMetrics.Add(batch...)
 		return err
-	} else {
-		ro.metrics = make([]telegraf.Metric, 0)
 	}
-
 	return nil
 }
 
@@ -165,8 +146,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
 	elapsed := time.Since(start)
 	if err == nil {
 		if !ro.Quiet {
-			log.Printf("Wrote %d metrics to output %s in %s\n",
-				len(metrics), ro.Name, elapsed)
+			log.Printf("Output [%s] wrote batch of %d metrics in %s\n",
+				ro.Name, len(metrics), elapsed)
 		}
 	}
 	return err
diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go
index ca7034b6..d9238c5a 100644
--- a/internal/models/running_output_test.go
+++ b/internal/models/running_output_test.go
@@ -2,7 +2,6 @@ package internal_models
 
 import (
 	"fmt"
-	"sort"
 	"sync"
 	"testing"
 
@@ -29,6 +28,62 @@ var next5 = []telegraf.Metric{
 	testutil.TestMetric(101, "metric10"),
 }
 
+// Benchmark adding metrics.
+func BenchmarkRunningOutputAddWrite(b *testing.B) {
+	conf := &OutputConfig{
+		Filter: Filter{
+			IsActive: false,
+		},
+	}
+
+	m := &perfOutput{}
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
+	ro.Quiet = true
+
+	for n := 0; n < b.N; n++ {
+		ro.AddMetric(first5[0])
+		ro.Write()
+	}
+}
+
+// Benchmark adding metrics.
+func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
+	conf := &OutputConfig{
+		Filter: Filter{
+			IsActive: false,
+		},
+	}
+
+	m := &perfOutput{}
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
+	ro.Quiet = true
+
+	for n := 0; n < b.N; n++ {
+		ro.AddMetric(first5[0])
+		if n%100 == 0 {
+			ro.Write()
+		}
+	}
+}
+
+// Benchmark adding metrics.
+func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
+	conf := &OutputConfig{
+		Filter: Filter{
+			IsActive: false,
+		},
+	}
+
+	m := &perfOutput{}
+	m.failWrite = true
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
+	ro.Quiet = true
+
+	for n := 0; n < b.N; n++ {
+		ro.AddMetric(first5[0])
+	}
+}
+
 // Test that NameDrop filters ger properly applied.
 func TestRunningOutput_DropFilter(t *testing.T) {
 	conf := &OutputConfig{
@@ -40,7 +95,7 @@ func TestRunningOutput_DropFilter(t *testing.T) {
 	assert.NoError(t, conf.Filter.CompileFilter())
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
@@ -66,7 +121,7 @@ func TestRunningOutput_PassFilter(t *testing.T) {
 	assert.NoError(t, conf.Filter.CompileFilter())
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
@@ -92,7 +147,7 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
 	assert.NoError(t, conf.Filter.CompileFilter())
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	ro.AddMetric(first5[0])
 	assert.Len(t, m.Metrics(), 0)
@@ -114,7 +169,7 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) {
 	assert.NoError(t, conf.Filter.CompileFilter())
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	ro.AddMetric(first5[0])
 	assert.Len(t, m.Metrics(), 0)
@@ -136,7 +191,7 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
 	assert.NoError(t, conf.Filter.CompileFilter())
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	ro.AddMetric(first5[0])
 	assert.Len(t, m.Metrics(), 0)
@@ -158,7 +213,7 @@ func TestRunningOutput_TagIncludeMatch(t *testing.T) {
 	assert.NoError(t, conf.Filter.CompileFilter())
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	ro.AddMetric(first5[0])
 	assert.Len(t, m.Metrics(), 0)
@@ -178,7 +233,7 @@ func TestRunningOutputDefault(t *testing.T) {
 	}
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
+	ro := NewRunningOutput("test", m, conf, 1000, 10000)
 
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
@@ -193,8 +248,9 @@ func TestRunningOutputDefault(t *testing.T) {
 	assert.Len(t, m.Metrics(), 10)
 }
 
-// Test that the first metrics batch gets overwritten if there is a buffer overflow.
-func TestRunningOutputOverwrite(t *testing.T) {
+// Test that running output doesn't flush until it's full when
+// FlushBufferWhenFull is set.
+func TestRunningOutputFlushWhenFull(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
 			IsActive: false,
@@ -202,33 +258,51 @@ func TestRunningOutputOverwrite(t *testing.T) {
 	}
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
-	ro.MetricBatchSize = 1
-	ro.MetricBufferLimit = 4
+	ro := NewRunningOutput("test", m, conf, 6, 10)
 
+	// Fill buffer to 1 under limit
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
 	}
-	require.Len(t, m.Metrics(), 0)
+	// no flush yet
+	assert.Len(t, m.Metrics(), 0)
 
+	// add one more metric
+	ro.AddMetric(next5[0])
+	// now it flushed
+	assert.Len(t, m.Metrics(), 6)
+
+	// add one more metric and write it manually
+	ro.AddMetric(next5[1])
 	err := ro.Write()
-	require.NoError(t, err)
-	require.Len(t, m.Metrics(), 4)
+	assert.NoError(t, err)
+	assert.Len(t, m.Metrics(), 7)
+}
 
-	var expected, actual []string
-	for i, exp := range first5[1:] {
-		expected = append(expected, exp.String())
-		actual = append(actual, m.Metrics()[i].String())
+// Test that running output doesn't flush until it's full when
+// FlushBufferWhenFull is set, twice.
+func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
+	conf := &OutputConfig{
+		Filter: Filter{
+			IsActive: false,
+		},
 	}
 
-	sort.Strings(expected)
-	sort.Strings(actual)
+	m := &mockOutput{}
+	ro := NewRunningOutput("test", m, conf, 4, 12)
 
-	assert.Equal(t, expected, actual)
+	// Fill buffer past limit twive
+	for _, metric := range first5 {
+		ro.AddMetric(metric)
+	}
+	for _, metric := range next5 {
+		ro.AddMetric(metric)
+	}
+	// flushed twice
+	assert.Len(t, m.Metrics(), 8)
 }
 
-// Test that multiple buffer overflows are handled properly.
-func TestRunningOutputMultiOverwrite(t *testing.T) {
+func TestRunningOutputWriteFail(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
 			IsActive: false,
@@ -236,37 +310,34 @@ func TestRunningOutputMultiOverwrite(t *testing.T) {
 	}
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
-	ro.MetricBatchSize = 1
-	ro.MetricBufferLimit = 3
+	m.failWrite = true
+	ro := NewRunningOutput("test", m, conf, 4, 12)
 
+	// Fill buffer to limit twice
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
 	}
 	for _, metric := range next5 {
 		ro.AddMetric(metric)
 	}
-	require.Len(t, m.Metrics(), 0)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
 
+	// manual write fails
 	err := ro.Write()
-	require.NoError(t, err)
-	require.Len(t, m.Metrics(), 3)
-
-	var expected, actual []string
-	for i, exp := range next5[2:] {
-		expected = append(expected, exp.String())
-		actual = append(actual, m.Metrics()[i].String())
-	}
+	require.Error(t, err)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
 
-	sort.Strings(expected)
-	sort.Strings(actual)
+	m.failWrite = false
+	err = ro.Write()
+	require.NoError(t, err)
 
-	assert.Equal(t, expected, actual)
+	assert.Len(t, m.Metrics(), 10)
 }
 
-// Test that running output doesn't flush until it's full when
-// FlushBufferWhenFull is set.
-func TestRunningOutputFlushWhenFull(t *testing.T) {
+// Verify that the order of points is preserved during a write failure.
+func TestRunningOutputWriteFailOrder(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
 			IsActive: false,
@@ -274,33 +345,39 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
 	}
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
-	ro.FlushBufferWhenFull = true
-	ro.MetricBatchSize = 5
-	ro.MetricBufferLimit = 10
+	m.failWrite = true
+	ro := NewRunningOutput("test", m, conf, 100, 1000)
 
-	// Fill buffer to limit
+	// add 5 metrics
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
 	}
-	// no flush yet
+	// no successful flush yet
 	assert.Len(t, m.Metrics(), 0)
 
-	// add one more metric
-	ro.AddMetric(next5[0])
-	// now it flushed
-	assert.Len(t, m.Metrics(), 5)
-
-	// add one more metric and write it manually
-	ro.AddMetric(next5[1])
+	// Write fails
 	err := ro.Write()
-	assert.NoError(t, err)
-	assert.Len(t, m.Metrics(), 7)
+	require.Error(t, err)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
+
+	m.failWrite = false
+	// add 5 more metrics
+	for _, metric := range next5 {
+		ro.AddMetric(metric)
+	}
+	err = ro.Write()
+	require.NoError(t, err)
+
+	// Verify that 10 metrics were written
+	assert.Len(t, m.Metrics(), 10)
+	// Verify that they are in order
+	expected := append(first5, next5...)
+	assert.Equal(t, expected, m.Metrics())
 }
 
-// Test that running output doesn't flush until it's full when
-// FlushBufferWhenFull is set, twice.
-func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
+// Verify that the order of points is preserved during many write failures.
+func TestRunningOutputWriteFailOrder2(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
 			IsActive: false,
@@ -308,23 +385,72 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
 	}
 
 	m := &mockOutput{}
-	ro := NewRunningOutput("test", m, conf)
-	ro.FlushBufferWhenFull = true
-	ro.MetricBatchSize = 4
-	ro.MetricBufferLimit = 12
+	m.failWrite = true
+	ro := NewRunningOutput("test", m, conf, 5, 100)
 
-	// Fill buffer past limit twive
+	// add 5 metrics
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
 	}
+	// Write fails
+	err := ro.Write()
+	require.Error(t, err)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
+
+	// add 5 metrics
 	for _, metric := range next5 {
 		ro.AddMetric(metric)
 	}
-	// flushed twice
-	assert.Len(t, m.Metrics(), 8)
+	// Write fails
+	err = ro.Write()
+	require.Error(t, err)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
+
+	// add 5 metrics
+	for _, metric := range first5 {
+		ro.AddMetric(metric)
+	}
+	// Write fails
+	err = ro.Write()
+	require.Error(t, err)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
+
+	// add 5 metrics
+	for _, metric := range next5 {
+		ro.AddMetric(metric)
+	}
+	// Write fails
+	err = ro.Write()
+	require.Error(t, err)
+	// no successful flush yet
+	assert.Len(t, m.Metrics(), 0)
+
+	m.failWrite = false
+	err = ro.Write()
+	require.NoError(t, err)
+
+	// Verify that 10 metrics were written
+	assert.Len(t, m.Metrics(), 20)
+	// Verify that they are in order
+	expected := append(first5, next5...)
+	expected = append(expected, first5...)
+	expected = append(expected, next5...)
+	assert.Equal(t, expected, m.Metrics())
 }
 
-func TestRunningOutputWriteFail(t *testing.T) {
+// Verify that the order of points is preserved when there is a remainder
+// of points for the batch.
+//
+// ie, with a batch size of 5:
+//
+//     1 2 3 4 5 6 <-- order, failed points
+//     6 1 2 3 4 5 <-- order, after 1st write failure (1 2 3 4 5 was batch)
+//     1 2 3 4 5 6 <-- order, after 2nd write failure, (6 was batch)
+//
+func TestRunningOutputWriteFailOrder3(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
 			IsActive: false,
@@ -333,32 +459,36 @@ func TestRunningOutputWriteFail(t *testing.T) {
 
 	m := &mockOutput{}
 	m.failWrite = true
-	ro := NewRunningOutput("test", m, conf)
-	ro.FlushBufferWhenFull = true
-	ro.MetricBatchSize = 4
-	ro.MetricBufferLimit = 12
+	ro := NewRunningOutput("test", m, conf, 5, 1000)
 
-	// Fill buffer past limit twice
+	// add 5 metrics
 	for _, metric := range first5 {
 		ro.AddMetric(metric)
 	}
-	for _, metric := range next5 {
-		ro.AddMetric(metric)
-	}
 	// no successful flush yet
 	assert.Len(t, m.Metrics(), 0)
 
-	// manual write fails
+	// Write fails
 	err := ro.Write()
 	require.Error(t, err)
 	// no successful flush yet
 	assert.Len(t, m.Metrics(), 0)
 
+	// add and attempt to write a single metric:
+	ro.AddMetric(next5[0])
+	err = ro.Write()
+	require.Error(t, err)
+
+	// unset fail and write metrics
 	m.failWrite = false
 	err = ro.Write()
 	require.NoError(t, err)
 
-	assert.Len(t, m.Metrics(), 10)
+	// Verify that 6 metrics were written
+	assert.Len(t, m.Metrics(), 6)
+	// Verify that they are in order
+	expected := append(first5, next5[0])
+	assert.Equal(t, expected, m.Metrics())
 }
 
 type mockOutput struct {
@@ -408,3 +538,31 @@ func (m *mockOutput) Metrics() []telegraf.Metric {
 	defer m.Unlock()
 	return m.metrics
 }
+
+type perfOutput struct {
+	// if true, mock a write failure
+	failWrite bool
+}
+
+func (m *perfOutput) Connect() error {
+	return nil
+}
+
+func (m *perfOutput) Close() error {
+	return nil
+}
+
+func (m *perfOutput) Description() string {
+	return ""
+}
+
+func (m *perfOutput) SampleConfig() string {
+	return ""
+}
+
+func (m *perfOutput) Write(metrics []telegraf.Metric) error {
+	if m.failWrite {
+		return fmt.Errorf("Failed Write!")
+	}
+	return nil
+}
-- 
GitLab