From e68f251df7316d98758e35d63300d26e5c05de40 Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <phemmer@users.noreply.github.com>
Date: Mon, 25 Jul 2016 08:09:49 -0400
Subject: [PATCH] add AddError method to accumulator (#1536)

---
 accumulator.go            |  2 ++
 agent/accumulator.go      | 14 ++++++++++++++
 agent/accumulator_test.go | 28 ++++++++++++++++++++++++++++
 agent/agent.go            |  3 +++
 testutil/accumulator.go   | 11 +++++++++++
 5 files changed, 58 insertions(+)

diff --git a/accumulator.go b/accumulator.go
index 15c5485f..1fdba8f9 100644
--- a/accumulator.go
+++ b/accumulator.go
@@ -16,6 +16,8 @@ type Accumulator interface {
 		tags map[string]string,
 		t ...time.Time)
 
+	AddError(err error)
+
 	Debug() bool
 	SetDebug(enabled bool)
 
diff --git a/agent/accumulator.go b/agent/accumulator.go
index 8b0987c4..d80affe6 100644
--- a/agent/accumulator.go
+++ b/agent/accumulator.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"math"
+	"sync/atomic"
 	"time"
 
 	"github.com/influxdata/telegraf"
@@ -33,6 +34,8 @@ type accumulator struct {
 	inputConfig *internal_models.InputConfig
 
 	precision time.Duration
+
+	errCount uint64
 }
 
 func (ac *accumulator) Add(
@@ -155,6 +158,17 @@ func (ac *accumulator) AddFields(
 	ac.metrics <- m
 }
 
+// AddError passes a runtime error to the accumulator.
+// The error will be tagged with the plugin name and written to the log.
+func (ac *accumulator) AddError(err error) {
+	if err == nil {
+		return
+	}
+	atomic.AddUint64(&ac.errCount, 1)
+	//TODO suppress/throttle consecutive duplicate errors?
+	log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
+}
+
 func (ac *accumulator) Debug() bool {
 	return ac.debug
 }
diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go
index 9bf68119..8618d327 100644
--- a/agent/accumulator_test.go
+++ b/agent/accumulator_test.go
@@ -1,8 +1,11 @@
 package agent
 
 import (
+	"bytes"
 	"fmt"
+	"log"
 	"math"
+	"os"
 	"testing"
 	"time"
 
@@ -10,6 +13,7 @@ import (
 	"github.com/influxdata/telegraf/internal/models"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestAdd(t *testing.T) {
@@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) {
 		fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
 		actual)
 }
+
+func TestAccAddError(t *testing.T) {
+	errBuf := bytes.NewBuffer(nil)
+	log.SetOutput(errBuf)
+	defer log.SetOutput(os.Stderr)
+
+	a := accumulator{}
+	a.inputConfig = &internal_models.InputConfig{}
+	a.inputConfig.Name = "mock_plugin"
+
+	a.AddError(fmt.Errorf("foo"))
+	a.AddError(fmt.Errorf("bar"))
+	a.AddError(fmt.Errorf("baz"))
+
+	errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
+	assert.EqualValues(t, 3, a.errCount)
+	require.Len(t, errs, 4) // 4 because of trailing newline
+	assert.Contains(t, string(errs[0]), "mock_plugin")
+	assert.Contains(t, string(errs[0]), "foo")
+	assert.Contains(t, string(errs[1]), "mock_plugin")
+	assert.Contains(t, string(errs[1]), "bar")
+	assert.Contains(t, string(errs[2]), "mock_plugin")
+	assert.Contains(t, string(errs[2]), "baz")
+}
diff --git a/agent/agent.go b/agent/agent.go
index ae520b89..5ee73512 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -215,6 +215,9 @@ func (a *Agent) Test() error {
 		if err := input.Input.Gather(acc); err != nil {
 			return err
 		}
+		if acc.errCount > 0 {
+			return fmt.Errorf("Errors encountered during processing")
+		}
 
 		// Special instructions for some inputs. cpu, for example, needs to be
 		// run twice in order to return cpu usage percentages.
diff --git a/testutil/accumulator.go b/testutil/accumulator.go
index 1058faf8..598aa315 100644
--- a/testutil/accumulator.go
+++ b/testutil/accumulator.go
@@ -28,6 +28,7 @@ type Accumulator struct {
 	sync.Mutex
 
 	Metrics []*Metric
+	Errors  []error
 	debug   bool
 }
 
@@ -84,6 +85,16 @@ func (a *Accumulator) AddFields(
 	a.Metrics = append(a.Metrics, p)
 }
 
+// AddError appends the given error to Accumulator.Errors.
+func (a *Accumulator) AddError(err error) {
+	if err == nil {
+		return
+	}
+	a.Lock()
+	a.Errors = append(a.Errors, err)
+	a.Unlock()
+}
+
 func (a *Accumulator) SetPrecision(precision, interval time.Duration) {
 	return
 }
-- 
GitLab