diff --git a/CHANGELOG.md b/CHANGELOG.md
index 12381152c732f21f8ad544b67c190a32ad92457b..10934f7fd4ff02fc74108d475f45129ea8bea8e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -66,6 +66,7 @@ be deprecated eventually.
 - [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags
 - [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
 - [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
+- [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input
 
 ### Bugfixes
 
diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md
index afdb51e32a2745b14d8de6f1d1027f97a0dedf5b..6a95a7c54c833edffe70d9ffb1929c10df395722 100644
--- a/plugins/inputs/kafka_consumer/README.md
+++ b/plugins/inputs/kafka_consumer/README.md
@@ -28,6 +28,10 @@ from the same topic in parallel.
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "influx"
+
+  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
+  ## larger messages are dropped
+  max_message_len = 65536
 ```
 
 ## Testing
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index 6f1f4020b46337d2f5c6dd6e4d0477ead1d89e95..2f6933db0c0047009e4eca3a437795f96d36e82c 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -17,6 +17,7 @@ import (
 type Kafka struct {
 	ConsumerGroup   string
 	Topics          []string
+	MaxMessageLen   int
 	ZookeeperPeers  []string
 	ZookeeperChroot string
 	Consumer        *consumergroup.ConsumerGroup
@@ -58,10 +59,14 @@ var sampleConfig = `
   offset = "oldest"
 
   ## Data format to consume.
-  ## Each data format has it's own unique set of configuration options, read
+  ## Each data format has its own unique set of configuration options, read
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "influx"
+
+  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
+  ## larger messages are dropped
+  max_message_len = 65536
 `
 
 func (k *Kafka) SampleConfig() string {
@@ -130,17 +135,21 @@ func (k *Kafka) receiver() {
 			return
 		case err := <-k.errs:
 			if err != nil {
-				k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err))
+				k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
 			}
 		case msg := <-k.in:
-			metrics, err := k.parser.Parse(msg.Value)
-			if err != nil {
-				k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
-					string(msg.Value), err.Error()))
-			}
-
-			for _, metric := range metrics {
-				k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+			if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
+				k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
+					len(msg.Value), k.MaxMessageLen))
+			} else {
+				metrics, err := k.parser.Parse(msg.Value)
+				if err != nil {
+					k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s",
+						string(msg.Value), err.Error()))
+				}
+				for _, metric := range metrics {
+					k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+				}
 			}
 
 			if !k.doNotCommitMsgs {
@@ -159,7 +168,7 @@ func (k *Kafka) Stop() {
 	defer k.Unlock()
 	close(k.done)
 	if err := k.Consumer.Close(); err != nil {
-		k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error()))
+		k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
 	}
 }
 
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index e1c24adbed1b6a2c0c0a43ceb0a7f314019a7689..04498261ca69f145352673c894955e69b908c63b 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -1,6 +1,7 @@
 package kafka_consumer
 
 import (
+	"strings"
 	"testing"
 
 	"github.com/influxdata/telegraf/plugins/parsers"
@@ -62,6 +63,23 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	assert.Equal(t, acc.NFields(), 0)
 }
 
+// Test that overlong messages are dropped
+func TestDropOverlongMsg(t *testing.T) {
+	const maxMessageLen = 64 * 1024
+	k, in := newTestKafka()
+	k.MaxMessageLen = maxMessageLen
+	acc := testutil.Accumulator{}
+	k.acc = &acc
+	defer close(k.done)
+	overlongMsg := strings.Repeat("v", maxMessageLen+1)
+
+	go k.receiver()
+	in <- saramaMsg(overlongMsg)
+	acc.WaitError(1)
+
+	assert.Equal(t, acc.NFields(), 0)
+}
+
 // Test that the parser parses kafka messages into points
 func TestRunParserAndGather(t *testing.T) {
 	k, in := newTestKafka()