diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe56317671f91fbc07b7efa3ba1b0b0f831e9e85..323b239155194af6cd4ded4aff78b12c96c5cfb4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,7 @@ be deprecated eventually.
 - [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors.
 - [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs.
 - [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK.
+- [#1678](https://github.com/influxdata/telegraf/pull/1678): Add AMQP consumer input plugin
 
 ### Bugfixes
 
diff --git a/README.md b/README.md
index 3dd06e93abe53def28c66c8d3c23578326bd14bd..915c7b7612d9122fe8e07a332fe6391d514e2f33 100644
--- a/README.md
+++ b/README.md
@@ -97,9 +97,10 @@ configuration options.
 
 ## Input Plugins
 
-* [aws cloudwatch](./plugins/inputs/cloudwatch)
 * [aerospike](./plugins/inputs/aerospike)
+* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
 * [apache](./plugins/inputs/apache)
+* [aws cloudwatch](./plugins/inputs/cloudwatch)
 * [bcache](./plugins/inputs/bcache)
 * [cassandra](./plugins/inputs/cassandra)
 * [ceph](./plugins/inputs/ceph)
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 924dffe3d3e8bde11c50c5deb97331d9c15e8c87..a9147c53ed153ab760bb72b52d4f510227d59a8b 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -2,6 +2,7 @@ package all
 
 import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
+	_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/apache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..85780700fb46b855940de80aca494640f8db9b36
--- /dev/null
+++ b/plugins/inputs/amqp_consumer/README.md
@@ -0,0 +1,47 @@
+# AMQP Consumer Input Plugin
+
+This plugin provides a consumer for use with AMQP 0-9-1, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
+
+Metrics are read from a topic exchange using the configured queue and binding_key.
+
+Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
+
+For an introduction to AMQP see:
+- https://www.rabbitmq.com/tutorials/amqp-concepts.html
+- https://www.rabbitmq.com/getstarted.html
+
+The following defaults are known to work with RabbitMQ:
+
+```toml
+# AMQP consumer plugin
+[[inputs.amqp_consumer]]
+  ## AMQP url
+  url = "amqp://localhost:5672/influxdb"
+  ## AMQP exchange
+  exchange = "telegraf"
+  ## AMQP queue name
+  queue = "telegraf"
+  ## Binding Key
+  binding_key = "#"
+
+  ## Controls how many messages the server will try to keep on the network
+  ## for consumers before receiving delivery acks.
+  #prefetch_count = 50
+
+  ## Auth method. PLAIN and EXTERNAL are supported.
+  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
+  ## described here: https://www.rabbitmq.com/plugins.html
+  # auth_method = "PLAIN"
+  ## Optional SSL Config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  ## Use SSL but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Data format to output.
+  ## Each data format has it's own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+  data_format = "influx"
+```
diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go
new file mode 100644
index 0000000000000000000000000000000000000000..6f12244aa4ac580d7d5a8d976b8efd708244adbd
--- /dev/null
+++ b/plugins/inputs/amqp_consumer/amqp_consumer.go
@@ -0,0 +1,280 @@
+package amqp_consumer
+
+import (
+	"fmt"
+	"log"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/streadway/amqp"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+)
+
+// AMQPConsumer is the top level struct for this plugin
+type AMQPConsumer struct {
+	URL string
+	// AMQP exchange
+	Exchange string
+	// Queue Name
+	Queue string
+	// Binding Key
+	BindingKey string `toml:"binding_key"`
+
+	// Controls how many messages the server will try to keep on the network
+	// for consumers before receiving delivery acks.
+	PrefetchCount int
+
+	// AMQP Auth method
+	AuthMethod string
+	// Path to CA file
+	SSLCA string `toml:"ssl_ca"`
+	// Path to host cert file
+	SSLCert string `toml:"ssl_cert"`
+	// Path to cert key file
+	SSLKey string `toml:"ssl_key"`
+	// Use SSL but skip chain & host verification
+	InsecureSkipVerify bool
+
+	parser parsers.Parser
+	conn   *amqp.Connection
+	wg     *sync.WaitGroup
+}
+
+type externalAuth struct{}
+
+func (a *externalAuth) Mechanism() string {
+	return "EXTERNAL"
+}
+func (a *externalAuth) Response() string {
+	return fmt.Sprintf("\000")
+}
+
+const (
+	DefaultAuthMethod    = "PLAIN"
+	DefaultPrefetchCount = 50
+)
+
+func (a *AMQPConsumer) SampleConfig() string {
+	return `
+  ## AMQP url
+  url = "amqp://localhost:5672/influxdb"
+  ## AMQP exchange
+  exchange = "telegraf"
+  ## AMQP queue name
+  queue = "telegraf"
+  ## Binding Key
+  binding_key = "#"
+
+  ## Maximum number of messages server should give to the worker.
+  prefetch_count = 50
+
+  ## Auth method. PLAIN and EXTERNAL are supported
+  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
+  ## described here: https://www.rabbitmq.com/plugins.html
+  # auth_method = "PLAIN"
+
+  ## Optional SSL Config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  ## Use SSL but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Data format to output.
+  ## Each data format has it's own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+  data_format = "influx"
+`
+}
+
+func (a *AMQPConsumer) Description() string {
+	return "AMQP consumer plugin"
+}
+
+func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
+	a.parser = parser
+}
+
+// All gathering is done in the Start function
+func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
+	return nil
+}
+
+func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
+	// make new tls config
+	tls, err := internal.GetTLSConfig(
+		a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify)
+	if err != nil {
+		return nil, err
+	}
+
+	// parse auth method
+	var sasl []amqp.Authentication // nil by default
+
+	if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
+		sasl = []amqp.Authentication{&externalAuth{}}
+	}
+
+	config := amqp.Config{
+		TLSClientConfig: tls,
+		SASL:            sasl, // if nil, it will be PLAIN
+	}
+	return &config, nil
+}
+
+// Start satisfies the telegraf.ServiceInput interface
+func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
+	amqpConf, err := a.createConfig()
+	if err != nil {
+		return err
+	}
+
+	msgs, err := a.connect(amqpConf)
+	if err != nil {
+		return err
+	}
+
+	a.wg = &sync.WaitGroup{}
+	a.wg.Add(1)
+	go a.process(msgs, acc)
+
+	go func() {
+		err := <-a.conn.NotifyClose(make(chan *amqp.Error))
+		if err == nil {
+			return
+		}
+
+		log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
+		for {
+			msgs, err := a.connect(amqpConf)
+			if err != nil {
+				log.Printf("E! AMQP connection failed: %s", err)
+				time.Sleep(10 * time.Second)
+				continue
+			}
+
+			a.wg.Add(1)
+			go a.process(msgs, acc)
+			break
+		}
+	}()
+
+	return nil
+}
+
+func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
+	conn, err := amqp.DialConfig(a.URL, *amqpConf)
+	if err != nil {
+		return nil, err
+	}
+	a.conn = conn
+
+	ch, err := conn.Channel()
+	if err != nil {
+		return nil, fmt.Errorf("Failed to open a channel: %s", err)
+	}
+
+	err = ch.ExchangeDeclare(
+		a.Exchange, // name
+		"topic",    // type
+		true,       // durable
+		false,      // auto-deleted
+		false,      // internal
+		false,      // no-wait
+		nil,        // arguments
+	)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
+	}
+
+	q, err := ch.QueueDeclare(
+		a.Queue, // queue
+		true,    // durable
+		false,   // delete when unused
+		false,   // exclusive
+		false,   // no-wait
+		nil,     // arguments
+	)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to declare a queue: %s", err)
+	}
+
+	err = ch.QueueBind(
+		q.Name,       // queue
+		a.BindingKey, // binding-key
+		a.Exchange,   // exchange
+		false,
+		nil,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to bind a queue: %s", err)
+	}
+
+	err = ch.Qos(
+		a.PrefetchCount,
+		0,     // prefetch-size
+		false, // global
+	)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to set QoS: %s", err)
+	}
+
+	msgs, err := ch.Consume(
+		q.Name, // queue
+		"",     // consumer
+		false,  // auto-ack
+		false,  // exclusive
+		false,  // no-local
+		false,  // no-wait
+		nil,    // arguments
+	)
+	if err != nil {
+		return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
+	}
+
+	log.Println("I! Started AMQP consumer")
+	return msgs, err
+}
+
+// Read messages from queue and add them to the Accumulator
+func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
+	defer a.wg.Done()
+	for d := range msgs {
+		metrics, err := a.parser.Parse(d.Body)
+		if err != nil {
+			log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
+		} else {
+			for _, m := range metrics {
+				acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
+			}
+		}
+
+		d.Ack(false)
+	}
+	log.Printf("I! AMQP consumer queue closed")
+}
+
+func (a *AMQPConsumer) Stop() {
+	err := a.conn.Close()
+	if err != nil && err != amqp.ErrClosed {
+		log.Printf("E! Error closing AMQP connection: %s", err)
+		return
+	}
+	a.wg.Wait()
+	log.Println("I! Stopped AMQP service")
+}
+
+func init() {
+	inputs.Add("amqp_consumer", func() telegraf.Input {
+		return &AMQPConsumer{
+			AuthMethod:    DefaultAuthMethod,
+			PrefetchCount: DefaultPrefetchCount,
+		}
+	})
+}
diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md
index d49c507b8e7f1064cb066711bc187b2f5f42984b..208ae934cb35bcd5945419c4837eb41aa9fd2411 100644
--- a/plugins/outputs/amqp/README.md
+++ b/plugins/outputs/amqp/README.md
@@ -1,13 +1,18 @@
 # AMQP Output Plugin
 
-This plugin writes to a AMQP exchange using tag, defined in configuration file
-as RoutingTag, as a routing key.
+This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
+
+Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
 
 If RoutingTag is empty, then empty routing key will be used.
 Metrics are grouped in batches by RoutingTag.
 
 This plugin doesn't bind exchange to a queue, so it should be done by consumer.
 
+For an introduction to AMQP see:
+- https://www.rabbitmq.com/tutorials/amqp-concepts.html
+- https://www.rabbitmq.com/getstarted.html
+
 ### Configuration:
 
 ```
@@ -18,6 +23,8 @@ This plugin doesn't bind exchange to a queue, so it should be done by consumer.
   ## AMQP exchange
   exchange = "telegraf"
   ## Auth method. PLAIN and EXTERNAL are supported
+  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
+  ## described here: https://www.rabbitmq.com/plugins.html
   # auth_method = "PLAIN"
   ## Telegraf tag to use as a routing key
   ##  ie, if this tag exists, it's value will be used as the routing key
diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go
index d86cac5969b5db5db3749f5a989ec29b8136de1b..7b4c7d4c9eb8ca567fc0026e27685d8cbdaa4762 100644
--- a/plugins/outputs/amqp/amqp.go
+++ b/plugins/outputs/amqp/amqp.go
@@ -40,6 +40,7 @@ type AMQP struct {
 	// Use SSL but skip chain & host verification
 	InsecureSkipVerify bool
 
+	conn    *amqp.Connection
 	channel *amqp.Channel
 	sync.Mutex
 	headers amqp.Table
@@ -68,6 +69,8 @@ var sampleConfig = `
   ## AMQP exchange
   exchange = "telegraf"
   ## Auth method. PLAIN and EXTERNAL are supported
+  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
+  ## described here: https://www.rabbitmq.com/plugins.html
   # auth_method = "PLAIN"
   ## Telegraf tag to use as a routing key
   ##  ie, if this tag exists, it's value will be used as the routing key
@@ -129,6 +132,8 @@ func (q *AMQP) Connect() error {
 	if err != nil {
 		return err
 	}
+	q.conn = connection
+
 	channel, err := connection.Channel()
 	if err != nil {
 		return fmt.Errorf("Failed to open a channel: %s", err)
@@ -148,7 +153,11 @@ func (q *AMQP) Connect() error {
 	}
 	q.channel = channel
 	go func() {
-		log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
+		err := <-connection.NotifyClose(make(chan *amqp.Error))
+		if err == nil {
+			return
+		}
+		log.Printf("I! Closing: %s", err)
 		log.Printf("I! Trying to reconnect")
 		for err := q.Connect(); err != nil; err = q.Connect() {
 			log.Println("E! ", err.Error())
@@ -160,7 +169,12 @@ func (q *AMQP) Connect() error {
 }
 
 func (q *AMQP) Close() error {
-	return q.channel.Close()
+	err := q.conn.Close()
+	if err != nil && err != amqp.ErrClosed {
+		log.Printf("E! Error closing AMQP connection: %s", err)
+		return err
+	}
+	return nil
 }
 
 func (q *AMQP) SampleConfig() string {
@@ -207,7 +221,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
 				Body:        buf,
 			})
 		if err != nil {
-			return fmt.Errorf("FAILED to send amqp message: %s", err)
+			return fmt.Errorf("Failed to send AMQP message: %s", err)
 		}
 	}
 	return nil