From 64a71263a12c89d85bb58fc4a3eefcc88e7079e4 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Thu, 8 Sep 2016 15:22:10 +0100
Subject: [PATCH] Support Processor & Aggregator Plugins

closes #1726
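
With this change, metrics flow from the inputs, through any configured
processor plugins, and are then offered to the aggregator plugins
before being handed to the outputs. Each aggregator gets its own
accumulator via Start(); the metrics it emits are flagged as
aggregates so they are not fed back into the aggregators, and setting
drop_original = true keeps the original metric from reaching the
outputs.

As a rough illustration only (the option names below are the ones
parsed by buildProcessor/buildAggregator in this patch; the values are
made up), the new plugin types are configured like so:

    [[processors.printer]]
      # filter options (namepass, tagpass, ...) are parsed via buildFilter()

    [[aggregators.minmax]]
      # only ship the aggregated metrics, drop the originals
      drop_original = true
      name_suffix = "_aggregate"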
---
 accumulator.go                                |  12 +-
 agent/accumulator.go                          | 179 +-----
 agent/accumulator_test.go                     | 513 ++++--------------
 agent/agent.go                                | 116 ++--
 aggregator.go                                 |  16 +
 cmd/telegraf/telegraf.go                      |   9 +-
 internal/config/config.go                     | 253 ++++++++-
 internal/models/filter.go                     |   2 +-
 internal/models/makemetric.go                 | 150 +++++
 internal/models/running_aggregator.go         |  79 +++
 internal/models/running_aggregator_test.go    | 150 +++++
 internal/models/running_input.go              |  62 ++-
 internal/models/running_input_test.go         | 326 +++++++++++
 internal/models/running_output_test.go        |   4 -
 internal/models/running_processor.go          |  37 ++
 internal/models/running_processor_test.go     | 117 ++++
 metric.go                                     |  34 +-
 plugins/aggregators/all/all.go                |   5 +
 plugins/aggregators/minmax/minmax.go          | 192 +++++++
 plugins/aggregators/minmax/minmax_test.go     |  51 ++
 plugins/aggregators/registry.go               |  11 +
 .../http_listener/http_listener_test.go       |   6 +-
 .../postgresql_extensible.go                  |   1 +
 plugins/processors/all/all.go                 |   5 +
 plugins/processors/printer/printer.go         |  35 ++
 plugins/processors/printer/printer_test.go    |   0
 plugins/processors/registry.go                |  11 +
 processor.go                                  |  12 +
 28 files changed, 1783 insertions(+), 605 deletions(-)
 create mode 100644 aggregator.go
 create mode 100644 internal/models/makemetric.go
 create mode 100644 internal/models/running_aggregator.go
 create mode 100644 internal/models/running_aggregator_test.go
 create mode 100644 internal/models/running_input_test.go
 create mode 100644 internal/models/running_processor.go
 create mode 100644 internal/models/running_processor_test.go
 create mode 100644 plugins/aggregators/all/all.go
 create mode 100644 plugins/aggregators/minmax/minmax.go
 create mode 100644 plugins/aggregators/minmax/minmax_test.go
 create mode 100644 plugins/aggregators/registry.go
 create mode 100644 plugins/processors/all/all.go
 create mode 100644 plugins/processors/printer/printer.go
 create mode 100644 plugins/processors/printer/printer_test.go
 create mode 100644 plugins/processors/registry.go
 create mode 100644 processor.go
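
As a rough sketch for reviewers: this is what a plugin written against
the new telegraf.Aggregator interface (added in aggregator.go below)
might look like. The aggregators.Add registration helper and its
Creator signature are assumed to mirror the existing inputs/outputs
registries (plugins/aggregators/registry.go is part of this patch but
not shown in this excerpt), and the "_count" suffix and 30s flush
period are invented for the example. The processor side is analogous;
its Apply(in ...telegraf.Metric) []telegraf.Metric signature can be
inferred from how agent.go invokes processor.Apply(mS...).

    // Package count is a toy aggregator: it counts how many metrics it
    // has seen per measurement name and emits the totals periodically.
    package count

    import (
        "sync"
        "time"

        "github.com/influxdata/telegraf"
        "github.com/influxdata/telegraf/plugins/aggregators"
    )

    type Count struct {
        sync.Mutex
        counts map[string]int64
        acc    telegraf.Accumulator
        done   chan struct{}
    }

    func (c *Count) SampleConfig() string { return "" }
    func (c *Count) Description() string  { return "Count metrics per measurement" }

    // Apply is called for every metric routed to this aggregator.
    func (c *Count) Apply(in telegraf.Metric) {
        c.Lock()
        c.counts[in.Name()]++
        c.Unlock()
    }

    // Start keeps the accumulator used to emit aggregate metrics and
    // kicks off the flush loop; the agent calls Stop() on shutdown.
    func (c *Count) Start(acc telegraf.Accumulator) error {
        c.acc = acc
        c.counts = make(map[string]int64)
        c.done = make(chan struct{})
        go c.loop()
        return nil
    }

    func (c *Count) Stop() { close(c.done) }

    func (c *Count) loop() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-c.done:
                return
            case <-ticker.C:
                c.Lock()
                for name, n := range c.counts {
                    // emitted metrics pass through RunningAggregator.MakeMetric,
                    // which marks them as aggregates
                    c.acc.AddFields(name+"_count",
                        map[string]interface{}{"count": n}, nil)
                }
                c.counts = make(map[string]int64)
                c.Unlock()
            }
        }
    }

    func init() {
        // assumed registration helper, mirroring plugins/inputs
        aggregators.Add("count", func() telegraf.Aggregator {
            return &Count{}
        })
    }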

diff --git a/accumulator.go b/accumulator.go
index bb6e4dc8..13fd6e57 100644
--- a/accumulator.go
+++ b/accumulator.go
@@ -2,9 +2,8 @@ package telegraf
 
 import "time"
 
-// Accumulator is an interface for "accumulating" metrics from input plugin(s).
-// The metrics are sent down a channel shared between all input plugins and then
-// flushed on the configured flush_interval.
+// Accumulator is an interface for "accumulating" metrics from plugin(s).
+// The metrics are sent down a channel shared between all plugins.
 type Accumulator interface {
 	// AddFields adds a metric to the accumulator with the given measurement
 	// name, fields, and tags (and timestamp). If a timestamp is not provided,
@@ -29,12 +28,7 @@ type Accumulator interface {
 		tags map[string]string,
 		t ...time.Time)
 
-	AddError(err error)
-
-	Debug() bool
-	SetDebug(enabled bool)
-
 	SetPrecision(precision, interval time.Duration)
 
-	DisablePrecision()
+	AddError(err error)
 }
diff --git a/agent/accumulator.go b/agent/accumulator.go
index 752e2b91..0a84cedd 100644
--- a/agent/accumulator.go
+++ b/agent/accumulator.go
@@ -1,37 +1,40 @@
 package agent
 
 import (
-	"fmt"
 	"log"
-	"math"
 	"sync/atomic"
 	"time"
 
 	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/internal/models"
 )
 
+type MetricMaker interface {
+	Name() string
+	MakeMetric(
+		measurement string,
+		fields map[string]interface{},
+		tags map[string]string,
+		mType telegraf.ValueType,
+		t time.Time,
+	) telegraf.Metric
+}
+
 func NewAccumulator(
-	inputConfig *models.InputConfig,
+	maker MetricMaker,
 	metrics chan telegraf.Metric,
 ) *accumulator {
-	acc := accumulator{}
-	acc.metrics = metrics
-	acc.inputConfig = inputConfig
-	acc.precision = time.Nanosecond
+	acc := accumulator{
+		maker:     maker,
+		metrics:   metrics,
+		precision: time.Nanosecond,
+	}
 	return &acc
 }
 
 type accumulator struct {
 	metrics chan telegraf.Metric
 
-	defaultTags map[string]string
-
-	debug bool
-	// print every point added to the accumulator
-	trace bool
-
-	inputConfig *models.InputConfig
+	maker MetricMaker
 
 	precision time.Duration
 
@@ -44,7 +47,7 @@ func (ac *accumulator) AddFields(
 	tags map[string]string,
 	t ...time.Time,
 ) {
-	if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil {
+	if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Untyped, ac.getTime(t)); m != nil {
 		ac.metrics <- m
 	}
 }
@@ -55,7 +58,7 @@ func (ac *accumulator) AddGauge(
 	tags map[string]string,
 	t ...time.Time,
 ) {
-	if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil {
+	if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Gauge, ac.getTime(t)); m != nil {
 		ac.metrics <- m
 	}
 }
@@ -66,114 +69,11 @@ func (ac *accumulator) AddCounter(
 	tags map[string]string,
 	t ...time.Time,
 ) {
-	if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil {
+	if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Counter, ac.getTime(t)); m != nil {
 		ac.metrics <- m
 	}
-}
-
-// makeMetric either returns a metric, or returns nil if the metric doesn't
-// need to be created (because of filtering, an error, etc.)
-func (ac *accumulator) makeMetric(
-	measurement string,
-	fields map[string]interface{},
-	tags map[string]string,
-	mType telegraf.ValueType,
-	t ...time.Time,
-) telegraf.Metric {
-	if len(fields) == 0 || len(measurement) == 0 {
-		return nil
-	}
-	if tags == nil {
-		tags = make(map[string]string)
-	}
-
-	// Override measurement name if set
-	if len(ac.inputConfig.NameOverride) != 0 {
-		measurement = ac.inputConfig.NameOverride
-	}
-	// Apply measurement prefix and suffix if set
-	if len(ac.inputConfig.MeasurementPrefix) != 0 {
-		measurement = ac.inputConfig.MeasurementPrefix + measurement
-	}
-	if len(ac.inputConfig.MeasurementSuffix) != 0 {
-		measurement = measurement + ac.inputConfig.MeasurementSuffix
-	}
-
-	// Apply plugin-wide tags if set
-	for k, v := range ac.inputConfig.Tags {
-		if _, ok := tags[k]; !ok {
-			tags[k] = v
-		}
-	}
-	// Apply daemon-wide tags if set
-	for k, v := range ac.defaultTags {
-		if _, ok := tags[k]; !ok {
-			tags[k] = v
-		}
-	}
-
-	// Apply the metric filter(s)
-	if ok := ac.inputConfig.Filter.Apply(measurement, fields, tags); !ok {
-		return nil
-	}
-
-	for k, v := range fields {
-		// Validate uint64 and float64 fields
-		switch val := v.(type) {
-		case uint64:
-			// InfluxDB does not support writing uint64
-			if val < uint64(9223372036854775808) {
-				fields[k] = int64(val)
-			} else {
-				fields[k] = int64(9223372036854775807)
-			}
-			continue
-		case float64:
-			// NaNs are invalid values in influxdb, skip measurement
-			if math.IsNaN(val) || math.IsInf(val, 0) {
-				if ac.debug {
-					log.Printf("I! Measurement [%s] field [%s] has a NaN or Inf "+
-						"field, skipping",
-						measurement, k)
-				}
-				delete(fields, k)
-				continue
-			}
-		}
-
-		fields[k] = v
-	}
-
-	var timestamp time.Time
-	if len(t) > 0 {
-		timestamp = t[0]
-	} else {
-		timestamp = time.Now()
-	}
-	timestamp = timestamp.Round(ac.precision)
-
-	var m telegraf.Metric
-	var err error
-	switch mType {
-	case telegraf.Counter:
-		m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp)
-	case telegraf.Gauge:
-		m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp)
-	default:
-		m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
-	}
-	if err != nil {
-		log.Printf("E! Error adding point [%s]: %s\n", measurement, err.Error())
-		return nil
 	}
 
-	if ac.trace {
-		fmt.Println("> " + m.String())
-	}
-
-	return m
-}
-
 // AddError passes a runtime error to the accumulator.
 // The error will be tagged with the plugin name and written to the log.
 func (ac *accumulator) AddError(err error) {
@@ -182,23 +82,7 @@ func (ac *accumulator) AddError(err error) {
 	}
 	atomic.AddUint64(&ac.errCount, 1)
 	//TODO suppress/throttle consecutive duplicate errors?
-	log.Printf("E! Error in input [%s]: %s", ac.inputConfig.Name, err)
-}
-
-func (ac *accumulator) Debug() bool {
-	return ac.debug
-}
-
-func (ac *accumulator) SetDebug(debug bool) {
-	ac.debug = debug
-}
-
-func (ac *accumulator) Trace() bool {
-	return ac.trace
-}
-
-func (ac *accumulator) SetTrace(trace bool) {
-	ac.trace = trace
+	log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
 }
 
 // SetPrecision takes two time.Duration objects. If the first is non-zero,
@@ -222,17 +106,12 @@ func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
 	}
 }
 
-func (ac *accumulator) DisablePrecision() {
-	ac.precision = time.Nanosecond
-}
-
-func (ac *accumulator) setDefaultTags(tags map[string]string) {
-	ac.defaultTags = tags
+func (ac accumulator) getTime(t []time.Time) time.Time {
+	var timestamp time.Time
+	if len(t) > 0 {
+		timestamp = t[0]
+	} else {
+		timestamp = time.Now()
+	}
+	return timestamp.Round(ac.precision)
 }
-
-func (ac *accumulator) addDefaultTag(key, value string) {
-	if ac.defaultTags == nil {
-		ac.defaultTags = make(map[string]string)
-	}
-	ac.defaultTags[key] = value
-}
diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go
index ef5a34ec..ef8d9eb2 100644
--- a/agent/accumulator_test.go
+++ b/agent/accumulator_test.go
@@ -4,24 +4,21 @@ import (
 	"bytes"
 	"fmt"
 	"log"
-	"math"
 	"os"
 	"testing"
 	"time"
 
 	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/internal/models"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
 func TestAdd(t *testing.T) {
-	a := accumulator{}
 	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
 	a.AddFields("acctest",
 		map[string]interface{}{"value": float64(101)},
@@ -33,99 +30,80 @@ func TestAdd(t *testing.T) {
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{"acc": "test"}, now)
 
-	testm := <-a.metrics
+	testm := <-metrics
 	actual := testm.String()
 	assert.Contains(t, actual, "acctest value=101")
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
 	assert.Contains(t, actual, "acctest,acc=test value=101")
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
 	assert.Equal(t,
 		fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
 		actual)
 }
 
-func TestAddGauge(t *testing.T) {
-	a := accumulator{}
+func TestAddFields(t *testing.T) {
 	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
-	a.AddGauge("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{})
-	a.AddGauge("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{"acc": "test"})
-	a.AddGauge("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{"acc": "test"}, now)
+	fields := map[string]interface{}{
+		"usage": float64(99),
+	}
+	a.AddFields("acctest", fields, map[string]string{})
+	a.AddGauge("acctest", fields, map[string]string{"acc": "test"})
+	a.AddCounter("acctest", fields, map[string]string{"acc": "test"}, now)
 
-	testm := <-a.metrics
+	testm := <-metrics
 	actual := testm.String()
-	assert.Contains(t, actual, "acctest value=101")
-	assert.Equal(t, testm.Type(), telegraf.Gauge)
+	assert.Contains(t, actual, "acctest usage=99")
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test value=101")
-	assert.Equal(t, testm.Type(), telegraf.Gauge)
+	assert.Contains(t, actual, "acctest,acc=test usage=99")
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
 	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
+		fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()),
 		actual)
-	assert.Equal(t, testm.Type(), telegraf.Gauge)
 }
 
-func TestAddCounter(t *testing.T) {
-	a := accumulator{}
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	a.AddCounter("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{})
-	a.AddCounter("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{"acc": "test"})
-	a.AddCounter("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{"acc": "test"}, now)
+func TestAccAddError(t *testing.T) {
+	errBuf := bytes.NewBuffer(nil)
+	log.SetOutput(errBuf)
+	defer log.SetOutput(os.Stderr)
 
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest value=101")
-	assert.Equal(t, testm.Type(), telegraf.Counter)
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test value=101")
-	assert.Equal(t, testm.Type(), telegraf.Counter)
+	a.AddError(fmt.Errorf("foo"))
+	a.AddError(fmt.Errorf("bar"))
+	a.AddError(fmt.Errorf("baz"))
 
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
-		actual)
-	assert.Equal(t, testm.Type(), telegraf.Counter)
+	errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
+	assert.EqualValues(t, 3, a.errCount)
+	require.Len(t, errs, 4) // 4 because of trailing newline
+	assert.Contains(t, string(errs[0]), "TestPlugin")
+	assert.Contains(t, string(errs[0]), "foo")
+	assert.Contains(t, string(errs[1]), "TestPlugin")
+	assert.Contains(t, string(errs[1]), "bar")
+	assert.Contains(t, string(errs[2]), "TestPlugin")
+	assert.Contains(t, string(errs[2]), "baz")
 }
 
-func TestAddNoPrecisionWithInterval(t *testing.T) {
-	a := accumulator{}
+func TestAddNoIntervalWithPrecision(t *testing.T) {
 	now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 	a.SetPrecision(0, time.Second)
+
 	a.AddFields("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{})
@@ -151,14 +129,13 @@ func TestAddNoPrecisionWithInterval(t *testing.T) {
 		actual)
 }
 
-func TestAddNoIntervalWithPrecision(t *testing.T) {
-	a := accumulator{}
+func TestAddDisablePrecision(t *testing.T) {
 	now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
-	a.SetPrecision(time.Second, time.Millisecond)
+	a.SetPrecision(time.Nanosecond, 0)
 	a.AddFields("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{})
@@ -180,19 +157,17 @@ func TestAddNoIntervalWithPrecision(t *testing.T) {
 	testm = <-a.metrics
 	actual = testm.String()
 	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
+		fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)),
 		actual)
 }
 
-func TestAddDisablePrecision(t *testing.T) {
-	a := accumulator{}
+func TestAddNoPrecisionWithInterval(t *testing.T) {
 	now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
-	a.SetPrecision(time.Second, time.Millisecond)
-	a.DisablePrecision()
+	a.SetPrecision(0, time.Second)
 	a.AddFields("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{})
@@ -214,16 +189,15 @@ func TestAddDisablePrecision(t *testing.T) {
 	testm = <-a.metrics
 	actual = testm.String()
 	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)),
+		fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
 		actual)
 }
 
 func TestDifferentPrecisions(t *testing.T) {
-	a := accumulator{}
 	now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
 	a.SetPrecision(0, time.Second)
 	a.AddFields("acctest",
@@ -266,349 +240,100 @@ func TestDifferentPrecisions(t *testing.T) {
 		actual)
 }
 
-func TestAddDefaultTags(t *testing.T) {
-	a := accumulator{}
-	a.addDefaultTag("default", "tag")
+func TestAddGauge(t *testing.T) {
 	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
-	a.AddFields("acctest",
+	a.AddGauge("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{})
-	a.AddFields("acctest",
-		map[string]interface{}{"value": float64(101)},
-		map[string]string{"acc": "test"})
-	a.AddFields("acctest",
+	a.AddGauge("acctest",
 		map[string]interface{}{"value": float64(101)},
-		map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest,default=tag value=101")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test,default=tag value=101")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()),
-		actual)
-}
-
-func TestAddFields(t *testing.T) {
-	a := accumulator{}
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	fields := map[string]interface{}{
-		"usage": float64(99),
-	}
-	a.AddFields("acctest", fields, map[string]string{})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest usage=99")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test usage=99")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()),
-		actual)
-}
-
-// Test that all Inf fields get dropped, and not added to metrics channel
-func TestAddInfFields(t *testing.T) {
-	inf := math.Inf(1)
-	ninf := math.Inf(-1)
-
-	a := accumulator{}
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	fields := map[string]interface{}{
-		"usage":  inf,
-		"nusage": ninf,
-	}
-	a.AddFields("acctest", fields, map[string]string{})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
-
-	assert.Len(t, a.metrics, 0)
-
-	// test that non-inf fields are kept and not dropped
-	fields["notinf"] = float64(100)
-	a.AddFields("acctest", fields, map[string]string{})
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest notinf=100")
-}
-
-// Test that nan fields are dropped and not added
-func TestAddNaNFields(t *testing.T) {
-	nan := math.NaN()
-
-	a := accumulator{}
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	fields := map[string]interface{}{
-		"usage": nan,
-	}
-	a.AddFields("acctest", fields, map[string]string{})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
-
-	assert.Len(t, a.metrics, 0)
-
-	// test that non-nan fields are kept and not dropped
-	fields["notnan"] = float64(100)
-	a.AddFields("acctest", fields, map[string]string{})
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest notnan=100")
-}
-
-func TestAddUint64Fields(t *testing.T) {
-	a := accumulator{}
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	fields := map[string]interface{}{
-		"usage": uint64(99),
-	}
-	a.AddFields("acctest", fields, map[string]string{})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest usage=99i")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test usage=99i")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test usage=99i %d", now.UnixNano()),
-		actual)
-}
-
-func TestAddUint64Overflow(t *testing.T) {
-	a := accumulator{}
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	fields := map[string]interface{}{
-		"usage": uint64(9223372036854775808),
-	}
-	a.AddFields("acctest", fields, map[string]string{})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"})
-	a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest usage=9223372036854775807i")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test usage=9223372036854775807i")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test usage=9223372036854775807i %d", now.UnixNano()),
-		actual)
-}
-
-func TestAddInts(t *testing.T) {
-	a := accumulator{}
-	a.addDefaultTag("default", "tag")
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	a.AddFields("acctest",
-		map[string]interface{}{"value": int(101)},
-		map[string]string{})
-	a.AddFields("acctest",
-		map[string]interface{}{"value": int32(101)},
-		map[string]string{"acc": "test"})
-	a.AddFields("acctest",
-		map[string]interface{}{"value": int64(101)},
-		map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest,default=tag value=101i")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Contains(t, actual, "acctest,acc=test,default=tag value=101i")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test,default=tag value=101i %d", now.UnixNano()),
-		actual)
-}
-
-func TestAddFloats(t *testing.T) {
-	a := accumulator{}
-	a.addDefaultTag("default", "tag")
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	a.AddFields("acctest",
-		map[string]interface{}{"value": float32(101)},
 		map[string]string{"acc": "test"})
-	a.AddFields("acctest",
+	a.AddGauge("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{"acc": "test"}, now)
 
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest,acc=test,default=tag value=101")
-
-	testm = <-a.metrics
-	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()),
-		actual)
-}
-
-func TestAddStrings(t *testing.T) {
-	a := accumulator{}
-	a.addDefaultTag("default", "tag")
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	a.AddFields("acctest",
-		map[string]interface{}{"value": "test"},
-		map[string]string{"acc": "test"})
-	a.AddFields("acctest",
-		map[string]interface{}{"value": "foo"},
-		map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
+	testm := <-metrics
 	actual := testm.String()
-	assert.Contains(t, actual, "acctest,acc=test,default=tag value=\"test\"")
+	assert.Contains(t, actual, "acctest value=101")
+	assert.Equal(t, testm.Type(), telegraf.Gauge)
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
-	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test,default=tag value=\"foo\" %d", now.UnixNano()),
-		actual)
-}
-
-func TestAddBools(t *testing.T) {
-	a := accumulator{}
-	a.addDefaultTag("default", "tag")
-	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	a.inputConfig = &models.InputConfig{}
-
-	a.AddFields("acctest",
-		map[string]interface{}{"value": true}, map[string]string{"acc": "test"})
-	a.AddFields("acctest",
-		map[string]interface{}{"value": false}, map[string]string{"acc": "test"}, now)
-
-	testm := <-a.metrics
-	actual := testm.String()
-	assert.Contains(t, actual, "acctest,acc=test,default=tag value=true")
+	assert.Contains(t, actual, "acctest,acc=test value=101")
+	assert.Equal(t, testm.Type(), telegraf.Gauge)
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
 	assert.Equal(t,
-		fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()),
+		fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
 		actual)
+	assert.Equal(t, testm.Type(), telegraf.Gauge)
 }
 
-// Test that tag filters get applied to metrics.
-func TestAccFilterTags(t *testing.T) {
-	a := accumulator{}
+func TestAddCounter(t *testing.T) {
 	now := time.Now()
-	a.metrics = make(chan telegraf.Metric, 10)
-	defer close(a.metrics)
-	filter := models.Filter{
-		TagExclude: []string{"acc"},
-	}
-	assert.NoError(t, filter.Compile())
-	a.inputConfig = &models.InputConfig{}
-	a.inputConfig.Filter = filter
+	metrics := make(chan telegraf.Metric, 10)
+	defer close(metrics)
+	a := NewAccumulator(&TestMetricMaker{}, metrics)
 
-	a.AddFields("acctest",
+	a.AddCounter("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{})
-	a.AddFields("acctest",
+	a.AddCounter("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{"acc": "test"})
-	a.AddFields("acctest",
+	a.AddCounter("acctest",
 		map[string]interface{}{"value": float64(101)},
 		map[string]string{"acc": "test"}, now)
 
-	testm := <-a.metrics
+	testm := <-metrics
 	actual := testm.String()
 	assert.Contains(t, actual, "acctest value=101")
+	assert.Equal(t, testm.Type(), telegraf.Counter)
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
-	assert.Contains(t, actual, "acctest value=101")
+	assert.Contains(t, actual, "acctest,acc=test value=101")
+	assert.Equal(t, testm.Type(), telegraf.Counter)
 
-	testm = <-a.metrics
+	testm = <-metrics
 	actual = testm.String()
 	assert.Equal(t,
-		fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
+		fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
 		actual)
+	assert.Equal(t, testm.Type(), telegraf.Counter)
 }
 
-func TestAccAddError(t *testing.T) {
-	errBuf := bytes.NewBuffer(nil)
-	log.SetOutput(errBuf)
-	defer log.SetOutput(os.Stderr)
-
-	a := accumulator{}
-	a.inputConfig = &models.InputConfig{}
-	a.inputConfig.Name = "mock_plugin"
-
-	a.AddError(fmt.Errorf("foo"))
-	a.AddError(fmt.Errorf("bar"))
-	a.AddError(fmt.Errorf("baz"))
+type TestMetricMaker struct {
+}
 
-	errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
-	assert.EqualValues(t, 3, a.errCount)
-	require.Len(t, errs, 4) // 4 because of trailing newline
-	assert.Contains(t, string(errs[0]), "mock_plugin")
-	assert.Contains(t, string(errs[0]), "foo")
-	assert.Contains(t, string(errs[1]), "mock_plugin")
-	assert.Contains(t, string(errs[1]), "bar")
-	assert.Contains(t, string(errs[2]), "mock_plugin")
-	assert.Contains(t, string(errs[2]), "baz")
+func (tm *TestMetricMaker) Name() string {
+	return "TestPlugin"
+}
+func (tm *TestMetricMaker) MakeMetric(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	mType telegraf.ValueType,
+	t time.Time,
+) telegraf.Metric {
+	switch mType {
+	case telegraf.Untyped:
+		if m, err := telegraf.NewMetric(measurement, tags, fields, t); err == nil {
+			return m
+		}
+	case telegraf.Counter:
+		if m, err := telegraf.NewCounterMetric(measurement, tags, fields, t); err == nil {
+			return m
+		}
+	case telegraf.Gauge:
+		if m, err := telegraf.NewGaugeMetric(measurement, tags, fields, t); err == nil {
+			return m
+		}
+	}
+	return nil
 }
diff --git a/agent/agent.go b/agent/agent.go
index 8fef8ca4..a912126b 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -89,7 +89,7 @@ func panicRecover(input *models.RunningInput) {
 		trace := make([]byte, 2048)
 		runtime.Stack(trace, true)
 		log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
-			input.Name, err, trace)
+			input.Name(), err, trace)
 		log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
 			"stack trace, configuration, and OS information: " +
 			"https://github.com/influxdata/telegraf/issues/new")
@@ -103,19 +103,18 @@ func (a *Agent) gatherer(
 	input *models.RunningInput,
 	interval time.Duration,
 	metricC chan telegraf.Metric,
-) error {
+) {
 	defer panicRecover(input)
 
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
 	for {
-		var outerr error
-
-		acc := NewAccumulator(input.Config, metricC)
+		acc := NewAccumulator(input, metricC)
 		acc.SetPrecision(a.Config.Agent.Precision.Duration,
 			a.Config.Agent.Interval.Duration)
-		acc.setDefaultTags(a.Config.Tags)
+		input.SetDebug(a.Config.Agent.Debug)
+		input.SetDefaultTags(a.Config.Tags)
 
 		internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
 
@@ -123,15 +122,13 @@ func (a *Agent) gatherer(
 		gatherWithTimeout(shutdown, input, acc, interval)
 		elapsed := time.Since(start)
 
-		if outerr != nil {
-			return outerr
-		}
+
 		log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n",
-			input.Name, interval, elapsed)
+			input.Name(), interval, elapsed)
 
 		select {
 		case <-shutdown:
-			return nil
+			return
 		case <-ticker.C:
 			continue
 		}
@@ -160,13 +157,13 @@ func gatherWithTimeout(
 		select {
 		case err := <-done:
 			if err != nil {
-				log.Printf("E! ERROR in input [%s]: %s", input.Name, err)
+				log.Printf("E! ERROR in input [%s]: %s", input.Name(), err)
 			}
 			return
 		case <-ticker.C:
 			log.Printf("E! ERROR: input [%s] took longer to collect than "+
 				"collection interval (%s)",
-				input.Name, timeout)
+				input.Name(), timeout)
 			continue
 		case <-shutdown:
 			return
@@ -194,13 +191,13 @@ func (a *Agent) Test() error {
 	}()
 
 	for _, input := range a.Config.Inputs {
-		acc := NewAccumulator(input.Config, metricC)
-		acc.SetTrace(true)
+		acc := NewAccumulator(input, metricC)
 		acc.SetPrecision(a.Config.Agent.Precision.Duration,
 			a.Config.Agent.Interval.Duration)
-		acc.setDefaultTags(a.Config.Tags)
+		input.SetTrace(true)
+		input.SetDefaultTags(a.Config.Tags)
 
-		fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
+		fmt.Printf("* Plugin: %s, Collection 1\n", input.Name())
 		if input.Config.Interval != 0 {
 			fmt.Printf("* Internal: %s\n", input.Config.Interval)
 		}
@@ -214,10 +211,10 @@ func (a *Agent) Test() error {
 
 		// Special instructions for some inputs. cpu, for example, needs to be
 		// run twice in order to return cpu usage percentages.
-		switch input.Name {
+		switch input.Name() {
 		case "cpu", "mongodb", "procstat":
 			time.Sleep(500 * time.Millisecond)
-			fmt.Printf("* Plugin: %s, Collection 2\n", input.Name)
+			fmt.Printf("* Plugin: %s, Collection 2\n", input.Name())
 			if err := input.Input.Gather(acc); err != nil {
 				return err
 			}
@@ -250,26 +247,69 @@ func (a *Agent) flush() {
 func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
 	// Inelegant, but this sleep is to allow the Gather threads to run, so that
 	// the flusher will flush after metrics are collected.
-	time.Sleep(time.Millisecond * 200)
+	time.Sleep(time.Millisecond * 300)
 
-	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
+	// create an output metric channel and a goroutine that continuously passes
+	// each metric onto the output plugins & aggregators.
+	outMetricC := make(chan telegraf.Metric, 100)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-shutdown:
+				for _, agg := range a.Config.Aggregators {
+					agg.Aggregator.Stop()
+				}
+				if len(outMetricC) > 0 {
+					// keep going until outMetricC is flushed
+					continue
+				}
+				return
+			case m := <-outMetricC:
+				// if dropOriginal is set to true, then we will only send this
+				// metric to the aggregators, not the outputs.
+				var dropOriginal bool
+				if !m.IsAggregate() {
+					for _, agg := range a.Config.Aggregators {
+						if ok := agg.Apply(copyMetric(m)); ok {
+							dropOriginal = true
+						}
+					}
+				}
+				if !dropOriginal {
+					for i, o := range a.Config.Outputs {
+						if i == len(a.Config.Outputs)-1 {
+							o.AddMetric(m)
+						} else {
+							o.AddMetric(copyMetric(m))
+						}
+					}
+				}
+			}
+		}
+	}()
 
+	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
 	for {
 		select {
 		case <-shutdown:
 			log.Println("I! Hang on, flushing any cached metrics before shutdown")
+			// wait for outMetricC to get flushed before flushing outputs
+			wg.Wait()
 			a.flush()
 			return nil
 		case <-ticker.C:
 			internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
 			a.flush()
-		case m := <-metricC:
-			for i, o := range a.Config.Outputs {
-				if i == len(a.Config.Outputs)-1 {
-					o.AddMetric(m)
-				} else {
-					o.AddMetric(copyMetric(m))
+		case metric := <-metricC:
+			mS := []telegraf.Metric{metric}
+			for _, processor := range a.Config.Processors {
+				mS = processor.Apply(mS...)
 				}
+			for _, m := range mS {
+				outMetricC <- m
 			}
 		}
 	}
@@ -303,18 +343,18 @@ func (a *Agent) Run(shutdown chan struct{}) error {
 	// channel shared between all input threads for accumulating metrics
 	metricC := make(chan telegraf.Metric, 10000)
 
+	// Start all ServicePlugins
 	for _, input := range a.Config.Inputs {
-		// Start service of any ServicePlugins
 		switch p := input.Input.(type) {
 		case telegraf.ServiceInput:
-			acc := NewAccumulator(input.Config, metricC)
+			acc := NewAccumulator(input, metricC)
 			// Service input plugins should set their own precision of their
 			// metrics.
-			acc.DisablePrecision()
-			acc.setDefaultTags(a.Config.Tags)
+			acc.SetPrecision(time.Nanosecond, 0)
+			input.SetDefaultTags(a.Config.Tags)
 			if err := p.Start(acc); err != nil {
 				log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
-					input.Name, err.Error())
+					input.Name(), err.Error())
 				return err
 			}
 			defer p.Stop()
@@ -327,6 +367,18 @@ func (a *Agent) Run(shutdown chan struct{}) error {
 		time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
 	}
 
+	// Start all Aggregators
+	for _, aggregator := range a.Config.Aggregators {
+		acc := NewAccumulator(aggregator, metricC)
+		acc.SetPrecision(a.Config.Agent.Precision.Duration,
+			a.Config.Agent.Interval.Duration)
+		if err := aggregator.Aggregator.Start(acc); err != nil {
+			log.Printf("E! [%s] failed to start, exiting\n%s\n",
+				aggregator.Name(), err.Error())
+			return err
+		}
+	}
+
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
diff --git a/aggregator.go b/aggregator.go
new file mode 100644
index 00000000..2a881c3a
--- /dev/null
+++ b/aggregator.go
@@ -0,0 +1,16 @@
+package telegraf
+
+type Aggregator interface {
+	// SampleConfig returns the default configuration of the Aggregator
+	SampleConfig() string
+
+	// Description returns a one-sentence description of the Aggregator
+	Description() string
+
+	// Apply the metric to the aggregator
+	Apply(in Metric)
+
+	// Start starts the aggregator with the given accumulator
+	Start(acc Accumulator) error
+	Stop()
+}
diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go
index dd8d8431..cf6d7158 100644
--- a/cmd/telegraf/telegraf.go
+++ b/cmd/telegraf/telegraf.go
@@ -12,12 +12,13 @@ import (
 
 	"github.com/influxdata/telegraf/agent"
 	"github.com/influxdata/telegraf/internal/config"
+	_ "github.com/influxdata/telegraf/plugins/aggregators/all"
 	"github.com/influxdata/telegraf/logger"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	_ "github.com/influxdata/telegraf/plugins/inputs/all"
 	"github.com/influxdata/telegraf/plugins/outputs"
 	_ "github.com/influxdata/telegraf/plugins/outputs/all"
-
+	_ "github.com/influxdata/telegraf/plugins/processors/all"
 	"github.com/kardianos/service"
 )
 
@@ -111,6 +112,8 @@ Examples:
   telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb
 `
 
+var logger service.Logger
+
 var stop chan struct{}
 
 var srvc service.Service
@@ -306,6 +309,10 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
+		logger, err = s.Logger(nil)
+		if err != nil {
+			log.Fatal(err)
+		}
 		// Handle the -service flag here to prevent any issues with tooling that
 		// may not have an interactive session, e.g. installing from Ansible.
 		if *fService != "" {
diff --git a/internal/config/config.go b/internal/config/config.go
index b76c9b52..3dc2c02e 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -11,15 +11,18 @@ import (
 	"regexp"
 	"runtime"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/internal/models"
+	"github.com/influxdata/telegraf/plugins/aggregators"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	"github.com/influxdata/telegraf/plugins/outputs"
 	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/plugins/processors"
 	"github.com/influxdata/telegraf/plugins/serializers"
 
 	"github.com/influxdata/config"
@@ -47,9 +50,11 @@ type Config struct {
 	InputFilters  []string
 	OutputFilters []string
 
-	Agent   *AgentConfig
-	Inputs  []*models.RunningInput
-	Outputs []*models.RunningOutput
+	Agent       *AgentConfig
+	Inputs      []*models.RunningInput
+	Outputs     []*models.RunningOutput
+	Processors  []*models.RunningProcessor
+	Aggregators []*models.RunningAggregator
 }
 
 func NewConfig() *Config {
@@ -64,6 +69,7 @@ func NewConfig() *Config {
 		Tags:          make(map[string]string),
 		Inputs:        make([]*models.RunningInput, 0),
 		Outputs:       make([]*models.RunningOutput, 0),
+		Processors:    make([]*models.RunningProcessor, 0),
 		InputFilters:  make([]string, 0),
 		OutputFilters: make([]string, 0),
 	}
@@ -138,7 +144,7 @@ type AgentConfig struct {
 func (c *Config) InputNames() []string {
 	var name []string
 	for _, input := range c.Inputs {
-		name = append(name, input.Name)
+		name = append(name, input.Name())
 	}
 	return name
 }
@@ -248,6 +254,20 @@ var header = `# Telegraf Configuration
 ###############################################################################
 `
 
+var processorHeader = `
+
+###############################################################################
+#                            PROCESSOR PLUGINS                                #
+###############################################################################
+`
+
+var aggregatorHeader = `
+
+###############################################################################
+#                            AGGREGATOR PLUGINS                               #
+###############################################################################
+`
+
 var inputHeader = `
 
 ###############################################################################
@@ -266,6 +286,7 @@ var serviceInputHeader = `
 func PrintSampleConfig(inputFilters []string, outputFilters []string) {
 	fmt.Printf(header)
 
+	// print output plugins
 	if len(outputFilters) != 0 {
 		printFilteredOutputs(outputFilters, false)
 	} else {
@@ -281,6 +302,25 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) {
 		printFilteredOutputs(pnames, true)
 	}
 
+	// print processor plugins
+	fmt.Printf(processorHeader)
+	pnames := []string{}
+	for pname := range processors.Processors {
+		pnames = append(pnames, pname)
+	}
+	sort.Strings(pnames)
+	printFilteredProcessors(pnames, true)
+
+	// print aggregator plugins
+	fmt.Printf(aggregatorHeader)
+	pnames = []string{}
+	for pname := range aggregators.Aggregators {
+		pnames = append(pnames, pname)
+	}
+	sort.Strings(pnames)
+	printFilteredAggregators(pnames, true)
+
+	// print input plugins
 	fmt.Printf(inputHeader)
 	if len(inputFilters) != 0 {
 		printFilteredInputs(inputFilters, false)
@@ -298,6 +338,42 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) {
 	}
 }
 
+func printFilteredProcessors(processorFilters []string, commented bool) {
+	// Filter processors
+	var pnames []string
+	for pname := range processors.Processors {
+		if sliceContains(pname, processorFilters) {
+			pnames = append(pnames, pname)
+		}
+	}
+	sort.Strings(pnames)
+
+	// Print Processors
+	for _, pname := range pnames {
+		creator := processors.Processors[pname]
+		output := creator()
+		printConfig(pname, output, "processors", commented)
+	}
+}
+
+func printFilteredAggregators(aggregatorFilters []string, commented bool) {
+	// Filter aggregators
+	var anames []string
+	for aname := range aggregators.Aggregators {
+		if sliceContains(aname, aggregatorFilters) {
+			anames = append(anames, aname)
+		}
+	}
+	sort.Strings(anames)
+
+	// Print Aggregators
+	for _, aname := range anames {
+		creator := aggregators.Aggregators[aname]
+		output := creator()
+		printConfig(aname, output, "aggregators", commented)
+	}
+}
+
 func printFilteredInputs(inputFilters []string, commented bool) {
 	// Filter inputs
 	var pnames []string
@@ -507,6 +583,7 @@ func (c *Config) LoadConfig(path string) error {
 		case "outputs":
 			for pluginName, pluginVal := range subTable.Fields {
 				switch pluginSubTable := pluginVal.(type) {
+				// legacy [outputs.influxdb] support
 				case *ast.Table:
 					if err = c.addOutput(pluginName, pluginSubTable); err != nil {
 						return fmt.Errorf("Error parsing %s, %s", path, err)
@@ -525,6 +602,7 @@ func (c *Config) LoadConfig(path string) error {
 		case "inputs", "plugins":
 			for pluginName, pluginVal := range subTable.Fields {
 				switch pluginSubTable := pluginVal.(type) {
+				// legacy [inputs.cpu] support
 				case *ast.Table:
 					if err = c.addInput(pluginName, pluginSubTable); err != nil {
 						return fmt.Errorf("Error parsing %s, %s", path, err)
@@ -540,6 +618,34 @@ func (c *Config) LoadConfig(path string) error {
 						pluginName, path)
 				}
 			}
+		case "processors":
+			for pluginName, pluginVal := range subTable.Fields {
+				switch pluginSubTable := pluginVal.(type) {
+				case []*ast.Table:
+					for _, t := range pluginSubTable {
+						if err = c.addProcessor(pluginName, t); err != nil {
+							return fmt.Errorf("Error parsing %s, %s", path, err)
+						}
+					}
+				default:
+					return fmt.Errorf("Unsupported config format: %s, file %s",
+						pluginName, path)
+				}
+			}
+		case "aggregators":
+			for pluginName, pluginVal := range subTable.Fields {
+				switch pluginSubTable := pluginVal.(type) {
+				case []*ast.Table:
+					for _, t := range pluginSubTable {
+						if err = c.addAggregator(pluginName, t); err != nil {
+							return fmt.Errorf("Error parsing %s, %s", path, err)
+						}
+					}
+				default:
+					return fmt.Errorf("Unsupported config format: %s, file %s",
+						pluginName, path)
+				}
+			}
 		// Assume it's an input input for legacy config file support if no other
 		// identifiers are present
 		default:
@@ -580,6 +686,57 @@ func parseFile(fpath string) (*ast.Table, error) {
 	return toml.Parse(contents)
 }
 
+func (c *Config) addAggregator(name string, table *ast.Table) error {
+	creator, ok := aggregators.Aggregators[name]
+	if !ok {
+		return fmt.Errorf("Undefined but requested aggregator: %s", name)
+	}
+	aggregator := creator()
+
+	aggregatorConfig, err := buildAggregator(name, table)
+	if err != nil {
+		return err
+	}
+
+	if err := config.UnmarshalTable(table, aggregator); err != nil {
+		return err
+	}
+
+	rf := &models.RunningAggregator{
+		Aggregator: aggregator,
+		Config:     aggregatorConfig,
+	}
+
+	c.Aggregators = append(c.Aggregators, rf)
+	return nil
+}
+
+func (c *Config) addProcessor(name string, table *ast.Table) error {
+	creator, ok := processors.Processors[name]
+	if !ok {
+		return fmt.Errorf("Undefined but requested processor: %s", name)
+	}
+	processor := creator()
+
+	processorConfig, err := buildProcessor(name, table)
+	if err != nil {
+		return err
+	}
+
+	if err := config.UnmarshalTable(table, processor); err != nil {
+		return err
+	}
+
+	rf := &models.RunningProcessor{
+		Name:      name,
+		Processor: processor,
+		Config:    processorConfig,
+	}
+
+	c.Processors = append(c.Processors, rf)
+	return nil
+}
+
 func (c *Config) addOutput(name string, table *ast.Table) error {
 	if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
 		return nil
@@ -652,7 +809,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
 	}
 
 	rp := &models.RunningInput{
-		Name:   name,
 		Input:  input,
 		Config: pluginConfig,
 	}
@@ -660,6 +816,93 @@ func (c *Config) addInput(name string, table *ast.Table) error {
 	return nil
 }
 
+// buildAggregator parses aggregator-specific options from the ast.Table into an AggregatorConfig.
+func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) {
+	conf := &models.AggregatorConfig{Name: name}
+	unsupportedFields := []string{"tagexclude", "taginclude"}
+	for _, field := range unsupportedFields {
+		if _, ok := tbl.Fields[field]; ok {
+			// TODO raise error because field is not supported
+		}
+	}
+
+	if node, ok := tbl.Fields["drop_original"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if b, ok := kv.Value.(*ast.Boolean); ok {
+				var err error
+				conf.DropOriginal, err = strconv.ParseBool(b.Value)
+				if err != nil {
+					log.Printf("Error parsing boolean value for %s: %s\n", name, err)
+				}
+			}
+		}
+	}
+
+	if node, ok := tbl.Fields["name_prefix"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if str, ok := kv.Value.(*ast.String); ok {
+				conf.MeasurementPrefix = str.Value
+			}
+		}
+	}
+
+	if node, ok := tbl.Fields["name_suffix"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if str, ok := kv.Value.(*ast.String); ok {
+				conf.MeasurementSuffix = str.Value
+			}
+		}
+	}
+
+	if node, ok := tbl.Fields["name_override"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if str, ok := kv.Value.(*ast.String); ok {
+				conf.NameOverride = str.Value
+			}
+		}
+	}
+
+	conf.Tags = make(map[string]string)
+	if node, ok := tbl.Fields["tags"]; ok {
+		if subtbl, ok := node.(*ast.Table); ok {
+			if err := config.UnmarshalTable(subtbl, conf.Tags); err != nil {
+				log.Printf("Could not parse tags for aggregator %s\n", name)
+			}
+		}
+	}
+
+	delete(tbl.Fields, "drop_original")
+	delete(tbl.Fields, "name_prefix")
+	delete(tbl.Fields, "name_suffix")
+	delete(tbl.Fields, "name_override")
+	delete(tbl.Fields, "tags")
+	var err error
+	conf.Filter, err = buildFilter(tbl)
+	if err != nil {
+		return conf, err
+	}
+	return conf, nil
+}
+
+// buildProcessor parses processor-specific options from the ast.Table into a ProcessorConfig.
+func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
+	conf := &models.ProcessorConfig{Name: name}
+	unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop",
+		"tagexclude", "taginclude"}
+	for _, field := range unsupportedFields {
+		if _, ok := tbl.Fields[field]; ok {
+			// TODO raise error because field is not supported
+		}
+	}
+
+	var err error
+	conf.Filter, err = buildFilter(tbl)
+	if err != nil {
+		return conf, err
+	}
+	return conf, nil
+}
+
 // buildFilter builds a Filter
 // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
 // be inserted into the models.OutputConfig/models.InputConfig
diff --git a/internal/models/filter.go b/internal/models/filter.go
index b87c5950..8e080f4a 100644
--- a/internal/models/filter.go
+++ b/internal/models/filter.go
@@ -96,7 +96,7 @@ func (f *Filter) Compile() error {
 // Apply applies the filter to the given measurement name, fields map, and
 // tags map. It will return false if the metric should be "filtered out", and
 // true if the metric should "pass".
-// It will modify tags in-place if they need to be deleted.
+// It will modify tags & fields in-place if they need to be deleted.
 func (f *Filter) Apply(
 	measurement string,
 	fields map[string]interface{},
diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go
new file mode 100644
index 00000000..b198109a
--- /dev/null
+++ b/internal/models/makemetric.go
@@ -0,0 +1,150 @@
+package models
+
+import (
+	"log"
+	"math"
+	"time"
+
+	"github.com/influxdata/telegraf"
+)
+
+// makemetric is used by both RunningAggregator & RunningInput
+// to make metrics.
+//   nameOverride: override the name of the measurement being made.
+//   namePrefix:   add this prefix to each measurement name.
+//   nameSuffix:   add this suffix to each measurement name.
+//   pluginTags:   these are tags that are specific to this plugin.
+//   daemonTags:   these are daemon-wide global tags, and get applied after pluginTags.
+//   filter:       this is a filter to apply to each metric being made.
+//   applyFilter:  if false, the above filter is not applied to each metric.
+//                 This is used by Aggregators, because aggregators use filters
+//                 on incoming metrics instead of on created metrics.
+func makemetric(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	nameOverride string,
+	namePrefix string,
+	nameSuffix string,
+	pluginTags map[string]string,
+	daemonTags map[string]string,
+	filter Filter,
+	applyFilter bool,
+	debug bool,
+	mType telegraf.ValueType,
+	t time.Time,
+) telegraf.Metric {
+	if len(fields) == 0 || len(measurement) == 0 {
+		return nil
+	}
+	if tags == nil {
+		tags = make(map[string]string)
+	}
+
+	// Override measurement name if set
+	if len(nameOverride) != 0 {
+		measurement = nameOverride
+	}
+	// Apply measurement prefix and suffix if set
+	if len(namePrefix) != 0 {
+		measurement = namePrefix + measurement
+	}
+	if len(nameSuffix) != 0 {
+		measurement = measurement + nameSuffix
+	}
+
+	// Apply plugin-wide tags if set
+	for k, v := range pluginTags {
+		if _, ok := tags[k]; !ok {
+			tags[k] = v
+		}
+	}
+	// Apply daemon-wide tags if set
+	for k, v := range daemonTags {
+		if _, ok := tags[k]; !ok {
+			tags[k] = v
+		}
+	}
+
+	// Apply the metric filter(s)
+	// for aggregators, the filter does not get applied when the metric is made.
+	// instead, the filter is applied to metrics coming into the plugin.
+	//   ie, it gets applied in the RunningAggregator.Apply function.
+	if applyFilter {
+		if ok := filter.Apply(measurement, fields, tags); !ok {
+			return nil
+		}
+	}
+
+	for k, v := range fields {
+		// Validate uint64 and float64 fields
+		// convert all int & uint types to int64
+		switch val := v.(type) {
+		case uint:
+			fields[k] = int64(val)
+			continue
+		case uint8:
+			fields[k] = int64(val)
+			continue
+		case uint16:
+			fields[k] = int64(val)
+			continue
+		case uint32:
+			fields[k] = int64(val)
+			continue
+		case int:
+			fields[k] = int64(val)
+			continue
+		case int8:
+			fields[k] = int64(val)
+			continue
+		case int16:
+			fields[k] = int64(val)
+			continue
+		case int32:
+			fields[k] = int64(val)
+			continue
+		case uint64:
+			// InfluxDB does not support writing uint64
+			if val < uint64(9223372036854775808) {
+				fields[k] = int64(val)
+			} else {
+				fields[k] = int64(9223372036854775807)
+			}
+			continue
+		case float32:
+			fields[k] = float64(val)
+			continue
+		case float64:
+			// NaNs are invalid values in influxdb, skip measurement
+			if math.IsNaN(val) || math.IsInf(val, 0) {
+				if debug {
+					log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
+						"field, skipping",
+						measurement, k)
+				}
+				delete(fields, k)
+				continue
+			}
+		}
+
+		fields[k] = v
+	}
+
+	var m telegraf.Metric
+	var err error
+	switch mType {
+	case telegraf.Counter:
+		m, err = telegraf.NewCounterMetric(measurement, tags, fields, t)
+	case telegraf.Gauge:
+		m, err = telegraf.NewGaugeMetric(measurement, tags, fields, t)
+	default:
+		m, err = telegraf.NewMetric(measurement, tags, fields, t)
+	}
+	if err != nil {
+		log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
+		return nil
+	}
+
+	return m
+}
diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go
new file mode 100644
index 00000000..d344565f
--- /dev/null
+++ b/internal/models/running_aggregator.go
@@ -0,0 +1,79 @@
+package models
+
+import (
+	"time"
+
+	"github.com/influxdata/telegraf"
+)
+
+type RunningAggregator struct {
+	Aggregator telegraf.Aggregator
+	Config     *AggregatorConfig
+}
+
+// AggregatorConfig containing configuration parameters for the running
+// aggregator plugin.
+type AggregatorConfig struct {
+	Name string
+
+	DropOriginal      bool
+	NameOverride      string
+	MeasurementPrefix string
+	MeasurementSuffix string
+	Tags              map[string]string
+	Filter            Filter
+}
+
+func (r *RunningAggregator) Name() string {
+	return "aggregators." + r.Config.Name
+}
+
+func (r *RunningAggregator) MakeMetric(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	mType telegraf.ValueType,
+	t time.Time,
+) telegraf.Metric {
+	m := makemetric(
+		measurement,
+		fields,
+		tags,
+		r.Config.NameOverride,
+		r.Config.MeasurementPrefix,
+		r.Config.MeasurementSuffix,
+		r.Config.Tags,
+		nil,
+		r.Config.Filter,
+		false,
+		false,
+		mType,
+		t,
+	)
+
+	m.SetAggregate(true)
+
+	return m
+}
+
+// Apply applies the given metric to the aggregator.
+// Before applying to the plugin, it will run any defined filters on the metric.
+// Apply returns true if the original metric should be dropped.
+func (r *RunningAggregator) Apply(in telegraf.Metric) bool {
+	if r.Config.Filter.IsActive() {
+		// check if the aggregator should apply this metric
+		name := in.Name()
+		fields := in.Fields()
+		tags := in.Tags()
+		t := in.Time()
+		if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
+			// aggregator should not apply this metric
+			return false
+		}
+
+		in, _ = telegraf.NewMetric(name, tags, fields, t)
+	}
+
+	r.Aggregator.Apply(in)
+	return r.Config.DropOriginal
+}
diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go
new file mode 100644
index 00000000..d2f04e20
--- /dev/null
+++ b/internal/models/running_aggregator_test.go
@@ -0,0 +1,150 @@
+package models
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestApply(t *testing.T) {
+	a := &TestAggregator{}
+	ra := RunningAggregator{
+		Config: &AggregatorConfig{
+			Name: "TestRunningAggregator",
+			Filter: Filter{
+				NamePass: []string{"*"},
+			},
+		},
+		Aggregator: a,
+	}
+	assert.NoError(t, ra.Config.Filter.Compile())
+
+	m := ra.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		time.Now(),
+	)
+	assert.False(t, ra.Apply(m))
+	assert.Equal(t, int64(101), a.sum)
+}
+
+func TestApplyDropOriginal(t *testing.T) {
+	ra := RunningAggregator{
+		Config: &AggregatorConfig{
+			Name: "TestRunningAggregator",
+			Filter: Filter{
+				NamePass: []string{"RI*"},
+			},
+			DropOriginal: true,
+		},
+		Aggregator: &TestAggregator{},
+	}
+	assert.NoError(t, ra.Config.Filter.Compile())
+
+	m := ra.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		time.Now(),
+	)
+	assert.True(t, ra.Apply(m))
+
+	// this metric name doesn't match the filter, so Apply will return false
+	m2 := ra.MakeMetric(
+		"foobar",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		time.Now(),
+	)
+	assert.False(t, ra.Apply(m2))
+}
+
+// make an untyped, counter, & gauge metric
+func TestMakeMetricA(t *testing.T) {
+	now := time.Now()
+	ra := RunningAggregator{
+		Config: &AggregatorConfig{
+			Name: "TestRunningAggregator",
+		},
+	}
+	assert.Equal(t, "aggregators.TestRunningAggregator", ra.Name())
+
+	m := ra.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+	assert.Equal(
+		t,
+		m.Type(),
+		telegraf.Untyped,
+	)
+
+	m = ra.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Counter,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+	assert.Equal(
+		t,
+		m.Type(),
+		telegraf.Counter,
+	)
+
+	m = ra.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Gauge,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+	assert.Equal(
+		t,
+		m.Type(),
+		telegraf.Gauge,
+	)
+}
+
+type TestAggregator struct {
+	sum int64
+}
+
+func (t *TestAggregator) Description() string                  { return "" }
+func (t *TestAggregator) SampleConfig() string                 { return "" }
+func (t *TestAggregator) Start(acc telegraf.Accumulator) error { return nil }
+func (t *TestAggregator) Stop()                                {}
+
+func (t *TestAggregator) Apply(in telegraf.Metric) {
+	for _, v := range in.Fields() {
+		if vi, ok := v.(int64); ok {
+			t.sum += vi
+		}
+	}
+}
diff --git a/internal/models/running_input.go b/internal/models/running_input.go
index 445c5ee9..558af3e5 100644
--- a/internal/models/running_input.go
+++ b/internal/models/running_input.go
@@ -1,15 +1,19 @@
 package models
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/influxdata/telegraf"
 )
 
 type RunningInput struct {
-	Name   string
 	Input  telegraf.Input
 	Config *InputConfig
+
+	trace       bool
+	debug       bool
+	defaultTags map[string]string
 }
 
 // InputConfig containing a name, interval, and filter
@@ -22,3 +26,59 @@ type InputConfig struct {
 	Filter            Filter
 	Interval          time.Duration
 }
+
+func (r *RunningInput) Name() string {
+	return "inputs." + r.Config.Name
+}
+
+// MakeMetric returns a metric, or nil if the metric does not need to be
+// created (because it was filtered out, an error occurred, etc.)
+func (r *RunningInput) MakeMetric(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	mType telegraf.ValueType,
+	t time.Time,
+) telegraf.Metric {
+	m := makemetric(
+		measurement,
+		fields,
+		tags,
+		r.Config.NameOverride,
+		r.Config.MeasurementPrefix,
+		r.Config.MeasurementSuffix,
+		r.Config.Tags,
+		r.defaultTags,
+		r.Config.Filter,
+		true,
+		r.debug,
+		mType,
+		t,
+	)
+
+	if r.trace && m != nil {
+		fmt.Println("> " + m.String())
+	}
+
+	return m
+}
+
+func (r *RunningInput) Debug() bool {
+	return r.debug
+}
+
+func (r *RunningInput) SetDebug(debug bool) {
+	r.debug = debug
+}
+
+func (r *RunningInput) Trace() bool {
+	return r.trace
+}
+
+func (r *RunningInput) SetTrace(trace bool) {
+	r.trace = trace
+}
+
+func (r *RunningInput) SetDefaultTags(tags map[string]string) {
+	r.defaultTags = tags
+}
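
A short sketch of the expected call order for these setters: daemon-wide tags and the trace flag are set once on the RunningInput, and MakeMetric then applies the per-plugin overrides, tags, and filter to each point. The helper below is illustrative only and assumes it lives inside the telegraf module:

package sketch

import (
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal/models"
)

// buildMetric is hypothetical; in practice the agent's accumulator does this
// wiring for every point an input plugin adds.
func buildMetric(ri *models.RunningInput) telegraf.Metric {
	ri.SetDefaultTags(map[string]string{"host": "example"}) // daemon-level tags
	ri.SetTrace(true)                                        // print each metric as it is made
	return ri.MakeMetric(
		"cpu",
		map[string]interface{}{"usage": 42.0},
		map[string]string{"cpu": "cpu0"},
		telegraf.Untyped,
		time.Now(),
	)
}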
diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go
new file mode 100644
index 00000000..12283057
--- /dev/null
+++ b/internal/models/running_input_test.go
@@ -0,0 +1,326 @@
+package models
+
+import (
+	"fmt"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestMakeMetricNoFields(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+		},
+	}
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Nil(t, m)
+}
+
+// make an untyped, counter, & gauge metric
+func TestMakeMetric(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+		},
+	}
+	ri.SetDebug(true)
+	assert.Equal(t, true, ri.Debug())
+	ri.SetTrace(true)
+	assert.Equal(t, true, ri.Trace())
+	assert.Equal(t, "inputs.TestRunningInput", ri.Name())
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+	assert.Equal(
+		t,
+		m.Type(),
+		telegraf.Untyped,
+	)
+
+	m = ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Counter,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+	assert.Equal(
+		t,
+		m.Type(),
+		telegraf.Counter,
+	)
+
+	m = ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Gauge,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+	assert.Equal(
+		t,
+		m.Type(),
+		telegraf.Gauge,
+	)
+}
+
+func TestMakeMetricWithPluginTags(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+			Tags: map[string]string{
+				"foo": "bar",
+			},
+		},
+	}
+	ri.SetDebug(true)
+	assert.Equal(t, true, ri.Debug())
+	ri.SetTrace(true)
+	assert.Equal(t, true, ri.Trace())
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		nil,
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest,foo=bar value=101i %d", now.UnixNano()),
+	)
+}
+
+func TestMakeMetricFilteredOut(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+			Tags: map[string]string{
+				"foo": "bar",
+			},
+			Filter: Filter{NamePass: []string{"foobar"}},
+		},
+	}
+	ri.SetDebug(true)
+	assert.Equal(t, true, ri.Debug())
+	ri.SetTrace(true)
+	assert.Equal(t, true, ri.Trace())
+	assert.NoError(t, ri.Config.Filter.Compile())
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		nil,
+		telegraf.Untyped,
+		now,
+	)
+	assert.Nil(t, m)
+}
+
+func TestMakeMetricWithDaemonTags(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+		},
+	}
+	ri.SetDefaultTags(map[string]string{
+		"foo": "bar",
+	})
+	ri.SetDebug(true)
+	assert.Equal(t, true, ri.Debug())
+	ri.SetTrace(true)
+	assert.Equal(t, true, ri.Trace())
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest,foo=bar value=101i %d", now.UnixNano()),
+	)
+}
+
+// test that +Inf/-Inf float fields are dropped from the metric
+func TestMakeMetricInfFields(t *testing.T) {
+	inf := math.Inf(1)
+	ninf := math.Inf(-1)
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+		},
+	}
+	ri.SetDebug(true)
+	assert.Equal(t, true, ri.Debug())
+	ri.SetTrace(true)
+	assert.Equal(t, true, ri.Trace())
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{
+			"value": int(101),
+			"inf":   inf,
+			"ninf":  ninf,
+		},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
+	)
+}
+
+func TestMakeMetricAllFieldTypes(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name: "TestRunningInput",
+		},
+	}
+	ri.SetDebug(true)
+	assert.Equal(t, true, ri.Debug())
+	ri.SetTrace(true)
+	assert.Equal(t, true, ri.Trace())
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{
+			"a": int(10),
+			"b": int8(10),
+			"c": int16(10),
+			"d": int32(10),
+			"e": uint(10),
+			"f": uint8(10),
+			"g": uint16(10),
+			"h": uint32(10),
+			"i": uint64(10),
+			"j": float32(10),
+			"k": uint64(9223372036854775810),
+			"l": "foobar",
+			"m": true,
+		},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		fmt.Sprintf("RITest a=10i,b=10i,c=10i,d=10i,e=10i,f=10i,g=10i,h=10i,i=10i,j=10,k=9223372036854775807i,l=\"foobar\",m=true %d", now.UnixNano()),
+		m.String(),
+	)
+}
+
+func TestMakeMetricNameOverride(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name:         "TestRunningInput",
+			NameOverride: "foobar",
+		},
+	}
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("foobar value=101i %d", now.UnixNano()),
+	)
+}
+
+func TestMakeMetricNamePrefix(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name:              "TestRunningInput",
+			MeasurementPrefix: "foobar_",
+		},
+	}
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("foobar_RITest value=101i %d", now.UnixNano()),
+	)
+}
+
+func TestMakeMetricNameSuffix(t *testing.T) {
+	now := time.Now()
+	ri := RunningInput{
+		Config: &InputConfig{
+			Name:              "TestRunningInput",
+			MeasurementSuffix: "_foobar",
+		},
+	}
+
+	m := ri.MakeMetric(
+		"RITest",
+		map[string]interface{}{"value": int(101)},
+		map[string]string{},
+		telegraf.Untyped,
+		now,
+	)
+	assert.Equal(
+		t,
+		m.String(),
+		fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()),
+	)
+}
diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go
index a42d6fc7..2bca79a0 100644
--- a/internal/models/running_output_test.go
+++ b/internal/models/running_output_test.go
@@ -132,7 +132,6 @@ func TestRunningOutput_PassFilter(t *testing.T) {
 func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
-
 			TagInclude: []string{"nothing*"},
 		},
 	}
@@ -154,7 +153,6 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
 func TestRunningOutput_TagExcludeMatch(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
-
 			TagExclude: []string{"tag*"},
 		},
 	}
@@ -176,7 +174,6 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) {
 func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
-
 			TagExclude: []string{"nothing*"},
 		},
 	}
@@ -198,7 +195,6 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
 func TestRunningOutput_TagIncludeMatch(t *testing.T) {
 	conf := &OutputConfig{
 		Filter: Filter{
-
 			TagInclude: []string{"tag*"},
 		},
 	}
diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go
new file mode 100644
index 00000000..f4f483f6
--- /dev/null
+++ b/internal/models/running_processor.go
@@ -0,0 +1,37 @@
+package models
+
+import (
+	"github.com/influxdata/telegraf"
+)
+
+type RunningProcessor struct {
+	Name      string
+	Processor telegraf.Processor
+	Config    *ProcessorConfig
+}
+
+// ProcessorConfig contains a name and filter
+type ProcessorConfig struct {
+	Name   string
+	Filter Filter
+}
+
+func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
+	ret := []telegraf.Metric{}
+
+	for _, metric := range in {
+		if rp.Config.Filter.IsActive() {
+			// check whether this metric passes the processor's filter
+			if ok := rp.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok {
+				// the metric did not pass the filter; pass it through unchanged
+				ret = append(ret, metric)
+				continue
+			}
+		}
+		// The metric passed the filter (or no filter is set), so run the
+		// processor's Apply and append its results to the output slice.
+		ret = append(ret, rp.Processor.Apply(metric)...)
+	}
+
+	return ret
+}
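
Processors can be chained, with each RunningProcessor's output feeding the next. A minimal sketch of that chaining (the helper is hypothetical and assumes it lives inside the telegraf module):

package sketch

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal/models"
)

// applyAll runs a batch of metrics through each processor in order; metrics
// that a processor's filter rejects pass through that processor unchanged.
func applyAll(procs []*models.RunningProcessor, in []telegraf.Metric) []telegraf.Metric {
	for _, p := range procs {
		in = p.Apply(in...)
	}
	return in
}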
diff --git a/internal/models/running_processor_test.go b/internal/models/running_processor_test.go
new file mode 100644
index 00000000..8a691a9b
--- /dev/null
+++ b/internal/models/running_processor_test.go
@@ -0,0 +1,117 @@
+package models
+
+import (
+	"testing"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/testutil"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type TestProcessor struct {
+}
+
+func (f *TestProcessor) SampleConfig() string { return "" }
+func (f *TestProcessor) Description() string  { return "" }
+
+// Apply renames:
+//   "foo" to "fuz"
+//   "bar" to "baz"
+// And it also drops measurements named "dropme"
+func (f *TestProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
+	out := make([]telegraf.Metric, 0)
+	for _, m := range in {
+		switch m.Name() {
+		case "foo":
+			out = append(out, testutil.TestMetric(1, "fuz"))
+		case "bar":
+			out = append(out, testutil.TestMetric(1, "baz"))
+		case "dropme":
+			// drop the metric!
+		default:
+			out = append(out, m)
+		}
+	}
+	return out
+}
+
+func NewTestRunningProcessor() *RunningProcessor {
+	out := &RunningProcessor{
+		Name:      "test",
+		Processor: &TestProcessor{},
+		Config:    &ProcessorConfig{Filter: Filter{}},
+	}
+	return out
+}
+
+func TestRunningProcessor(t *testing.T) {
+	inmetrics := []telegraf.Metric{
+		testutil.TestMetric(1, "foo"),
+		testutil.TestMetric(1, "bar"),
+		testutil.TestMetric(1, "baz"),
+	}
+
+	expectedNames := []string{
+		"fuz",
+		"baz",
+		"baz",
+	}
+	rfp := NewTestRunningProcessor()
+	filteredMetrics := rfp.Apply(inmetrics...)
+
+	actualNames := []string{
+		filteredMetrics[0].Name(),
+		filteredMetrics[1].Name(),
+		filteredMetrics[2].Name(),
+	}
+	assert.Equal(t, expectedNames, actualNames)
+}
+
+func TestRunningProcessor_WithNameDrop(t *testing.T) {
+	inmetrics := []telegraf.Metric{
+		testutil.TestMetric(1, "foo"),
+		testutil.TestMetric(1, "bar"),
+		testutil.TestMetric(1, "baz"),
+	}
+
+	expectedNames := []string{
+		"foo",
+		"baz",
+		"baz",
+	}
+	rfp := NewTestRunningProcessor()
+
+	rfp.Config.Filter.NameDrop = []string{"foo"}
+	assert.NoError(t, rfp.Config.Filter.Compile())
+
+	filteredMetrics := rfp.Apply(inmetrics...)
+
+	actualNames := []string{
+		filteredMetrics[0].Name(),
+		filteredMetrics[1].Name(),
+		filteredMetrics[2].Name(),
+	}
+	assert.Equal(t, expectedNames, actualNames)
+}
+
+func TestRunningProcessor_DroppedMetric(t *testing.T) {
+	inmetrics := []telegraf.Metric{
+		testutil.TestMetric(1, "dropme"),
+		testutil.TestMetric(1, "foo"),
+		testutil.TestMetric(1, "bar"),
+	}
+
+	expectedNames := []string{
+		"fuz",
+		"baz",
+	}
+	rfp := NewTestRunningProcessor()
+	filteredMetrics := rfp.Apply(inmetrics...)
+
+	actualNames := []string{
+		filteredMetrics[0].Name(),
+		filteredMetrics[1].Name(),
+	}
+	assert.Equal(t, expectedNames, actualNames)
+}
diff --git a/metric.go b/metric.go
index 937603cd..9209de73 100644
--- a/metric.go
+++ b/metric.go
@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/influxdata/influxdb/client/v2"
+	"github.com/influxdata/influxdb/models"
 )
 
 // ValueType is an enumeration of metric types that represent a simple value.
@@ -33,6 +34,9 @@ type Metric interface {
 	// UnixNano returns the unix nano time of the metric
 	UnixNano() int64
 
+	// HashID returns a non-cryptographic hash of the metric (name + tags)
+	HashID() uint64
+
 	// Fields returns the fields for the metric
 	Fields() map[string]interface{}
 
@@ -44,13 +48,21 @@ type Metric interface {
 
 	// Point returns a influxdb client.Point object
 	Point() *client.Point
+
+	// SetAggregate sets the metric's aggregate status
+	// This is so that aggregate metrics don't get re-sent to aggregator plugins
+	SetAggregate(bool)
+	// IsAggregate returns true if the metric is an aggregate
+	IsAggregate() bool
 }
 
 // metric is a wrapper of the influxdb client.Point struct
 type metric struct {
-	pt *client.Point
+	pt models.Point
 
 	mType ValueType
+
+	isaggregate bool
 }
 
 // NewMetric returns an untyped metric.
@@ -60,7 +72,7 @@ func NewMetric(
 	fields map[string]interface{},
 	t time.Time,
 ) (Metric, error) {
-	pt, err := client.NewPoint(name, tags, fields, t)
+	pt, err := models.NewPoint(name, tags, fields, t)
 	if err != nil {
 		return nil, err
 	}
@@ -79,7 +91,7 @@ func NewGaugeMetric(
 	fields map[string]interface{},
 	t time.Time,
 ) (Metric, error) {
-	pt, err := client.NewPoint(name, tags, fields, t)
+	pt, err := models.NewPoint(name, tags, fields, t)
 	if err != nil {
 		return nil, err
 	}
@@ -98,7 +110,7 @@ func NewCounterMetric(
 	fields map[string]interface{},
 	t time.Time,
 ) (Metric, error) {
-	pt, err := client.NewPoint(name, tags, fields, t)
+	pt, err := models.NewPoint(name, tags, fields, t)
 	if err != nil {
 		return nil, err
 	}
@@ -124,6 +136,10 @@ func (m *metric) Type() ValueType {
 	return m.mType
 }
 
+func (m *metric) HashID() uint64 {
+	return m.pt.HashID()
+}
+
 func (m *metric) UnixNano() int64 {
 	return m.pt.UnixNano()
 }
@@ -141,5 +157,13 @@ func (m *metric) PrecisionString(precison string) string {
 }
 
 func (m *metric) Point() *client.Point {
-	return m.pt
+	return client.NewPointFrom(m.pt)
+}
+
+func (m *metric) IsAggregate() bool {
+	return m.isaggregate
+}
+
+func (m *metric) SetAggregate(b bool) {
+	m.isaggregate = b
 }
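
HashID gives each series (name + tags) a stable identity regardless of its fields or timestamp, and the aggregate flag keeps aggregator output from being fed back into aggregators. A small illustrative sketch:

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf"
)

func main() {
	tags := map[string]string{"host": "example"}
	a, _ := telegraf.NewMetric("cpu", tags, map[string]interface{}{"usage": 10.0}, time.Now())
	b, _ := telegraf.NewMetric("cpu", tags, map[string]interface{}{"usage": 99.0}, time.Now())

	// Same name + tags hash to the same series ID, even with different
	// fields and timestamps.
	fmt.Println(a.HashID() == b.HashID()) // true

	// Aggregate output is flagged so it is not re-sent to aggregators.
	a.SetAggregate(true)
	fmt.Println(a.IsAggregate()) // true
}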
diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go
new file mode 100644
index 00000000..1041a0c9
--- /dev/null
+++ b/plugins/aggregators/all/all.go
@@ -0,0 +1,5 @@
+package all
+
+import (
+	_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
+)
diff --git a/plugins/aggregators/minmax/minmax.go b/plugins/aggregators/minmax/minmax.go
new file mode 100644
index 00000000..e628ad7a
--- /dev/null
+++ b/plugins/aggregators/minmax/minmax.go
@@ -0,0 +1,192 @@
+package minmax
+
+import (
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/aggregators"
+)
+
+type MinMax struct {
+	Period internal.Duration
+
+	// metrics waiting to be processed
+	metrics  chan telegraf.Metric
+	shutdown chan struct{}
+	wg       sync.WaitGroup
+
+	// caches for metric fields, names, and tags
+	fieldCache map[uint64]map[string]minmax
+	nameCache  map[uint64]string
+	tagCache   map[uint64]map[string]string
+
+	acc telegraf.Accumulator
+}
+
+type minmax struct {
+	min interface{}
+	max interface{}
+}
+
+var sampleConfig = `
+  ## TODO doc
+  period = "30s"
+`
+
+func (m *MinMax) SampleConfig() string {
+	return sampleConfig
+}
+
+func (m *MinMax) Description() string {
+	return "Keep the aggregate min/max of each metric passing through."
+}
+
+func (m *MinMax) Apply(in telegraf.Metric) {
+	m.metrics <- in
+}
+
+func (m *MinMax) apply(in telegraf.Metric) {
+	id := in.HashID()
+	if _, ok := m.nameCache[id]; !ok {
+		// first time seeing this series; initialize its caches:
+		m.nameCache[id] = in.Name()
+		m.tagCache[id] = in.Tags()
+		m.fieldCache[id] = make(map[string]minmax)
+		for k, v := range in.Fields() {
+			m.fieldCache[id][k] = minmax{
+				min: v,
+				max: v,
+			}
+		}
+	} else {
+		for k, v := range in.Fields() {
+			cmpmin := compare(m.fieldCache[id][k].min, v)
+			cmpmax := compare(m.fieldCache[id][k].max, v)
+			if cmpmin == 1 {
+				tmp := m.fieldCache[id][k]
+				tmp.min = v
+				m.fieldCache[id][k] = tmp
+			}
+			if cmpmax == -1 {
+				tmp := m.fieldCache[id][k]
+				tmp.max = v
+				m.fieldCache[id][k] = tmp
+			}
+		}
+	}
+}
+
+func (m *MinMax) Start(acc telegraf.Accumulator) error {
+	m.metrics = make(chan telegraf.Metric, 10)
+	m.shutdown = make(chan struct{})
+	m.clearCache()
+	m.acc = acc
+	m.wg.Add(1)
+	if m.Period.Duration > 0 {
+		go m.periodHandler()
+	} else {
+		go m.continuousHandler()
+	}
+	return nil
+}
+
+func (m *MinMax) Stop() {
+	close(m.shutdown)
+	m.wg.Wait()
+}
+
+func (m *MinMax) addfields(id uint64) {
+	fields := map[string]interface{}{}
+	for k, v := range m.fieldCache[id] {
+		fields[k+"_min"] = v.min
+		fields[k+"_max"] = v.max
+	}
+	m.acc.AddFields(m.nameCache[id], fields, m.tagCache[id])
+}
+
+func (m *MinMax) clearCache() {
+	m.fieldCache = make(map[uint64]map[string]minmax)
+	m.nameCache = make(map[uint64]string)
+	m.tagCache = make(map[uint64]map[string]string)
+}
+
+// periodHandler only emits the aggregate metrics once per configured Period.
+//   For example, if telegraf's collection interval is 10s and the period is
+//   30s, one aggregate is emitted for every three collected metrics.
+func (m *MinMax) periodHandler() {
+	// TODO make this sleep less of a hack!
+	time.Sleep(time.Millisecond * 200)
+	defer m.wg.Done()
+	ticker := time.NewTicker(m.Period.Duration)
+	defer ticker.Stop()
+	for {
+		select {
+		case in := <-m.metrics:
+			m.apply(in)
+		case <-m.shutdown:
+			if len(m.metrics) > 0 {
+				continue
+			}
+			return
+		case <-ticker.C:
+			for id := range m.nameCache {
+				m.addfields(id)
+			}
+			m.clearCache()
+		}
+	}
+}
+
+// continuousHandler sends one metric for every metric that passes through it.
+func (m *MinMax) continuousHandler() {
+	defer m.wg.Done()
+	for {
+		select {
+		case in := <-m.metrics:
+			m.apply(in)
+			m.addfields(in.HashID())
+		case <-m.shutdown:
+			if len(m.metrics) > 0 {
+				continue
+			}
+			return
+		}
+	}
+}
+
+func compare(a, b interface{}) int {
+	switch at := a.(type) {
+	case int64:
+		if bt, ok := b.(int64); ok {
+			if at < bt {
+				return -1
+			} else if at > bt {
+				return 1
+			}
+			return 0
+		} else {
+			return 0
+		}
+	case float64:
+		if bt, ok := b.(float64); ok {
+			if at < bt {
+				return -1
+			} else if at > bt {
+				return 1
+			}
+			return 0
+		} else {
+			return 0
+		}
+	default:
+		return 0
+	}
+}
+
+func init() {
+	aggregators.Add("minmax", func() telegraf.Aggregator {
+		return &MinMax{}
+	})
+}
diff --git a/plugins/aggregators/minmax/minmax_test.go b/plugins/aggregators/minmax/minmax_test.go
new file mode 100644
index 00000000..5a854d91
--- /dev/null
+++ b/plugins/aggregators/minmax/minmax_test.go
@@ -0,0 +1,51 @@
+package minmax
+
+import (
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+)
+
+func BenchmarkApply(b *testing.B) {
+	minmax := MinMax{}
+	minmax.clearCache()
+
+	m1, _ := telegraf.NewMetric("m1",
+		map[string]string{"foo": "bar"},
+		map[string]interface{}{
+			"a": int64(1),
+			"b": int64(1),
+			"c": int64(1),
+			"d": int64(1),
+			"e": int64(1),
+			"f": float64(2),
+			"g": float64(2),
+			"h": float64(2),
+			"i": float64(2),
+			"j": float64(3),
+		},
+		time.Now(),
+	)
+	m2, _ := telegraf.NewMetric("m1",
+		map[string]string{"foo": "bar"},
+		map[string]interface{}{
+			"a": int64(3),
+			"b": int64(3),
+			"c": int64(3),
+			"d": int64(3),
+			"e": int64(3),
+			"f": float64(1),
+			"g": float64(1),
+			"h": float64(1),
+			"i": float64(1),
+			"j": float64(1),
+		},
+		time.Now(),
+	)
+
+	for n := 0; n < b.N; n++ {
+		minmax.apply(m1)
+		minmax.apply(m2)
+	}
+}
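
The new test file only covers the benchmark; a unit-level sketch of apply's caching could look like the example below (hypothetical, not part of this patch). It relies on the running min/max being kept per series HashID and per field, which addfields later emits as "<field>_min" and "<field>_max".

package minmax

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf"
)

// Example_apply feeds two points of the same series through apply and checks
// the cached min/max for the "usage" field.
func Example_apply() {
	m := &MinMax{}
	m.clearCache()

	m1, _ := telegraf.NewMetric("cpu",
		map[string]string{"host": "example"},
		map[string]interface{}{"usage": int64(10)},
		time.Now(),
	)
	m2, _ := telegraf.NewMetric("cpu",
		map[string]string{"host": "example"},
		map[string]interface{}{"usage": int64(99)},
		time.Now(),
	)

	m.apply(m1)
	m.apply(m2)

	mm := m.fieldCache[m1.HashID()]["usage"]
	fmt.Println(mm.min, mm.max)
	// Output: 10 99
}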
diff --git a/plugins/aggregators/registry.go b/plugins/aggregators/registry.go
new file mode 100644
index 00000000..77a9c9a6
--- /dev/null
+++ b/plugins/aggregators/registry.go
@@ -0,0 +1,11 @@
+package aggregators
+
+import "github.com/influxdata/telegraf"
+
+type Creator func() telegraf.Aggregator
+
+var Aggregators = map[string]Creator{}
+
+func Add(name string, creator Creator) {
+	Aggregators[name] = creator
+}
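
A sketch of how a consumer of this registry might create an aggregator by name (the helper is hypothetical; the real wiring happens in telegraf's config loading):

package main

import (
	"fmt"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/aggregators"
	// blank-import the bundle so each plugin's init() registers its Creator
	_ "github.com/influxdata/telegraf/plugins/aggregators/all"
)

func newAggregator(name string) (telegraf.Aggregator, error) {
	creator, ok := aggregators.Aggregators[name]
	if !ok {
		return nil, fmt.Errorf("undefined aggregator: %s", name)
	}
	return creator(), nil
}

func main() {
	agg, err := newAggregator("minmax")
	fmt.Println(agg != nil, err) // true <nil>
}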
diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go
index ed04cf86..267ba56a 100644
--- a/plugins/inputs/http_listener/http_listener_test.go
+++ b/plugins/inputs/http_listener/http_listener_test.go
@@ -99,14 +99,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {
 		wg.Add(1)
-		go func() {
+		go func(innerwg *sync.WaitGroup) {
+			defer innerwg.Done()
 			for i := 0; i < 500; i++ {
 				resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs)))
 				require.NoError(t, err)
 				require.EqualValues(t, 204, resp.StatusCode)
 			}
-			wg.Done()
-		}()
+		}(&wg)
 	}
 
 	wg.Wait()
diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go
index 199262c0..beb010fc 100644
--- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go
+++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go
@@ -29,6 +29,7 @@ type Postgresql struct {
 		Tagvalue    string
 		Measurement string
 	}
+	Debug bool
 }
 
 type query []struct {
diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go
new file mode 100644
index 00000000..462298f6
--- /dev/null
+++ b/plugins/processors/all/all.go
@@ -0,0 +1,5 @@
+package all
+
+import (
+	_ "github.com/influxdata/telegraf/plugins/processors/printer"
+)
diff --git a/plugins/processors/printer/printer.go b/plugins/processors/printer/printer.go
new file mode 100644
index 00000000..a65a104e
--- /dev/null
+++ b/plugins/processors/printer/printer.go
@@ -0,0 +1,35 @@
+package printer
+
+import (
+	"fmt"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/processors"
+)
+
+type Printer struct {
+}
+
+var sampleConfig = `
+`
+
+func (p *Printer) SampleConfig() string {
+	return sampleConfig
+}
+
+func (p *Printer) Description() string {
+	return "Print all metrics that pass through this processor."
+}
+
+func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
+	for _, metric := range in {
+		fmt.Println(metric.String())
+	}
+	return in
+}
+
+func init() {
+	processors.Add("printer", func() telegraf.Processor {
+		return &Printer{}
+	})
+}
diff --git a/plugins/processors/printer/printer_test.go b/plugins/processors/printer/printer_test.go
new file mode 100644
index 00000000..e69de29b
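
printer_test.go is added empty here; a minimal test could look like the sketch below (hypothetical, reusing testutil.TestMetric as the processor tests earlier in this patch do):

package printer

import (
	"testing"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/testutil"

	"github.com/stretchr/testify/assert"
)

// Printer must return its input unchanged; printing is only a side effect.
func TestApplyPassesMetricsThrough(t *testing.T) {
	p := Printer{}
	in := []telegraf.Metric{
		testutil.TestMetric(1, "foo"),
		testutil.TestMetric(2, "bar"),
	}
	out := p.Apply(in...)
	assert.Equal(t, in, out)
}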
diff --git a/plugins/processors/registry.go b/plugins/processors/registry.go
new file mode 100644
index 00000000..592c688f
--- /dev/null
+++ b/plugins/processors/registry.go
@@ -0,0 +1,11 @@
+package processors
+
+import "github.com/influxdata/telegraf"
+
+type Creator func() telegraf.Processor
+
+var Processors = map[string]Creator{}
+
+func Add(name string, creator Creator) {
+	Processors[name] = creator
+}
diff --git a/processor.go b/processor.go
new file mode 100644
index 00000000..f2b5133a
--- /dev/null
+++ b/processor.go
@@ -0,0 +1,12 @@
+package telegraf
+
+type Processor interface {
+	// SampleConfig returns the default configuration of the Processor
+	SampleConfig() string
+
+	// Description returns a one-sentence description of the Processor
+	Description() string
+
+	// Apply applies the processor to the given metrics and returns the result
+	Apply(in ...Metric) []Metric
+}
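
For reference, a hypothetical processor implementing this interface end to end, including registration; it is illustrative only and not part of this patch:

package tagadd

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/processors"
)

// TagAdd stamps a fixed tag onto every metric that passes through it.
type TagAdd struct {
	Key   string
	Value string
}

func (t *TagAdd) SampleConfig() string { return "" }
func (t *TagAdd) Description() string  { return "Add a fixed tag to every metric." }

func (t *TagAdd) Apply(in ...telegraf.Metric) []telegraf.Metric {
	out := make([]telegraf.Metric, 0, len(in))
	for _, m := range in {
		tags := m.Tags()
		tags[t.Key] = t.Value
		nm, err := telegraf.NewMetric(m.Name(), tags, m.Fields(), m.Time())
		if err != nil {
			// keep the original metric if rebuilding it fails
			out = append(out, m)
			continue
		}
		out = append(out, nm)
	}
	return out
}

func init() {
	processors.Add("tagadd", func() telegraf.Processor {
		return &TagAdd{Key: "source", Value: "telegraf"}
	})
}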
-- 
GitLab