diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2216e6a8fad71762c2b6d445f7e23b06a6ecd1f0..69a5fefeea2d9192a3d07e9e11465d9430713cf0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### Release Notes
 ### Features
 
+- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support
 - [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin
 - [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted
 - [#2817](https://github.com/influxdata/telegraf/pull/2817): Added timeout option to IPMI sensor plugin
diff --git a/Godeps b/Godeps
index da3ccdc351dd2a504843dd1b90c001570986846d..b53f71848850ab811cb799d311b5a73a1c091859 100644
--- a/Godeps
+++ b/Godeps
@@ -1,5 +1,5 @@
 collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
-github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3
+github.com/Shopify/sarama c01858abb625b73a3af51d0798e4ad42c8147093
 github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
 github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
 github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
@@ -52,6 +52,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
 github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe
 github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
+github.com/bsm/sarama-cluster ccdc0803695fbce22f1706d04ded46cd518fd832
 github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
 github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a
 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
diff --git a/Makefile b/Makefile
index d2bad656d0abc3b9f60b602b9bf092928d52e791..1f4233ab0ce53fdce309c6323d950c081f271ba0 100644
--- a/Makefile
+++ b/Makefile
@@ -46,11 +46,15 @@ prepare-windows:
 # Run all docker containers necessary for unit tests
 docker-run:
 	docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
+	docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
 	docker run --name kafka \
-		-e ADVERTISED_HOST=localhost \
-		-e ADVERTISED_PORT=9092 \
-		-p "2181:2181" -p "9092:9092" \
-		-d spotify/kafka
+		--link zookeeper:zookeeper \
+		-e KAFKA_ADVERTISED_HOST_NAME=localhost \
+		-e KAFKA_ADVERTISED_PORT=9092 \
+		-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
+		-e KAFKA_CREATE_TOPICS="test:1:1" \
+		-p "9092:9092" \
+		-d wurstmeister/kafka
 	docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
 	docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
 	docker run --name memcached -p "11211:11211" -d memcached
@@ -65,11 +69,15 @@ docker-run:
 # Run docker containers necessary for CircleCI unit tests
 docker-run-circle:
 	docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
+	docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
 	docker run --name kafka \
-		-e ADVERTISED_HOST=localhost \
-		-e ADVERTISED_PORT=9092 \
-		-p "2181:2181" -p "9092:9092" \
-		-d spotify/kafka
+		--link zookeeper:zookeeper \
+		-e KAFKA_ADVERTISED_HOST_NAME=localhost \
+		-e KAFKA_ADVERTISED_PORT=9092 \
+		-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
+		-e KAFKA_CREATE_TOPICS="test:1:1" \
+		-p "9092:9092" \
+		-d wurstmeister/kafka
 	docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
 	docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
 	docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
@@ -78,8 +86,8 @@ docker-run-circle:
 
 # Kill all docker containers, ignore errors
 docker-kill:
-	-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
-	-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
+	-docker kill nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch
+	-docker rm nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch
 
 # Run full unit tests using docker containers (includes setup and teardown)
 test: vet docker-kill docker-run
diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md
index 3b4d0a34d1cfc9326676d4e31bdea6fcab182531..c62c262c0e80186800be790d07d01290f70e1204 100644
--- a/docs/LICENSE_OF_DEPENDENCIES.md
+++ b/docs/LICENSE_OF_DEPENDENCIES.md
@@ -10,6 +10,7 @@ works:
 - github.com/aws/aws-sdk-go [APACHE](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt)
 - github.com/beorn7/perks [MIT](https://github.com/beorn7/perks/blob/master/LICENSE)
 - github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE)
+- github.com/bsm/sarama-cluster [MIT](https://github.com/bsm/sarama-cluster/blob/master/LICENSE)
 - github.com/cenkalti/backoff [MIT](https://github.com/cenkalti/backoff/blob/master/LICENSE)
 - github.com/couchbase/go-couchbase [MIT](https://github.com/couchbase/go-couchbase/blob/master/LICENSE)
 - github.com/couchbase/gomemcached [MIT](https://github.com/couchbase/gomemcached/blob/master/LICENSE)
diff --git a/etc/telegraf.conf b/etc/telegraf.conf
index d455b10af9e0e81138efb94ff116ba11f54d40b9..5c105a30aa869262f4f75752bdefad7795dfe7a3 100644
--- a/etc/telegraf.conf
+++ b/etc/telegraf.conf
@@ -2198,11 +2198,42 @@
 #   ## 0 means to use the default of 65536 bytes (64 kibibytes)
 #   max_line_size = 0
 
-
-# # Read metrics from Kafka topic(s)
+# # Read metrics from Kafka 0.9+ topic(s)
 # [[inputs.kafka_consumer]]
 #   ## topic(s) to consume
 #   topics = ["telegraf"]
+#   ## kafka servers
+#   brokers = ["localhost:9092"]
+#   ## the name of the consumer group
+#   consumer_group = "telegraf_metrics_consumers"
+#   ## Offset (must be either "oldest" or "newest")
+#   offset = "oldest"
+#
+#   ## Optional SSL Config
+#   # ssl_ca = "/etc/telegraf/ca.pem"
+#   # ssl_cert = "/etc/telegraf/cert.pem"
+#   # ssl_key = "/etc/telegraf/key.pem"
+#   ## Use SSL but skip chain & host verification
+#   # insecure_skip_verify = false
+#
+#   ## Optional SASL Config
+#   # sasl_username = "kafka"
+#   # sasl_password = "secret"
+#
+#   ## Data format to consume.
+#   ## Each data format has its own unique set of configuration options, read
+#   ## more about them here:
+#   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+#   data_format = "influx"
+#
+#   ## Maximum length of a message to consume, in bytes (default 0/unlimited);
+#   ## larger messages are dropped
+#   max_message_len = 65536
+
+# # Read metrics from Kafka (0.8 or earlier) topic(s)
+# [[inputs.kafka_consumer_legacy]]
+#   ## topic(s) to consume
+#   topics = ["telegraf"]
 #   ## an array of Zookeeper connection strings
 #   zookeeper_peers = ["localhost:2181"]
 #   ## Zookeeper Chroot
diff --git a/internal/config/testdata/telegraf-agent.toml b/internal/config/testdata/telegraf-agent.toml
index 5cf82af763d649493a33f47369ae176a603d4e0a..9da79605f3833cd18d85c2789601076af3800d55 100644
--- a/internal/config/testdata/telegraf-agent.toml
+++ b/internal/config/testdata/telegraf-agent.toml
@@ -143,19 +143,31 @@
 [[inputs.diskio]]
   # no configuration
 
-# read metrics from a Kafka topic
+# read metrics from a Kafka 0.9+ topic
 [[inputs.kafka_consumer]]
-  # topic(s) to consume
+  ## kafka brokers
+  brokers = ["localhost:9092"]
+  ## topic(s) to consume
+  topics = ["telegraf"]
+  ## the name of the consumer group
+  consumer_group = "telegraf_metrics_consumers"
+  ## Offset (must be either "oldest" or "newest")
+  offset = "oldest"
+
+# read metrics from a Kafka topic using the legacy (0.8 or earlier) consumer
+[[inputs.kafka_consumer_legacy]]
+  ## topic(s) to consume
   topics = ["telegraf"]
   # an array of Zookeeper connection strings
   zookeeper_peers = ["localhost:2181"]
-  # the name of the consumer group
+  ## the name of the consumer group
   consumer_group = "telegraf_metrics_consumers"
   # Maximum number of points to buffer between collection intervals
   point_buffer = 100000
-  # Offset (must be either "oldest" or "newest")
+  ## Offset (must be either "oldest" or "newest")
   offset = "oldest"
 
+
 # Read metrics from a LeoFS Server via SNMP
 [[inputs.leofs]]
   # An array of URI to gather stats about LeoFS.
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 10af14864f74debf3fdd49152605192b86a3a3f7..456852ff2c7f0a342ffea9db5bdc8307cb3d2acc 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -35,6 +35,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
 	_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
+	_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
 	_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md
index 31976788bc75f73ad8143596ff3fa26c920be59f..695001274c124f44e6c303a581a6e2072c9ed1f4 100644
--- a/plugins/inputs/kafka_consumer/README.md
+++ b/plugins/inputs/kafka_consumer/README.md
@@ -6,6 +6,9 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/con
 is used to talk to the Kafka cluster so multiple instances of telegraf can read
 from the same topic in parallel.
 
+For older Kafka versions (0.8 and earlier), please use the kafka_consumer_legacy
+input plugin, which uses the old Zookeeper connection method.
+
 ## Configuration
 
 ```toml
@@ -13,15 +16,23 @@ from the same topic in parallel.
 [[inputs.kafka_consumer]]
   ## topic(s) to consume
   topics = ["telegraf"]
-  ## an array of Zookeeper connection strings
-  zookeeper_peers = ["localhost:2181"]
-  ## Zookeeper Chroot
-  zookeeper_chroot = ""
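+  ## kafka servers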
+  brokers = ["localhost:9092"]
   ## the name of the consumer group
   consumer_group = "telegraf_metrics_consumers"
   ## Offset (must be either "oldest" or "newest")
   offset = "oldest"
 
+  ## Optional SSL Config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  ## Use SSL but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Optional SASL Config
+  # sasl_username = "kafka"
+  # sasl_password = "secret"
+
   ## Data format to consume.
   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index 2f6933db0c0047009e4eca3a437795f96d36e82c..4e4715617c9f4a8266f7d351d0e5b19e7242782a 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -7,20 +7,35 @@ import (
 	"sync"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	"github.com/influxdata/telegraf/plugins/parsers"
 
 	"github.com/Shopify/sarama"
-	"github.com/wvanbergen/kafka/consumergroup"
+	cluster "github.com/bsm/sarama-cluster"
 )
 
 type Kafka struct {
-	ConsumerGroup   string
-	Topics          []string
-	MaxMessageLen   int
-	ZookeeperPeers  []string
-	ZookeeperChroot string
-	Consumer        *consumergroup.ConsumerGroup
+	ConsumerGroup string
+	Topics        []string
+	Brokers       []string
+	MaxMessageLen int
+
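+	// Kafka cluster consumer from bsm/sarama-cluster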
+	Cluster *cluster.Consumer
+
+	// Verify Kafka SSL Certificate
+	InsecureSkipVerify bool
+	// Path to CA file
+	SSLCA string `toml:"ssl_ca"`
+	// Path to host cert file
+	SSLCert string `toml:"ssl_cert"`
+	// Path to cert key file
+	SSLKey string `toml:"ssl_key"`
+
+	// SASL Username
+	SASLUsername string `toml:"sasl_username"`
+	// SASL Password
+	SASLPassword string `toml:"sasl_password"`
 
 	// Legacy metric buffer support
 	MetricBuffer int
@@ -47,12 +62,22 @@ type Kafka struct {
 }
 
 var sampleConfig = `
+  ## kafka servers
+  brokers = ["localhost:9092"]
   ## topic(s) to consume
   topics = ["telegraf"]
-  ## an array of Zookeeper connection strings
-  zookeeper_peers = ["localhost:2181"]
-  ## Zookeeper Chroot
-  zookeeper_chroot = ""
+
+  ## Optional SSL Config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  ## Use SSL but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Optional SASL Config
+  # sasl_username = "kafka"
+  # sasl_password = "secret"
+
   ## the name of the consumer group
   consumer_group = "telegraf_metrics_consumers"
   ## Offset (must be either "oldest" or "newest")
@@ -84,45 +109,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
 func (k *Kafka) Start(acc telegraf.Accumulator) error {
 	k.Lock()
 	defer k.Unlock()
-	var consumerErr error
+	var clusterErr error
 
 	k.acc = acc
 
-	config := consumergroup.NewConfig()
-	config.Zookeeper.Chroot = k.ZookeeperChroot
+	config := cluster.NewConfig()
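+	// Return consumer errors on the Errors() channel so receiver() can report them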
+	config.Consumer.Return.Errors = true
+
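+	// Build a TLS configuration from the optional ssl_* settings (may be nil)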
+	tlsConfig, err := internal.GetTLSConfig(
+		k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
+	if err != nil {
+		return err
+	}
+
+	if tlsConfig != nil {
+		log.Printf("D! TLS Enabled")
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Enable = true
+	}
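+	// Enable SASL authentication when both a username and a password are configured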
+	if k.SASLUsername != "" && k.SASLPassword != "" {
+		log.Printf("D! Using SASL auth with username '%s'",
+			k.SASLUsername)
+		config.Net.SASL.User = k.SASLUsername
+		config.Net.SASL.Password = k.SASLPassword
+		config.Net.SASL.Enable = true
+	}
+
 	switch strings.ToLower(k.Offset) {
 	case "oldest", "":
-		config.Offsets.Initial = sarama.OffsetOldest
+		config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	case "newest":
-		config.Offsets.Initial = sarama.OffsetNewest
+		config.Consumer.Offsets.Initial = sarama.OffsetNewest
 	default:
 		log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
 			k.Offset)
-		config.Offsets.Initial = sarama.OffsetOldest
+		config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	}
 
-	if k.Consumer == nil || k.Consumer.Closed() {
-		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
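+	// Join the consumer group directly through the Kafka brokers; Zookeeper is no longer needed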
+	if k.Cluster == nil {
+		k.Cluster, clusterErr = cluster.NewConsumer(
+			k.Brokers,
 			k.ConsumerGroup,
 			k.Topics,
-			k.ZookeeperPeers,
 			config,
 		)
-		if consumerErr != nil {
-			return consumerErr
+
+		if clusterErr != nil {
+			log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n",
+				k.Brokers, k.Topics)
+			return clusterErr
 		}
 
 		// Setup message and error channels
-		k.in = k.Consumer.Messages()
-		k.errs = k.Consumer.Errors()
+		k.in = k.Cluster.Messages()
+		k.errs = k.Cluster.Errors()
 	}
 
 	k.done = make(chan struct{})
-
 	// Start the kafka message reader
 	go k.receiver()
-	log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
-		k.ZookeeperPeers, k.Topics)
+	log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n",
+		k.Brokers, k.Topics)
 	return nil
 }
 
@@ -156,7 +203,7 @@ func (k *Kafka) receiver() {
 				// TODO(cam) this locking can be removed if this PR gets merged:
 				// https://github.com/wvanbergen/kafka/pull/84
 				k.Lock()
-				k.Consumer.CommitUpto(msg)
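+				// MarkOffset flags the message as processed; sarama-cluster commits marked offsets on its commit interval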
+				k.Cluster.MarkOffset(msg, "")
 				k.Unlock()
 			}
 		}
@@ -167,7 +214,7 @@ func (k *Kafka) Stop() {
 	k.Lock()
 	defer k.Unlock()
 	close(k.done)
-	if err := k.Consumer.Close(); err != nil {
+	if err := k.Cluster.Close(); err != nil {
 		k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
 	}
 }
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
index 41ce10157094606508c7c99e8ca180210902531e..a145a938afc4fcedda428b4ce0b01b0160cd5a8c 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
@@ -19,7 +19,6 @@ func TestReadsMetricsFromKafka(t *testing.T) {
 	}
 
 	brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
-	zkPeers := []string{testutil.GetLocalHost() + ":2181"}
 	testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix())
 
 	// Send a Kafka message to the kafka host
@@ -36,11 +35,11 @@ func TestReadsMetricsFromKafka(t *testing.T) {
 
 	// Start the Kafka Consumer
 	k := &Kafka{
-		ConsumerGroup:  "telegraf_test_consumers",
-		Topics:         []string{testTopic},
-		ZookeeperPeers: zkPeers,
-		PointBuffer:    100000,
-		Offset:         "oldest",
+		ConsumerGroup: "telegraf_test_consumers",
+		Topics:        []string{testTopic},
+		Brokers:       brokerPeers,
+		PointBuffer:   100000,
+		Offset:        "oldest",
 	}
 	p, _ := parsers.NewInfluxParser()
 	k.SetParser(p)
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 5519dd0d17b44ed61e83369792ed178b95f11094..9a585d6ede4cabd8ce78f915ff49d0ee5fedb837 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -23,7 +23,7 @@ func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
 	k := Kafka{
 		ConsumerGroup:   "test",
 		Topics:          []string{"telegraf"},
-		ZookeeperPeers:  []string{"localhost:2181"},
+		Brokers:         []string{"localhost:9092"},
 		Offset:          "oldest",
 		in:              in,
 		doNotCommitMsgs: true,
diff --git a/plugins/inputs/kafka_consumer_legacy/README.md b/plugins/inputs/kafka_consumer_legacy/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..31976788bc75f73ad8143596ff3fa26c920be59f
--- /dev/null
+++ b/plugins/inputs/kafka_consumer_legacy/README.md
@@ -0,0 +1,39 @@
+# Kafka Consumer Legacy Input Plugin
+
+The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
+topic and adds messages to InfluxDB. The plugin assumes messages follow the
+line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
+is used to talk to the Kafka cluster so multiple instances of telegraf can read
+from the same topic in parallel.
+
+This is the legacy version of the plugin: it connects through Zookeeper and
+supports Kafka 0.8 and earlier. For Kafka 0.9+ use the kafka_consumer plugin.
+
+## Configuration
+
+```toml
+# Read metrics from Kafka topic(s) with the legacy Zookeeper-based consumer
+[[inputs.kafka_consumer_legacy]]
+  ## topic(s) to consume
+  topics = ["telegraf"]
+  ## an array of Zookeeper connection strings
+  zookeeper_peers = ["localhost:2181"]
+  ## Zookeeper Chroot
+  zookeeper_chroot = ""
+  ## the name of the consumer group
+  consumer_group = "telegraf_metrics_consumers"
+  ## Offset (must be either "oldest" or "newest")
+  offset = "oldest"
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+
+  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
+  ## larger messages are dropped
+  max_message_len = 65536
+```
+
+## Testing
+
+Running the integration tests requires running Zookeeper and Kafka instances;
+the Makefile's docker-run target starts suitable containers.
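+
+For reference, the relevant commands from the Makefile look roughly like this
+(check the Makefile for the current versions):
+
+```sh
+docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
+docker run --name kafka \
+	--link zookeeper:zookeeper \
+	-e KAFKA_ADVERTISED_HOST_NAME=localhost \
+	-e KAFKA_ADVERTISED_PORT=9092 \
+	-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
+	-e KAFKA_CREATE_TOPICS="test:1:1" \
+	-p "9092:9092" \
+	-d wurstmeister/kafka
+```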
diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go
new file mode 100644
index 0000000000000000000000000000000000000000..d9558d5bd080a1ff8a6e483681088396b8e704d1
--- /dev/null
+++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go
@@ -0,0 +1,183 @@
+package kafka_consumer_legacy
+
+import (
+	"fmt"
+	"log"
+	"strings"
+	"sync"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+
+	"github.com/Shopify/sarama"
+	"github.com/wvanbergen/kafka/consumergroup"
+)
+
+type Kafka struct {
+	ConsumerGroup   string
+	Topics          []string
+	MaxMessageLen   int
+	ZookeeperPeers  []string
+	ZookeeperChroot string
+	Consumer        *consumergroup.ConsumerGroup
+
+	// Legacy metric buffer support
+	MetricBuffer int
+	// TODO remove PointBuffer, legacy support
+	PointBuffer int
+
+	Offset string
+	parser parsers.Parser
+
+	sync.Mutex
+
+	// channel for all incoming kafka messages
+	in <-chan *sarama.ConsumerMessage
+	// channel for all kafka consumer errors
+	errs <-chan error
+	done chan struct{}
+
+	// keep the accumulator internally:
+	acc telegraf.Accumulator
+
+	// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
+	// this is mostly for test purposes, but there may be a use-case for it later.
+	doNotCommitMsgs bool
+}
+
+var sampleConfig = `
+  ## topic(s) to consume
+  topics = ["telegraf"]
+  ## an array of Zookeeper connection strings
+  zookeeper_peers = ["localhost:2181"]
+  ## Zookeeper Chroot
+  zookeeper_chroot = ""
+  ## the name of the consumer group
+  consumer_group = "telegraf_metrics_consumers"
+  ## Offset (must be either "oldest" or "newest")
+  offset = "oldest"
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+
+  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
+  ## larger messages are dropped
+  max_message_len = 65536
+`
+
+func (k *Kafka) SampleConfig() string {
+	return sampleConfig
+}
+
+func (k *Kafka) Description() string {
+	return "Read metrics from Kafka (0.8 or earlier) topic(s)"
+}
+
+func (k *Kafka) SetParser(parser parsers.Parser) {
+	k.parser = parser
+}
+
+func (k *Kafka) Start(acc telegraf.Accumulator) error {
+	k.Lock()
+	defer k.Unlock()
+	var consumerErr error
+
+	k.acc = acc
+
+	config := consumergroup.NewConfig()
+	config.Zookeeper.Chroot = k.ZookeeperChroot
+	switch strings.ToLower(k.Offset) {
+	case "oldest", "":
+		config.Offsets.Initial = sarama.OffsetOldest
+	case "newest":
+		config.Offsets.Initial = sarama.OffsetNewest
+	default:
+		log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
+			k.Offset)
+		config.Offsets.Initial = sarama.OffsetOldest
+	}
+
+	if k.Consumer == nil || k.Consumer.Closed() {
+		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
+			k.ConsumerGroup,
+			k.Topics,
+			k.ZookeeperPeers,
+			config,
+		)
+		if consumerErr != nil {
+			return consumerErr
+		}
+
+		// Setup message and error channels
+		k.in = k.Consumer.Messages()
+		k.errs = k.Consumer.Errors()
+	}
+
+	k.done = make(chan struct{})
+
+	// Start the kafka message reader
+	go k.receiver()
+	log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
+		k.ZookeeperPeers, k.Topics)
+	return nil
+}
+
+// receiver() reads all incoming messages from the consumer, and parses them into
+// influxdb metric points.
+func (k *Kafka) receiver() {
+	for {
+		select {
+		case <-k.done:
+			return
+		case err := <-k.errs:
+			if err != nil {
+				k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
+			}
+		case msg := <-k.in:
+			if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
+				k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
+					len(msg.Value), k.MaxMessageLen))
+			} else {
+				metrics, err := k.parser.Parse(msg.Value)
+				if err != nil {
+					k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s",
+						string(msg.Value), err.Error()))
+				}
+				for _, metric := range metrics {
+					k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+				}
+			}
+
+			if !k.doNotCommitMsgs {
+				// TODO(cam) this locking can be removed if this PR gets merged:
+				// https://github.com/wvanbergen/kafka/pull/84
+				k.Lock()
+				k.Consumer.CommitUpto(msg)
+				k.Unlock()
+			}
+		}
+	}
+}
+
+func (k *Kafka) Stop() {
+	k.Lock()
+	defer k.Unlock()
+	close(k.done)
+	if err := k.Consumer.Close(); err != nil {
+		k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
+	}
+}
+
+func (k *Kafka) Gather(acc telegraf.Accumulator) error {
+	return nil
+}
+
+func init() {
+	inputs.Add("kafka_consumer_legacy", func() telegraf.Input {
+		return &Kafka{}
+	})
+}
diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..3fe80c4a4359d9d5ae027efc70b8c6210ee95f54
--- /dev/null
+++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go
@@ -0,0 +1,96 @@
+package kafka_consumer_legacy
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/influxdata/telegraf/plugins/parsers"
+)
+
+func TestReadsMetricsFromKafka(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
+	zkPeers := []string{testutil.GetLocalHost() + ":2181"}
+	testTopic := fmt.Sprintf("telegraf_test_topic_legacy_%d", time.Now().Unix())
+
+	// Send a Kafka message to the kafka host
+	msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257\n"
+	producer, err := sarama.NewSyncProducer(brokerPeers, nil)
+	require.NoError(t, err)
+	_, _, err = producer.SendMessage(
+		&sarama.ProducerMessage{
+			Topic: testTopic,
+			Value: sarama.StringEncoder(msg),
+		})
+	require.NoError(t, err)
+	defer producer.Close()
+
+	// Start the Kafka Consumer
+	k := &Kafka{
+		ConsumerGroup:  "telegraf_test_consumers",
+		Topics:         []string{testTopic},
+		ZookeeperPeers: zkPeers,
+		PointBuffer:    100000,
+		Offset:         "oldest",
+	}
+	p, _ := parsers.NewInfluxParser()
+	k.SetParser(p)
+
+	// Verify that we can now gather the sent message
+	var acc testutil.Accumulator
+
+	// Sanity check
+	assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
+	if err := k.Start(&acc); err != nil {
+		t.Fatal(err.Error())
+	} else {
+		defer k.Stop()
+	}
+
+	waitForPoint(&acc, t)
+
+	// Gather points
+	err = acc.GatherError(k.Gather)
+	require.NoError(t, err)
+	if len(acc.Metrics) == 1 {
+		point := acc.Metrics[0]
+		assert.Equal(t, "cpu_load_short", point.Measurement)
+		assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
+		assert.Equal(t, map[string]string{
+			"host":      "server01",
+			"direction": "in",
+			"region":    "us-west",
+		}, point.Tags)
+		assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
+	} else {
+		t.Errorf("No points found in accumulator, expected 1")
+	}
+}
+
+// Waits for the metric that was sent to the kafka broker to arrive at the kafka
+// consumer
+func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
+	// Give the kafka container up to 5 seconds to get the point to the consumer
+	ticker := time.NewTicker(5 * time.Millisecond)
+	defer ticker.Stop()
+	counter := 0
+	for range ticker.C {
+		counter++
+		if counter > 1000 {
+			t.Fatal("Waited for 5s, point never arrived to consumer")
+		} else if acc.NFields() == 1 {
+			return
+		}
+	}
+}
diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..630aca163a12efcac975df172fd0923a87cd3e6c
--- /dev/null
+++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go
@@ -0,0 +1,150 @@
+package kafka_consumer_legacy
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/testutil"
+
+	"github.com/Shopify/sarama"
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	testMsg         = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
+	testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
+	testMsgJSON     = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
+	invalidMsg      = "cpu_load_short,host=server01 1422568543702900257\n"
+)
+
+func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
+	in := make(chan *sarama.ConsumerMessage, 1000)
+	k := Kafka{
+		ConsumerGroup:   "test",
+		Topics:          []string{"telegraf"},
+		ZookeeperPeers:  []string{"localhost:2181"},
+		Offset:          "oldest",
+		in:              in,
+		doNotCommitMsgs: true,
+		errs:            make(chan error, 1000),
+		done:            make(chan struct{}),
+	}
+	return &k, in
+}
+
+// Test that the parser parses kafka messages into points
+func TestRunParser(t *testing.T) {
+	k, in := newTestKafka()
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+
+	k.parser, _ = parsers.NewInfluxParser()
+	go k.receiver()
+	in <- saramaMsg(testMsg)
+	acc.Wait(1)
+
+	assert.Equal(t, acc.NFields(), 1)
+}
+
+// Test that the parser ignores invalid messages
+func TestRunParserInvalidMsg(t *testing.T) {
+	k, in := newTestKafka()
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+
+	k.parser, _ = parsers.NewInfluxParser()
+	go k.receiver()
+	in <- saramaMsg(invalidMsg)
+	acc.WaitError(1)
+
+	assert.Equal(t, acc.NFields(), 0)
+}
+
+// Test that overlong messages are dropped
+func TestDropOverlongMsg(t *testing.T) {
+	const maxMessageLen = 64 * 1024
+	k, in := newTestKafka()
+	k.MaxMessageLen = maxMessageLen
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+	overlongMsg := strings.Repeat("v", maxMessageLen+1)
+
+	go k.receiver()
+	in <- saramaMsg(overlongMsg)
+	acc.WaitError(1)
+
+	assert.Equal(t, acc.NFields(), 0)
+}
+
+// Test that the parser parses kafka messages into points
+func TestRunParserAndGather(t *testing.T) {
+	k, in := newTestKafka()
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+
+	k.parser, _ = parsers.NewInfluxParser()
+	go k.receiver()
+	in <- saramaMsg(testMsg)
+	acc.Wait(1)
+
+	acc.GatherError(k.Gather)
+
+	assert.Equal(t, acc.NFields(), 1)
+	acc.AssertContainsFields(t, "cpu_load_short",
+		map[string]interface{}{"value": float64(23422)})
+}
+
+// Test that the parser parses kafka messages into points
+func TestRunParserAndGatherGraphite(t *testing.T) {
+	k, in := newTestKafka()
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+
+	k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
+	go k.receiver()
+	in <- saramaMsg(testMsgGraphite)
+	acc.Wait(1)
+
+	acc.GatherError(k.Gather)
+
+	assert.Equal(t, acc.NFields(), 1)
+	acc.AssertContainsFields(t, "cpu_load_short_graphite",
+		map[string]interface{}{"value": float64(23422)})
+}
+
+// Test that the parser parses kafka messages into points
+func TestRunParserAndGatherJSON(t *testing.T) {
+	k, in := newTestKafka()
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+
+	k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
+	go k.receiver()
+	in <- saramaMsg(testMsgJSON)
+	acc.Wait(1)
+
+	acc.GatherError(k.Gather)
+
+	assert.Equal(t, acc.NFields(), 2)
+	acc.AssertContainsFields(t, "kafka_json_test",
+		map[string]interface{}{
+			"a":   float64(5),
+			"b_c": float64(6),
+		})
+}
+
+func saramaMsg(val string) *sarama.ConsumerMessage {
+	return &sarama.ConsumerMessage{
+		Key:       nil,
+		Value:     []byte(val),
+		Offset:    0,
+		Partition: 0,
+	}
+}