From 970bfce997d0bd7813e61cc327cc0b914d368925 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Mon, 16 Nov 2015 13:12:45 -0700
Subject: [PATCH] Fix kafka plugin and rename to kafka_consumer

fixes #371
---
 CHANGELOG.md                                  |   8 +
 README.md                                     |   2 +-
 config_test.go                                |  44 +-
 outputs/influxdb/influxdb.go                  |   5 +-
 plugins/kafka_consumer/kafka_consumer.go      | 222 +++++-----
 .../kafka_consumer_integration_test.go        |  84 ++--
 plugins/kafka_consumer/kafka_consumer_test.go | 125 +++---
 plugins/statsd/statsd.go                      |   3 +-
 testdata/single_plugin.toml                   |  11 +-
 .../subconfig/{kafka.conf => memcached.conf}  |   9 +-
 testdata/telegraf-agent.toml                  | 381 +++++++++---------
 11 files changed, 463 insertions(+), 431 deletions(-)
 rename testdata/subconfig/{kafka.conf => memcached.conf} (53%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a1eb3964..c1af2b60 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,12 +1,20 @@
 ## v0.2.3 [unreleased]
 
 ### Release Notes
+- **breaking change** The `kafka` plugin has been renamed to `kafka_consumer`,
+and most of the config option names have changed.
+This only affects the kafka consumer _plugin_ (not the kafka
+output). There were a number of problems with the kafka plugin that caused it
+to collect data only once at startup, leaving the plugin essentially
+non-functional.
 - Riemann output added
 
 ### Features
 - [#379](https://github.com/influxdb/telegraf/pull/379): Riemann output, thanks @allenj!
+- [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin.
 
 ### Bugfixes
+- [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning.
 
 ## v0.2.2 [2015-11-18]
 
diff --git a/README.md b/README.md
index 799a6d37..8fb139ae 100644
--- a/README.md
+++ b/README.md
@@ -164,7 +164,6 @@ Telegraf currently has support for collecting metrics from:
 * haproxy
 * httpjson (generic JSON-emitting http service plugin)
 * jolokia (remote JMX with JSON over HTTP)
-* kafka_consumer
 * leofs
 * lustre2
 * memcached
@@ -197,6 +196,7 @@ Telegraf currently has support for collecting metrics from:
 Telegraf can collect metrics via the following services:
 
 * statsd
+* kafka_consumer
 
 We'll be adding support for many more over the coming months. Read on if you
 want to add support for another service or third-party API.
diff --git a/config_test.go b/config_test.go
index 925bde3c..2801d22d 100644
--- a/config_test.go
+++ b/config_test.go
@@ -8,7 +8,7 @@ import (
 
 	"github.com/influxdb/telegraf/plugins"
 	"github.com/influxdb/telegraf/plugins/exec"
-	"github.com/influxdb/telegraf/plugins/kafka_consumer"
+	"github.com/influxdb/telegraf/plugins/memcached"
 	"github.com/influxdb/telegraf/plugins/procstat"
 	"github.com/naoina/toml"
 	"github.com/naoina/toml/ast"
@@ -205,17 +205,14 @@ func TestConfig_parsePlugin(t *testing.T) {
 		pluginConfigurationFieldsSet: make(map[string][]string),
 	}
 
-	subtbl := tbl.Fields["kafka"].(*ast.Table)
-	err = c.parsePlugin("kafka", subtbl)
+	subtbl := tbl.Fields["memcached"].(*ast.Table)
+	err = c.parsePlugin("memcached", subtbl)
 
-	kafka := plugins.Plugins["kafka"]().(*kafka_consumer.Kafka)
-	kafka.ConsumerGroupName = "telegraf_metrics_consumers"
-	kafka.Topic = "topic_with_metrics"
-	kafka.ZookeeperPeers = []string{"test.example.com:2181"}
-	kafka.BatchSize = 1000
+	memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
+	memcached.Servers = []string{"localhost"}
 
-	kConfig := &ConfiguredPlugin{
-		Name: "kafka",
+	mConfig := &ConfiguredPlugin{
+		Name: "memcached",
 		Drop: []string{"other", "stuff"},
 		Pass: []string{"some", "strings"},
 		TagDrop: []TagFilter{
@@ -233,10 +230,10 @@ func TestConfig_parsePlugin(t *testing.T) {
 		Interval: 5 * time.Second,
 	}
 
-	assert.Equal(t, kafka, c.plugins["kafka"],
-		"Testdata did not produce a correct kafka struct.")
-	assert.Equal(t, kConfig, c.pluginConfigurations["kafka"],
-		"Testdata did not produce correct kafka metadata.")
+	assert.Equal(t, memcached, c.plugins["memcached"],
+		"Testdata did not produce a correct memcached struct.")
+	assert.Equal(t, mConfig, c.pluginConfigurations["memcached"],
+		"Testdata did not produce correct memcached metadata.")
 }
 
 func TestConfig_LoadDirectory(t *testing.T) {
@@ -249,14 +246,11 @@ func TestConfig_LoadDirectory(t *testing.T) {
 		t.Error(err)
 	}
 
-	kafka := plugins.Plugins["kafka"]().(*kafka_consumer.Kafka)
-	kafka.ConsumerGroupName = "telegraf_metrics_consumers"
-	kafka.Topic = "topic_with_metrics"
-	kafka.ZookeeperPeers = []string{"test.example.com:2181"}
-	kafka.BatchSize = 10000
+	memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
+	memcached.Servers = []string{"192.168.1.1"}
 
-	kConfig := &ConfiguredPlugin{
-		Name: "kafka",
+	mConfig := &ConfiguredPlugin{
+		Name: "memcached",
 		Drop: []string{"other", "stuff"},
 		Pass: []string{"some", "strings"},
 		TagDrop: []TagFilter{
@@ -296,10 +290,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
 
 	pConfig := &ConfiguredPlugin{Name: "procstat"}
 
-	assert.Equal(t, kafka, c.plugins["kafka"],
-		"Merged Testdata did not produce a correct kafka struct.")
-	assert.Equal(t, kConfig, c.pluginConfigurations["kafka"],
-		"Merged Testdata did not produce correct kafka metadata.")
+	assert.Equal(t, memcached, c.plugins["memcached"],
+		"Merged Testdata did not produce a correct memcached struct.")
+	assert.Equal(t, mConfig, c.pluginConfigurations["memcached"],
+		"Merged Testdata did not produce correct memcached metadata.")
 
 	assert.Equal(t, ex, c.plugins["exec"],
 		"Merged Testdata did not produce a correct exec struct.")
diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go
index 7d3f95d1..a9fa2edc 100644
--- a/outputs/influxdb/influxdb.go
+++ b/outputs/influxdb/influxdb.go
@@ -29,8 +29,9 @@ type InfluxDB struct {
 }
 
 var sampleConfig = `
-  # The full HTTP or UDP endpoint URL for your InfluxDB instance
-  # Multiple urls can be specified for InfluxDB cluster support.
+  # The full HTTP or UDP endpoint URL for your InfluxDB instance.
+  # Multiple urls can be specified, but it is assumed that they are part of the
+  # same cluster; only ONE of the urls will be written to each interval.
   # urls = ["udp://localhost:8089"] # UDP endpoint example
   urls = ["http://localhost:8086"] # required
   # The target database for metrics (telegraf will create it if not exists)
diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go
index 1c0fbe26..f47e7e92 100644
--- a/plugins/kafka_consumer/kafka_consumer.go
+++ b/plugins/kafka_consumer/kafka_consumer.go
@@ -1,36 +1,51 @@
 package kafka_consumer
 
 import (
-	"os"
-	"os/signal"
-	"time"
+	"log"
+	"strings"
+	"sync"
 
-	"github.com/Shopify/sarama"
 	"github.com/influxdb/influxdb/models"
 	"github.com/influxdb/telegraf/plugins"
+
+	"github.com/Shopify/sarama"
 	"github.com/wvanbergen/kafka/consumergroup"
 )
 
 type Kafka struct {
-	ConsumerGroupName string
-	Topic             string
-	ZookeeperPeers    []string
-	Consumer          *consumergroup.ConsumerGroup
-	BatchSize         int
+	ConsumerGroup  string
+	Topics         []string
+	ZookeeperPeers []string
+	Consumer       *consumergroup.ConsumerGroup
+	PointBuffer    int
+	Offset         string
+
+	sync.Mutex
+
+	// channel for all incoming kafka messages
+	in <-chan *sarama.ConsumerMessage
+	// channel for all kafka consumer errors
+	errs <-chan *sarama.ConsumerError
+	// channel for all incoming parsed kafka points
+	pointChan chan models.Point
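+	// done is closed by Stop() to signal the parser goroutine to exit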
+	done      chan struct{}
+
+	// doNotCommitMsgs tells the parser not to call CommitUpto on the consumer.
+	// This is mostly for test purposes, but there may be a use-case for it later.
+	doNotCommitMsgs bool
 }
 
 var sampleConfig = `
-  # topic to consume
-  topic = "topic_with_metrics"
-
-  # the name of the consumer group
-  consumerGroupName = "telegraf_metrics_consumers"
-
+  # topic(s) to consume
+  topics = ["telegraf"]
   # an array of Zookeeper connection strings
-  zookeeperPeers = ["localhost:2181"]
-
-  # Batch size of points sent to InfluxDB
-  batchSize = 1000
+  zookeeper_peers = ["localhost:2181"]
+  # the name of the consumer group
+  consumer_group = "telegraf_metrics_consumers"
+  # Maximum number of points to buffer between collection intervals
+  point_buffer = 100000
+  # Offset (must be either "oldest" or "newest")
+  offset = "oldest"
 `
 
 func (k *Kafka) SampleConfig() string {
@@ -38,127 +53,114 @@ func (k *Kafka) SampleConfig() string {
 }
 
 func (k *Kafka) Description() string {
-	return "read metrics from a Kafka topic"
-}
-
-type Metric struct {
-	Measurement string                 `json:"measurement"`
-	Values      map[string]interface{} `json:"values"`
-	Tags        map[string]string      `json:"tags"`
-	Time        time.Time              `json:"time"`
+	return "Read line-protocol metrics from Kafka topic(s)"
 }
 
-func (k *Kafka) Gather(acc plugins.Accumulator) error {
+func (k *Kafka) Start() error {
+	k.Lock()
+	defer k.Unlock()
 	var consumerErr error
-	metricQueue := make(chan []byte, 200)
 
-	if k.Consumer == nil {
+	config := consumergroup.NewConfig()
+	switch strings.ToLower(k.Offset) {
+	case "oldest", "":
+		config.Offsets.Initial = sarama.OffsetOldest
+	case "newest":
+		config.Offsets.Initial = sarama.OffsetNewest
+	default:
+		log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
+			k.Offset)
+		config.Offsets.Initial = sarama.OffsetOldest
+	}
+
+	if k.Consumer == nil || k.Consumer.Closed() {
 		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
-			k.ConsumerGroupName,
-			[]string{k.Topic},
+			k.ConsumerGroup,
+			k.Topics,
 			k.ZookeeperPeers,
-			nil,
+			config,
 		)
-
 		if consumerErr != nil {
 			return consumerErr
 		}
 
-		c := make(chan os.Signal, 1)
-		halt := make(chan bool, 1)
-		signal.Notify(c, os.Interrupt)
-		go func() {
-			<-c
-			halt <- true
-			emitMetrics(k, acc, metricQueue)
-			k.Consumer.Close()
-		}()
-
-		go readFromKafka(k.Consumer.Messages(),
-			metricQueue,
-			k.BatchSize,
-			k.Consumer.CommitUpto,
-			halt)
+		// Setup message and error channels
+		k.in = k.Consumer.Messages()
+		k.errs = k.Consumer.Errors()
 	}
 
-	return emitMetrics(k, acc, metricQueue)
-}
+	k.done = make(chan struct{})
+	if k.PointBuffer == 0 {
+		k.PointBuffer = 100000
+	}
+	k.pointChan = make(chan models.Point, k.PointBuffer)
 
-func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
-	timeout := time.After(1 * time.Second)
+	// Start the kafka message reader
+	go k.parser()
+	log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
+		k.ZookeeperPeers, k.Topics)
+	return nil
+}
 
+// parser() reads all incoming messages from the consumer, and parses them into
+// influxdb metric points.
+func (k *Kafka) parser() {
 	for {
 		select {
-		case batch := <-metricConsumer:
-			var points []models.Point
-			var err error
-			if points, err = models.ParsePoints(batch); err != nil {
-				return err
+		case <-k.done:
+			return
+		case err := <-k.errs:
+			log.Printf("Kafka Consumer Error: %s\n", err.Error())
+		case msg := <-k.in:
+			points, err := models.ParsePoints(msg.Value)
+			if err != nil {
+				log.Printf("Could not parse kafka message: %s, error: %s",
+					string(msg.Value), err.Error())
 			}
 
 			for _, point := range points {
-				acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
+				select {
+				case k.pointChan <- point:
+					continue
+				default:
+					log.Printf("Kafka Consumer buffer is full, dropping a point." +
+						" You may want to increase the point_buffer setting")
+				}
+			}
+
+			if !k.doNotCommitMsgs {
+				// TODO(cam) this locking can be removed if this PR gets merged:
+				// https://github.com/wvanbergen/kafka/pull/84
+				k.Lock()
+				k.Consumer.CommitUpto(msg)
+				k.Unlock()
 			}
-		case <-timeout:
-			return nil
 		}
 	}
 }
 
-const millisecond = 1000000 * time.Nanosecond
-
-type ack func(*sarama.ConsumerMessage) error
-
-func readFromKafka(
-	kafkaMsgs <-chan *sarama.ConsumerMessage,
-	metricProducer chan<- []byte,
-	maxBatchSize int,
-	ackMsg ack,
-	halt <-chan bool,
-) {
-	batch := make([]byte, 0)
-	currentBatchSize := 0
-	timeout := time.After(500 * millisecond)
-	var msg *sarama.ConsumerMessage
-
-	for {
-		select {
-		case msg = <-kafkaMsgs:
-			if currentBatchSize != 0 {
-				batch = append(batch, '\n')
-			}
-
-			batch = append(batch, msg.Value...)
-			currentBatchSize++
-
-			if currentBatchSize == maxBatchSize {
-				metricProducer <- batch
-				currentBatchSize = 0
-				batch = make([]byte, 0)
-				ackMsg(msg)
-			}
-		case <-timeout:
-			if currentBatchSize != 0 {
-				metricProducer <- batch
-				currentBatchSize = 0
-				batch = make([]byte, 0)
-				ackMsg(msg)
-			}
-
-			timeout = time.After(500 * millisecond)
-		case <-halt:
-			if currentBatchSize != 0 {
-				metricProducer <- batch
-				ackMsg(msg)
-			}
+func (k *Kafka) Stop() {
+	k.Lock()
+	defer k.Unlock()
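+	// Signal the parser goroutine to exit before closing the consumer connection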
+	close(k.done)
+	if err := k.Consumer.Close(); err != nil {
+		log.Printf("Error closing kafka consumer: %s\n", err.Error())
+	}
+}
 
-			return
-		}
+func (k *Kafka) Gather(acc plugins.Accumulator) error {
+	k.Lock()
+	defer k.Unlock()
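+	// Drain only the points that are currently buffered so Gather never blocks
+	// waiting on new kafka messages.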
+	npoints := len(k.pointChan)
+	for i := 0; i < npoints; i++ {
+		point := <-k.pointChan
+		acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
 	}
+	return nil
 }
 
 func init() {
-	plugins.Add("kafka", func() plugins.Plugin {
+	plugins.Add("kafka_consumer", func() plugins.Plugin {
 		return &Kafka{}
 	})
 }
diff --git a/plugins/kafka_consumer/kafka_consumer_integration_test.go b/plugins/kafka_consumer/kafka_consumer_integration_test.go
index 6513a460..9f554d9a 100644
--- a/plugins/kafka_consumer/kafka_consumer_integration_test.go
+++ b/plugins/kafka_consumer/kafka_consumer_integration_test.go
@@ -15,43 +15,77 @@ func TestReadsMetricsFromKafka(t *testing.T) {
 	if testing.Short() {
 		t.Skip("Skipping integration test in short mode")
 	}
-	var zkPeers, brokerPeers []string
 
-	zkPeers = []string{testutil.GetLocalHost() + ":2181"}
-	brokerPeers = []string{testutil.GetLocalHost() + ":9092"}
-
-	k := &Kafka{
-		ConsumerGroupName: "telegraf_test_consumers",
-		Topic:             fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
-		ZookeeperPeers:    zkPeers,
-	}
+	brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
+	zkPeers := []string{testutil.GetLocalHost() + ":2181"}
+	testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix())
 
+	// Send a Kafka message to the kafka host
 	msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
 	producer, err := sarama.NewSyncProducer(brokerPeers, nil)
 	require.NoError(t, err)
-
-	_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
+	_, _, err = producer.SendMessage(
+		&sarama.ProducerMessage{
+			Topic: testTopic,
+			Value: sarama.StringEncoder(msg),
+		})
 	require.NoError(t, err)
+	defer producer.Close()
+
+	// Start the Kafka Consumer
+	k := &Kafka{
+		ConsumerGroup:  "telegraf_test_consumers",
+		Topics:         []string{testTopic},
+		ZookeeperPeers: zkPeers,
+		PointBuffer:    100000,
+		Offset:         "oldest",
+	}
+	if err := k.Start(); err != nil {
+		t.Fatal(err.Error())
+	} else {
+		defer k.Stop()
+	}
 
-	producer.Close()
+	waitForPoint(k, t)
 
+	// Verify that we can now gather the sent message
 	var acc testutil.Accumulator
-
 	// Sanity check
-	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+	assert.Equal(t, 0, len(acc.Points), "There should not be any points")
 
+	// Gather points
 	err = k.Gather(&acc)
 	require.NoError(t, err)
+	if len(acc.Points) == 1 {
+		point := acc.Points[0]
+		assert.Equal(t, "cpu_load_short", point.Measurement)
+		assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
+		assert.Equal(t, map[string]string{
+			"host":      "server01",
+			"direction": "in",
+			"region":    "us-west",
+		}, point.Tags)
+		assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
+	} else {
+		t.Errorf("Expected 1 point in the accumulator, got %d", len(acc.Points))
+	}
+}
 
-	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
-
-	point := acc.Points[0]
-	assert.Equal(t, "cpu_load_short", point.Measurement)
-	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
-	assert.Equal(t, map[string]string{
-		"host":      "server01",
-		"direction": "in",
-		"region":    "us-west",
-	}, point.Tags)
-	assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
+// waitForPoint waits for the metric that was sent to the kafka broker to
+// arrive at the kafka consumer
+func waitForPoint(k *Kafka, t *testing.T) {
+	// Give the kafka container up to 5 seconds to get the point to the consumer
+	ticker := time.NewTicker(5 * time.Millisecond)
+	counter := 0
+	for {
+		select {
+		case <-ticker.C:
+			counter++
+			if counter > 1000 {
+				t.Fatal("Waited for 5s, point never arrived at the consumer")
+			} else if len(k.pointChan) == 1 {
+				return
+			}
+		}
+	}
 }
diff --git a/plugins/kafka_consumer/kafka_consumer_test.go b/plugins/kafka_consumer/kafka_consumer_test.go
index 6a97e322..eb047336 100644
--- a/plugins/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/kafka_consumer/kafka_consumer_test.go
@@ -1,92 +1,91 @@
 package kafka_consumer
 
 import (
-	"strings"
 	"testing"
 	"time"
 
-	"github.com/Shopify/sarama"
+	"github.com/influxdb/influxdb/models"
 	"github.com/influxdb/telegraf/testutil"
+
+	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 )
 
-const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
+const (
+	testMsg     = "cpu_load_short,host=server01 value=23422.0 1422568543702900257"
+	invalidMsg  = "cpu_load_short,host=server01 1422568543702900257"
+	pointBuffer = 5
+)
 
-func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
-	halt := make(chan bool, 1)
-	metricChan := make(chan []byte, 1)
-	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
-	for i := 0; i < 10; i++ {
-		kafkaChan <- saramaMsg(testMsg)
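+// NewTestKafka returns a Kafka plugin instance wired up to in-memory channels
+// so the parser can be exercised without a running kafka broker.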
+func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
+	in := make(chan *sarama.ConsumerMessage, pointBuffer)
+	k := Kafka{
+		ConsumerGroup:   "test",
+		Topics:          []string{"telegraf"},
+		ZookeeperPeers:  []string{"localhost:2181"},
+		PointBuffer:     pointBuffer,
+		Offset:          "oldest",
+		in:              in,
+		doNotCommitMsgs: true,
+		errs:            make(chan *sarama.ConsumerError, pointBuffer),
+		done:            make(chan struct{}),
+		pointChan:       make(chan models.Point, pointBuffer),
 	}
+	return &k, in
+}
 
-	expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
-	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
-		batch := <-metricChan
-		assert.Equal(t, expectedBatch, string(batch))
+// Test that the parser parses kafka messages into points
+func TestRunParser(t *testing.T) {
+	k, in := NewTestKafka()
+	defer close(k.done)
 
-		halt <- true
+	go k.parser()
+	in <- saramaMsg(testMsg)
+	time.Sleep(time.Millisecond)
 
-		return nil
-	}, halt)
+	assert.Equal(t, len(k.pointChan), 1)
 }
 
-func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
-	halt := make(chan bool, 1)
-	metricChan := make(chan []byte, 1)
-	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
-	for i := 0; i < 3; i++ {
-		kafkaChan <- saramaMsg(testMsg)
-	}
-
-	expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
-	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
-		batch := <-metricChan
-		assert.Equal(t, expectedBatch, string(batch))
+// Test that the parser ignores invalid messages
+func TestRunParserInvalidMsg(t *testing.T) {
+	k, in := NewTestKafka()
+	defer close(k.done)
 
-		halt <- true
+	go k.parser()
+	in <- saramaMsg(invalidMsg)
+	time.Sleep(time.Millisecond)
 
-		return nil
-	}, halt)
+	assert.Equal(t, len(k.pointChan), 0)
 }
 
-func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
-	k := &Kafka{}
-	var acc testutil.Accumulator
-	testChan := make(chan []byte, 1)
-	testChan <- []byte(testMsg)
-
-	err := emitMetrics(k, &acc, testChan)
-	require.NoError(t, err)
-
-	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
-
-	point := acc.Points[0]
-	assert.Equal(t, "cpu_load_short", point.Measurement)
-	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
-	assert.Equal(t, map[string]string{
-		"host":      "server01",
-		"direction": "in",
-		"region":    "us-west",
-	}, point.Tags)
-
-	if time.Unix(0, 1422568543702900257).Unix() != point.Time.Unix() {
-		t.Errorf("Expected: %v, received %v\n",
-			time.Unix(0, 1422568543702900257).Unix(),
-			point.Time.Unix())
+// Test that points are dropped when we hit the buffer limit
+func TestRunParserRespectsBuffer(t *testing.T) {
+	k, in := NewTestKafka()
+	defer close(k.done)
+
+	go k.parser()
+	for i := 0; i < pointBuffer+1; i++ {
+		in <- saramaMsg(testMsg)
 	}
+	time.Sleep(time.Millisecond)
+
+	assert.Equal(t, len(k.pointChan), pointBuffer)
 }
 
-func TestEmitMetricsTimesOut(t *testing.T) {
-	k := &Kafka{}
-	var acc testutil.Accumulator
-	testChan := make(chan []byte)
+// Test that Gather collects the points parsed from kafka messages
+func TestRunParserAndGather(t *testing.T) {
+	k, in := NewTestKafka()
+	defer close(k.done)
+
+	go k.parser()
+	in <- saramaMsg(testMsg)
+	time.Sleep(time.Millisecond)
 
-	err := emitMetrics(k, &acc, testChan)
-	require.NoError(t, err)
+	acc := testutil.Accumulator{}
+	k.Gather(&acc)
 
-	assert.Equal(t, 0, len(acc.Points), "there should not be a any points")
+	assert.Equal(t, len(acc.Points), 1)
+	assert.True(t, acc.CheckValue("cpu_load_short", 23422.0))
 }
 
 func saramaMsg(val string) *sarama.ConsumerMessage {
diff --git a/plugins/statsd/statsd.go b/plugins/statsd/statsd.go
index 668f0e8b..2b8b2c21 100644
--- a/plugins/statsd/statsd.go
+++ b/plugins/statsd/statsd.go
@@ -183,8 +183,6 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
 }
 
 func (s *Statsd) Start() error {
-	log.Println("Starting up the statsd service")
-
 	// Make data structures
 	s.done = make(chan struct{})
 	s.in = make(chan string, s.AllowedPendingMessages)
@@ -197,6 +195,7 @@ func (s *Statsd) Start() error {
 	go s.udpListen()
 	// Start the line parser
 	go s.parser()
+	log.Printf("Started the statsd service on %s\n", s.ServiceAddress)
 	return nil
 }
 
diff --git a/testdata/single_plugin.toml b/testdata/single_plugin.toml
index b89cdebf..64354112 100644
--- a/testdata/single_plugin.toml
+++ b/testdata/single_plugin.toml
@@ -1,12 +1,9 @@
-[kafka]
-  topic = "topic_with_metrics"
-  consumerGroupName = "telegraf_metrics_consumers"
-  zookeeperPeers = ["test.example.com:2181"]
-  batchSize = 1000
+[memcached]
+  servers = ["localhost"]
   pass = ["some", "strings"]
   drop = ["other", "stuff"]
   interval = "5s"
-  [kafka.tagpass]
+  [memcached.tagpass]
     goodtag = ["mytag"]
-  [kafka.tagdrop]
+  [memcached.tagdrop]
     badtag = ["othertag"]
diff --git a/testdata/subconfig/kafka.conf b/testdata/subconfig/memcached.conf
similarity index 53%
rename from testdata/subconfig/kafka.conf
rename to testdata/subconfig/memcached.conf
index dd37a421..95189469 100644
--- a/testdata/subconfig/kafka.conf
+++ b/testdata/subconfig/memcached.conf
@@ -1,10 +1,9 @@
-[kafka]
-  zookeeperPeers = ["test.example.com:2181"]
-  batchSize = 10000
+[memcached]
+  servers = ["192.168.1.1"]
   pass = ["some", "strings"]
   drop = ["other", "stuff"]
   interval = "5s"
-  [kafka.tagpass]
+  [memcached.tagpass]
     goodtag = ["mytag"]
-  [kafka.tagdrop]
+  [memcached.tagdrop]
     badtag = ["othertag"]
diff --git a/testdata/telegraf-agent.toml b/testdata/telegraf-agent.toml
index 7a74e12e..28d08673 100644
--- a/testdata/telegraf-agent.toml
+++ b/testdata/telegraf-agent.toml
@@ -21,25 +21,25 @@
 
 # Tags can also be specified via a normal map, but only one form at a time:
 [tags]
-	# dc = "us-east-1"
+  # dc = "us-east-1"
 
 # Configuration for telegraf agent
 [agent]
-	# Default data collection interval for all plugins
-	interval = "10s"
+  # Default data collection interval for all plugins
+  interval = "10s"
 
-	# If utc = false, uses local time (utc is highly recommended)
-	utc = true
+  # If utc = false, uses local time (utc is highly recommended)
+  utc = true
 
-	# Precision of writes, valid values are n, u, ms, s, m, and h
-	# note: using second precision greatly helps InfluxDB compression
-	precision = "s"
+  # Precision of writes, valid values are n, u, ms, s, m, and h
+  # note: using second precision greatly helps InfluxDB compression
+  precision = "s"
 
-	# run telegraf in debug mode
-	debug = false
+  # run telegraf in debug mode
+  debug = false
 
-	# Override default hostname, if empty use os.Hostname()
-	hostname = ""
+  # Override default hostname, if empty use os.Hostname()
+  hostname = ""
 
 
 ###############################################################################
@@ -50,24 +50,24 @@
 
 # Configuration for influxdb server to send metrics to
 [[outputs.influxdb]]
-	# The full HTTP endpoint URL for your InfluxDB instance
-	# Multiple urls can be specified for InfluxDB cluster support. Server to
-	# write to will be randomly chosen each interval.
-	urls = ["http://localhost:8086"] # required.
+  # The full HTTP endpoint URL for your InfluxDB instance
+  # Multiple urls can be specified for InfluxDB cluster support. Server to
+  # write to will be randomly chosen each interval.
+  urls = ["http://localhost:8086"] # required.
 
-	# The target database for metrics. This database must already exist
-	database = "telegraf" # required.
+  # The target database for metrics. This database must already exist
+  database = "telegraf" # required.
 
-	# Connection timeout (for the connection with InfluxDB), formatted as a string.
-	# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
-	# If not provided, will default to 0 (no timeout)
-	# timeout = "5s"
+  # Connection timeout (for the connection with InfluxDB), formatted as a string.
+  # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
+  # If not provided, will default to 0 (no timeout)
+  # timeout = "5s"
 
-	# username = "telegraf"
-	# password = "metricsmetricsmetricsmetrics"
+  # username = "telegraf"
+  # password = "metricsmetricsmetricsmetrics"
 
-	# Set the user agent for the POSTs (can be useful for log differentiation)
-	# user_agent = "telegraf"
+  # Set the user agent for the POSTs (can be useful for log differentiation)
+  # user_agent = "telegraf"
 
 [[outputs.influxdb]]
   urls = ["udp://localhost:8089"]
@@ -75,13 +75,13 @@
 
 # Configuration for the Kafka server to send metrics to
 [[outputs.kafka]]
-	# URLs of kafka brokers
-	brokers = ["localhost:9092"]
-	# Kafka topic for producer messages
-	topic = "telegraf"
-	# Telegraf tag to use as a routing key
-	#  ie, if this tag exists, it's value will be used as the routing key
-	routing_tag = "host"
+  # URLs of kafka brokers
+  brokers = ["localhost:9092"]
+  # Kafka topic for producer messages
+  topic = "telegraf"
+  # Telegraf tag to use as a routing key
+  #  ie, if this tag exists, it's value will be used as the routing key
+  routing_tag = "host"
 
 
 ###############################################################################
@@ -95,239 +95,238 @@ urls = ["http://localhost/server-status?auto"]
 
 # Read metrics about cpu usage
 [cpu]
-	# Whether to report per-cpu stats or not
-	percpu = true
-	# Whether to report total system cpu stats or not
-	totalcpu = true
-	# Comment this line if you want the raw CPU time metrics
-	drop = ["cpu_time"]
+  # Whether to report per-cpu stats or not
+  percpu = true
+  # Whether to report total system cpu stats or not
+  totalcpu = true
+  # Comment this line if you want the raw CPU time metrics
+  drop = ["cpu_time"]
 
 # Read metrics about disk usage by mount point
 [disk]
-	# no configuration
+  # no configuration
 
 # Read metrics from one or many disque servers
 [disque]
-	# An array of URI to gather stats about. Specify an ip or hostname
-	# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
-	# 10.0.0.1:10000, etc.
-	#
-	# If no servers are specified, then localhost is used as the host.
-	servers = ["localhost"]
+  # An array of URIs to gather stats about. Specify an ip or hostname
+  # with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
+  # 10.0.0.1:10000, etc.
+  #
+  # If no servers are specified, then localhost is used as the host.
+  servers = ["localhost"]
 
 # Read stats from one or more Elasticsearch servers or clusters
 [elasticsearch]
-	# specify a list of one or more Elasticsearch servers
-	servers = ["http://localhost:9200"]
+  # specify a list of one or more Elasticsearch servers
+  servers = ["http://localhost:9200"]
 
-	# set local to false when you want to read the indices stats from all nodes
-	# within the cluster
-	local = true
+  # set local to false when you want to read the indices stats from all nodes
+  # within the cluster
+  local = true
 
 # Read flattened metrics from one or more commands that output JSON to stdout
 [exec]
-	# specify commands via an array of tables
-	[[exec.commands]]
-	# the command to run
-	command = "/usr/bin/mycollector --foo=bar"
+  # specify commands via an array of tables
+  [[exec.commands]]
+  # the command to run
+  command = "/usr/bin/mycollector --foo=bar"
 
-	# name of the command (used as a prefix for measurements)
-	name = "mycollector"
+  # name of the command (used as a prefix for measurements)
+  name = "mycollector"
 
 # Read metrics of haproxy, via socket or csv stats page
 [haproxy]
-	# An array of address to gather stats about. Specify an ip on hostname
-	# with optional port. ie localhost, 10.10.3.33:1936, etc.
-	#
-	# If no servers are specified, then default to 127.0.0.1:1936
-	servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
-	# Or you can also use local socket(not work yet)
-	# servers = ["socket:/run/haproxy/admin.sock"]
+  # An array of addresses to gather stats about. Specify an ip or hostname
+  # with optional port. ie localhost, 10.10.3.33:1936, etc.
+  #
+  # If no servers are specified, then default to 127.0.0.1:1936
+  servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
+  # Or you can also use a local socket (not yet working)
+  # servers = ["socket:/run/haproxy/admin.sock"]
 
 # Read flattened metrics from one or more JSON HTTP endpoints
 [httpjson]
-	# Specify services via an array of tables
-	[[httpjson.services]]
+  # Specify services via an array of tables
+  [[httpjson.services]]
 
-		# a name for the service being polled
-		name = "webserver_stats"
+    # a name for the service being polled
+    name = "webserver_stats"
 
-		# URL of each server in the service's cluster
-		servers = [
-			"http://localhost:9999/stats/",
-			"http://localhost:9998/stats/",
-		]
+    # URL of each server in the service's cluster
+    servers = [
+      "http://localhost:9999/stats/",
+      "http://localhost:9998/stats/",
+    ]
 
-		# HTTP method to use (case-sensitive)
-		method = "GET"
+    # HTTP method to use (case-sensitive)
+    method = "GET"
 
-		# HTTP parameters (all values must be strings)
-		[httpjson.services.parameters]
-			event_type = "cpu_spike"
-			threshold = "0.75"
+    # HTTP parameters (all values must be strings)
+    [httpjson.services.parameters]
+      event_type = "cpu_spike"
+      threshold = "0.75"
 
 # Read metrics about disk IO by device
 [io]
-	# no configuration
+  # no configuration
 
 # read metrics from a Kafka topic
-[kafka]
-	# topic to consume
-	topic = "topic_with_metrics"
-
-	# the name of the consumer group
-	consumerGroupName = "telegraf_metrics_consumers"
-
-	# an array of Zookeeper connection strings
-	zookeeperPeers = ["localhost:2181"]
-
-	# Batch size of points sent to InfluxDB
-	batchSize = 1000
+[kafka_consumer]
+  # topic(s) to consume
+  topics = ["telegraf"]
+  # an array of Zookeeper connection strings
+  zookeeper_peers = ["localhost:2181"]
+  # the name of the consumer group
+  consumer_group = "telegraf_metrics_consumers"
+  # Maximum number of points to buffer between collection intervals
+  point_buffer = 100000
+  # Offset (must be either "oldest" or "newest")
+  offset = "oldest"
 
 # Read metrics from a LeoFS Server via SNMP
 [leofs]
-	# An array of URI to gather stats about LeoFS.
-	# Specify an ip or hostname with port. ie 127.0.0.1:4020
-	#
-	# If no servers are specified, then 127.0.0.1 is used as the host and 4020 as the port.
-	servers = ["127.0.0.1:4021"]
+  # An array of URIs to gather stats about LeoFS.
+  # Specify an ip or hostname with port. ie 127.0.0.1:4020
+  #
+  # If no servers are specified, then 127.0.0.1 is used as the host and 4020 as the port.
+  servers = ["127.0.0.1:4021"]
 
 # Read metrics from local Lustre service on OST, MDS
 [lustre2]
-	# An array of /proc globs to search for Lustre stats
-	# If not specified, the default will work on Lustre 2.5.x
-	#
-	# ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"]
-	# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
+  # An array of /proc globs to search for Lustre stats
+  # If not specified, the default will work on Lustre 2.5.x
+  #
+  # ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"]
+  # mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
 
 # Read metrics about memory usage
 [mem]
-	# no configuration
+  # no configuration
 
 # Read metrics from one or many memcached servers
 [memcached]
-	# An array of address to gather stats about. Specify an ip on hostname
-	# with optional port. ie localhost, 10.0.0.1:11211, etc.
-	#
-	# If no servers are specified, then localhost is used as the host.
-	servers = ["localhost"]
+  # An array of addresses to gather stats about. Specify an ip or hostname
+  # with optional port. ie localhost, 10.0.0.1:11211, etc.
+  #
+  # If no servers are specified, then localhost is used as the host.
+  servers = ["localhost"]
 
 # Read metrics from one or many MongoDB servers
 [mongodb]
-	# An array of URI to gather stats about. Specify an ip or hostname
-	# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
-	# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
-	#
-	# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
-	servers = ["127.0.0.1:27017"]
+  # An array of URIs to gather stats about. Specify an ip or hostname
+  # with optional port and password. ie mongodb://user:auth_key@10.10.3.30:27017,
+  # mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
+  #
+  # If no servers are specified, then 127.0.0.1 is used as the host and 27017 as the port.
+  servers = ["127.0.0.1:27017"]
 
 # Read metrics from one or many mysql servers
 [mysql]
-	# specify servers via a url matching:
-	#  [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
-	#  e.g.
-	#    servers = ["root:root@http://10.0.0.18/?tls=false"]
-	#    servers = ["root:passwd@tcp(127.0.0.1:3306)/"]
-	#
-	# If no servers are specified, then localhost is used as the host.
-	servers = ["localhost"]
+  # specify servers via a url matching:
+  #  [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
+  #  e.g.
+  #    servers = ["root:root@http://10.0.0.18/?tls=false"]
+  #    servers = ["root:passwd@tcp(127.0.0.1:3306)/"]
+  #
+  # If no servers are specified, then localhost is used as the host.
+  servers = ["localhost"]
 
 # Read metrics about network interface usage
 [net]
-	# By default, telegraf gathers stats from any up interface (excluding loopback)
-	# Setting interfaces will tell it to gather these explicit interfaces,
-	# regardless of status.
-	#
-	# interfaces = ["eth0", ... ]
+  # By default, telegraf gathers stats from any up interface (excluding loopback)
+  # Setting interfaces will tell it to gather these explicit interfaces,
+  # regardless of status.
+  #
+  # interfaces = ["eth0", ... ]
 
 # Read Nginx's basic status information (ngx_http_stub_status_module)
 [nginx]
-	# An array of Nginx stub_status URI to gather stats.
-	urls = ["http://localhost/status"]
+  # An array of Nginx stub_status URI to gather stats.
+  urls = ["http://localhost/status"]
 
 # Ping given url(s) and return statistics
 [ping]
-	# urls to ping
-	urls = ["www.google.com"] # required
-	# number of pings to send (ping -c <COUNT>)
-	count = 1 # required
-	# interval, in s, at which to ping. 0 == default (ping -i <PING_INTERVAL>)
-	ping_interval = 0.0
-	# ping timeout, in s. 0 == no timeout (ping -t <TIMEOUT>)
-	timeout = 0.0
-	# interface to send ping from (ping -I <INTERFACE>)
-	interface = ""
+  # urls to ping
+  urls = ["www.google.com"] # required
+  # number of pings to send (ping -c <COUNT>)
+  count = 1 # required
+  # interval, in s, at which to ping. 0 == default (ping -i <PING_INTERVAL>)
+  ping_interval = 0.0
+  # ping timeout, in s. 0 == no timeout (ping -t <TIMEOUT>)
+  timeout = 0.0
+  # interface to send ping from (ping -I <INTERFACE>)
+  interface = ""
 
 # Read metrics from one or many postgresql servers
 [postgresql]
-	# specify servers via an array of tables
-	[[postgresql.servers]]
-
-	# specify address via a url matching:
-	#   postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
-	# or a simple string:
-	#   host=localhost user=pqotest password=... sslmode=... dbname=app_production
-	#
-	# All connection parameters are optional. By default, the host is localhost
-	# and the user is the currently running user. For localhost, we default
-	# to sslmode=disable as well.
-	#
-	# Without the dbname parameter, the driver will default to a database
-	# with the same name as the user. This dbname is just for instantiating a
-	# connection with the server and doesn't restrict the databases we are trying
-	# to grab metrics for.
-	#
-
-	address = "sslmode=disable"
-
-	# A list of databases to pull metrics about. If not specified, metrics for all
-	# databases are gathered.
-
-	# databases = ["app_production", "blah_testing"]
-
-	# [[postgresql.servers]]
-	# address = "influx@remoteserver"
+  # specify servers via an array of tables
+  [[postgresql.servers]]
+
+  # specify address via a url matching:
+  #   postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
+  # or a simple string:
+  #   host=localhost user=pqotest password=... sslmode=... dbname=app_production
+  #
+  # All connection parameters are optional. By default, the host is localhost
+  # and the user is the currently running user. For localhost, we default
+  # to sslmode=disable as well.
+  #
+  # Without the dbname parameter, the driver will default to a database
+  # with the same name as the user. This dbname is just for instantiating a
+  # connection with the server and doesn't restrict the databases we are trying
+  # to grab metrics for.
+  #
+
+  address = "sslmode=disable"
+
+  # A list of databases to pull metrics about. If not specified, metrics for all
+  # databases are gathered.
+
+  # databases = ["app_production", "blah_testing"]
+
+  # [[postgresql.servers]]
+  # address = "influx@remoteserver"
 
 # Read metrics from one or many prometheus clients
 [prometheus]
-	# An array of urls to scrape metrics from.
-	urls = ["http://localhost:9100/metrics"]
+  # An array of urls to scrape metrics from.
+  urls = ["http://localhost:9100/metrics"]
 
 # Read metrics from one or many RabbitMQ servers via the management API
 [rabbitmq]
-	# Specify servers via an array of tables
-	[[rabbitmq.servers]]
-	# name = "rmq-server-1" # optional tag
-	# url = "http://localhost:15672"
-	# username = "guest"
-	# password = "guest"
+  # Specify servers via an array of tables
+  [[rabbitmq.servers]]
+  # name = "rmq-server-1" # optional tag
+  # url = "http://localhost:15672"
+  # username = "guest"
+  # password = "guest"
 
-	# A list of nodes to pull metrics about. If not specified, metrics for
-	# all nodes are gathered.
-	# nodes = ["rabbit@node1", "rabbit@node2"]
+  # A list of nodes to pull metrics about. If not specified, metrics for
+  # all nodes are gathered.
+  # nodes = ["rabbit@node1", "rabbit@node2"]
 
 # Read metrics from one or many redis servers
 [redis]
-	# An array of URI to gather stats about. Specify an ip or hostname
-	# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832,
-	# 10.0.0.1:10000, etc.
-	#
-	# If no servers are specified, then localhost is used as the host.
-	servers = ["localhost"]
+  # An array of URIs to gather stats about. Specify an ip or hostname
+  # with optional port and password. ie redis://localhost, redis://10.10.3.33:18832,
+  # 10.0.0.1:10000, etc.
+  #
+  # If no servers are specified, then localhost is used as the host.
+  servers = ["localhost"]
 
 # Read metrics from one or many RethinkDB servers
 [rethinkdb]
-	# An array of URI to gather stats about. Specify an ip or hostname
-	# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
-	# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
-	#
-	# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
-	servers = ["127.0.0.1:28015"]
+  # An array of URIs to gather stats about. Specify an ip or hostname
+  # with optional port and password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
+  # rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
+  #
+  # If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
+  servers = ["127.0.0.1:28015"]
 
 # Read metrics about swap memory usage
 [swap]
-	# no configuration
+  # no configuration
 
 # Read metrics about system load & uptime
 [system]
-	# no configuration
+  # no configuration
-- 
GitLab