From da6ad34fc8376d04f3a4a9e28fcec2eaef8ba913 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel.nelson@influxdb.com>
Date: Sun, 3 Jun 2018 18:12:48 -0700
Subject: [PATCH] Add option to disconnect after a message limit is reached in
 amqp output

---
 plugins/inputs/amqp_consumer/README.md        |   2 +-
 plugins/inputs/amqp_consumer/amqp_consumer.go |   3 +-
 plugins/outputs/amqp/README.md                |  90 +--
 plugins/outputs/amqp/amqp.go                  | 513 +++++++++---------
 plugins/outputs/amqp/amqp_test.go             | 167 +++++-
 plugins/outputs/amqp/client.go                | 134 +++++
 6 files changed, 583 insertions(+), 326 deletions(-)
 create mode 100644 plugins/outputs/amqp/client.go

diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md
index 84661aec..e7701ad7 100644
--- a/plugins/inputs/amqp_consumer/README.md
+++ b/plugins/inputs/amqp_consumer/README.md
@@ -50,7 +50,7 @@ The following defaults are known to work with RabbitMQ:
   binding_key = "#"
 
   ## Maximum number of messages server should give to the worker.
-  prefetch_count = 50
+  # prefetch_count = 50
 
   ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go
index d39c995c..1dde6fe2 100644
--- a/plugins/inputs/amqp_consumer/amqp_consumer.go
+++ b/plugins/inputs/amqp_consumer/amqp_consumer.go
@@ -100,11 +100,12 @@ func (a *AMQPConsumer) SampleConfig() string {
 
   ## AMQP queue name
   queue = "telegraf"
+
   ## Binding Key
   binding_key = "#"
 
   ## Maximum number of messages server should give to the worker.
-  prefetch_count = 50
+  # prefetch_count = 50
 
   ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md
index ebc55514..817eea67 100644
--- a/plugins/outputs/amqp/README.md
+++ b/plugins/outputs/amqp/README.md
@@ -2,27 +2,15 @@
 
 This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
 
-Metrics are written to a topic exchange using a routing key defined by:
-1. The routing_key config defines a static value
-2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found
-3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used
-
-Metrics are grouped in batches by the final routing key.
-
-This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic.
-To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange,
-declare and bind a single queue with the same routing_key, and consume from the same queue in each worker.
-To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind,
-and consume from individual queues in each worker.
+This plugin does not bind the exchange to a queue.
 
 For an introduction to AMQP see:
 - https://www.rabbitmq.com/tutorials/amqp-concepts.html
 - https://www.rabbitmq.com/getstarted.html
 
 ### Configuration:
-
 ```toml
-# Configuration for the AMQP server to send metrics to
+# Publishes metrics to an AMQP broker
 [[outputs.amqp]]
   ## Broker to publish to.
   ##   deprecated in 1.7; use the brokers option
@@ -33,9 +21,10 @@ For an introduction to AMQP see:
   ## helpful for load balancing when not using a dedicated load balancer.
   brokers = ["amqp://localhost:5672/influxdb"]
 
-  ## Authentication credentials for the PLAIN auth_method.
-  # username = ""
-  # password = ""
+  ## Maximum messages to send over a connection.  Once this is reached, the
+  ## connection is closed and a new connection is made.  This can be helpful for
+  ## load balancing when not using a dedicated load balancer.
+  # max_messages = 0
 
   ## Exchange to declare and publish to.
   exchange = "telegraf"
@@ -44,36 +33,51 @@ For an introduction to AMQP see:
   # exchange_type = "topic"
 
   ## If true, exchange will be passively declared.
-  # exchange_passive = false
+  # exchange_declare_passive = false
 
-  ## Exchange durability can be either "transient" or "durable".
-  # exchange_durability = "durable"
+  ## If true, exchange will be created as a durable exchange.
+  # exchange_durable = true
 
   ## Additional exchange arguments.
   # exchange_args = { }
   # exchange_args = {"hash_propery" = "timestamp"}
 
+  ## Authentication credentials for the PLAIN auth_method.
+  # username = ""
+  # password = ""
+
   ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
   ## described here: https://www.rabbitmq.com/plugins.html
   # auth_method = "PLAIN"
-  ## Topic routing key
+
+  ## Metric tag to use as a routing key.
+  ##   ie, if this tag exists, its value will be used as the routing key
+  # routing_tag = "host"
+
+  ## Static routing key.  Used when no routing_tag is set or as a fallback
+  ## when the tag specified in routing tag is not found.
   # routing_key = ""
-  ## Telegraf tag to use as a routing key
-  ##  ie, if this tag exists, its value will be used as the routing key
-  ##  and override routing_key config even if defined
-  routing_tag = "host"
-  ## Delivery Mode controls if a published message is persistent
-  ## Valid options are "transient" and "persistent". default: "transient"
-  delivery_mode = "transient"
-
-  ## InfluxDB retention policy
-  # retention_policy = "default"
-  ## InfluxDB database
+  # routing_key = "telegraf"
+
+  ## Delivery Mode controls if a published message is persistent.
+  ##   One of "transient" or "persistent".
+  # delivery_mode = "transient"
+
+  ## InfluxDB database added as a message header.
+  ##   deprecated in 1.7; use the headers option
   # database = "telegraf"
 
-  ## Write timeout, formatted as a string.  If not provided, will default
-  ## to 5s. 0s means no timeout (not recommended).
+  ## InfluxDB retention policy added as a message header
+  ##   deprecated in 1.7; use the headers option
+  # retention_policy = "default"
+
+  ## Static headers added to each published message.
+  # headers = { }
+  # headers = {"database" = "telegraf", "retention_policy" = "default"}
+
+  ## Connection timeout.  If not provided, will default to 5s.  0s means no
+  ## timeout (not recommended).
   # timeout = "5s"
 
   ## Optional TLS Config
@@ -83,9 +87,25 @@ For an introduction to AMQP see:
   ## Use TLS but skip chain & host verification
   # insecure_skip_verify = false
 
+  ## If true use batch serialization format instead of line based delimiting.
+  ## Only applies to data formats which are not line based such as JSON.
+  ## Recommended to set to true.
+  # use_batch_format = false
+
   ## Data format to output.
   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
-  data_format = "influx"
+  # data_format = "influx"
 ```
+
+#### Routing
+
+If `routing_tag` is set, and the tag is defined on the metric, the value of
+the tag is used as the routing key.  Otherwise the value of `routing_key` is
+used directly.  If both are unset the empty string is used.
+
+Exchange types that do not use a routing key, `direct` and `header`, always
+use the empty string as the routing key.
+
+Metrics are published in batches based on the final routing key.
diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go
index dfed5713..faaa1027 100644
--- a/plugins/outputs/amqp/amqp.go
+++ b/plugins/outputs/amqp/amqp.go
@@ -1,13 +1,10 @@
 package amqp
 
 import (
-	"errors"
+	"bytes"
 	"fmt"
 	"log"
-	"math/rand"
-	"net"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/influxdata/telegraf"
@@ -19,70 +16,58 @@ import (
 	"github.com/streadway/amqp"
 )
 
-type client struct {
-	conn    *amqp.Connection
-	channel *amqp.Channel
-	headers amqp.Table
+const (
+	DefaultURL             = "amqp://localhost:5672/influxdb"
+	DefaultAuthMethod      = "PLAIN"
+	DefaultExchangeType    = "topic"
+	DefaultRetentionPolicy = "default"
+	DefaultDatabase        = "telegraf"
+)
+
+type externalAuth struct{}
+
+func (a *externalAuth) Mechanism() string {
+	return "EXTERNAL"
+}
+
+func (a *externalAuth) Response() string {
+	return fmt.Sprintf("\000")
 }
 
 type AMQP struct {
 	URL                string            `toml:"url"` // deprecated in 1.7; use brokers
 	Brokers            []string          `toml:"brokers"`
-	Username           string            `toml:"username"`
-	Password           string            `toml:"password"`
 	Exchange           string            `toml:"exchange"`
 	ExchangeType       string            `toml:"exchange_type"`
-	ExchangeDurability string            `toml:"exchange_durability"`
 	ExchangePassive    bool              `toml:"exchange_passive"`
+	ExchangeDurability string            `toml:"exchange_durability"`
 	ExchangeArguments  map[string]string `toml:"exchange_arguments"`
-
-	// AMQP Auth method
-	AuthMethod string
-	// Routing Key (static)
-	RoutingKey string `toml:"routing_key"`
-	// Routing Key from Tag
-	RoutingTag string `toml:"routing_tag"`
-	// InfluxDB database
-	Database string
-	// InfluxDB retention policy
-	RetentionPolicy string
-	// InfluxDB precision (DEPRECATED)
-	Precision string
-	// Connection timeout
-	Timeout internal.Duration
-	// Delivery Mode controls if a published message is persistent
-	// Valid options are "transient" and "persistent". default: "transient"
-	DeliveryMode string
-
+	Username           string            `toml:"username"`
+	Password           string            `toml:"password"`
+	MaxMessages        int               `toml:"max_messages"`
+	AuthMethod         string            `toml:"auth_method"`
+	RoutingTag         string            `toml:"routing_tag"`
+	RoutingKey         string            `toml:"routing_key"`
+	DeliveryMode       string            `toml:"delivery_mode"`
+	Database           string            `toml:"database"`         // deprecated in 1.7; use headers
+	RetentionPolicy    string            `toml:"retention_policy"` // deprecated in 1.7; use headers
+	Precision          string            `toml:"precision"`        // deprecated; has no effect
+	Headers            map[string]string `toml:"headers"`
+	Timeout            internal.Duration `toml:"timeout"`
+	UseBatchFormat     bool              `toml:"use_batch_format"`
 	tls.ClientConfig
 
-	sync.Mutex
-	c *client
-
-	deliveryMode uint8
 	serializer   serializers.Serializer
+	connect      func(*ClientConfig) (Client, error)
+	client       Client
+	config       *ClientConfig
+	sentMessages int
 }
 
-type externalAuth struct{}
-
-func (a *externalAuth) Mechanism() string {
-	return "EXTERNAL"
+type Client interface {
+	Publish(key string, body []byte) error
+	Close() error
 }
-func (a *externalAuth) Response() string {
-	return fmt.Sprintf("\000")
-}
-
-const (
-	DefaultAuthMethod = "PLAIN"
-
-	DefaultBroker = "amqp://localhost:5672/influxdb"
-
-	DefaultExchangeType       = "topic"
-	DefaultExchangeDurability = "durable"
-
-	DefaultRetentionPolicy = "default"
-	DefaultDatabase        = "telegraf"
-)
 
 var sampleConfig = `
   ## Broker to publish to.
@@ -94,9 +79,10 @@ var sampleConfig = `
   ## helpful for load balancing when not using a dedicated load balancer.
   brokers = ["amqp://localhost:5672/influxdb"]
 
-  ## Authentication credentials for the PLAIN auth_method.
-  # username = ""
-  # password = ""
+  ## Maximum messages to send over a connection.  Once this is reached, the
+  ## connection is closed and a new connection is made.  This can be helpful for
+  ## load balancing when not using a dedicated load balancer.
+  # max_messages = 0
 
   ## Exchange to declare and publish to.
   exchange = "telegraf"
@@ -105,36 +91,51 @@ var sampleConfig = `
   # exchange_type = "topic"
 
   ## If true, exchange will be passively declared.
-  # exchange_passive = false
+  # exchange_declare_passive = false
 
-  ## Exchange durability can be either "transient" or "durable".
-  # exchange_durability = "durable"
+  ## If true, exchange will be created as a durable exchange.
+  # exchange_durable = true
 
   ## Additional exchange arguments.
   # exchange_args = { }
   # exchange_args = {"hash_propery" = "timestamp"}
 
+  ## Authentication credentials for the PLAIN auth_method.
+  # username = ""
+  # password = ""
+
   ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
   ## described here: https://www.rabbitmq.com/plugins.html
   # auth_method = "PLAIN"
-  ## Topic routing key
+
+  ## Metric tag to use as a routing key.
+  ##   ie, if this tag exists, its value will be used as the routing key
+  # routing_tag = "host"
+
+  ## Static routing key.  Used when no routing_tag is set or as a fallback
+  ## when the tag specified in routing tag is not found.
   # routing_key = ""
-  ## Telegraf tag to use as a routing key
-  ##  ie, if this tag exists, its value will be used as the routing key
-  ##  and override routing_key config even if defined
-  routing_tag = "host"
-  ## Delivery Mode controls if a published message is persistent
-  ## Valid options are "transient" and "persistent". default: "transient"
-  delivery_mode = "transient"
-
-  ## InfluxDB retention policy
-  # retention_policy = "default"
-  ## InfluxDB database
+  # routing_key = "telegraf"
+
+  ## Delivery Mode controls if a published message is persistent.
+  ##   One of "transient" or "persistent".
+  # delivery_mode = "transient"
+
+  ## InfluxDB database added as a message header.
+  ##   deprecated in 1.7; use the headers option
   # database = "telegraf"
 
-  ## Write timeout, formatted as a string.  If not provided, will default
-  ## to 5s. 0s means no timeout (not recommended).
+  ## InfluxDB retention policy added as a message header
+  ##   deprecated in 1.7; use the headers option
+  # retention_policy = "default"
+
+  ## Static headers added to each published message.
+  # headers = { }
+  # headers = {"database" = "telegraf", "retention_policy" = "default"}
+
+  ## Connection timeout.  If not provided, will default to 5s.  0s means no
+  ## timeout (not recommended).
   # timeout = "5s"
 
   ## Optional TLS Config
@@ -144,269 +145,239 @@ var sampleConfig = `
   ## Use TLS but skip chain & host verification
   # insecure_skip_verify = false
 
+  ## If true use batch serialization format instead of line based delimiting.
+  ## Only applies to data formats which are not line based such as JSON.
+  ## Recommended to set to true.
+  # use_batch_format = false
+
   ## Data format to output.
   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
-  data_format = "influx"
+  # data_format = "influx"
 `
 
-func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
-	a.serializer = serializer
+func (q *AMQP) SampleConfig() string {
+	return sampleConfig
 }
 
-func (q *AMQP) Connect() error {
-	switch q.DeliveryMode {
-	case "transient":
-		q.deliveryMode = amqp.Transient
-		break
-	case "persistent":
-		q.deliveryMode = amqp.Persistent
-		break
-	default:
-		q.deliveryMode = amqp.Transient
-		break
-	}
+func (q *AMQP) Description() string {
+	return "Publishes metrics to an AMQP broker"
+}
+
+func (q *AMQP) SetSerializer(serializer serializers.Serializer) {
+	q.serializer = serializer
+}
 
-	headers := amqp.Table{
-		"database":         q.Database,
-		"retention_policy": q.RetentionPolicy,
+func (q *AMQP) Connect() error {
+	if q.config == nil {
+		config, err := q.makeClientConfig()
+		if err != nil {
+			return err
+		}
+		q.config = config
 	}
 
-	// make new tls config
-	tls, err := q.ClientConfig.TLSConfig()
+	client, err := q.connect(q.config)
 	if err != nil {
 		return err
 	}
+	q.client = client
 
-	var auth []amqp.Authentication
-	if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
-		auth = []amqp.Authentication{&externalAuth{}}
-	} else if q.Username != "" || q.Password != "" {
-		auth = []amqp.Authentication{
-			&amqp.PlainAuth{
-				Username: q.Username,
-				Password: q.Password,
-			},
-		}
-	}
+	return nil
+}
 
-	brokers := q.Brokers
-	if len(brokers) == 0 {
-		brokers = []string{q.URL}
+func (q *AMQP) Close() error {
+	if q.client != nil {
+		return q.client.Close()
 	}
+	return nil
+}
 
-	amqpConf := amqp.Config{
-		TLSClientConfig: tls,
-		SASL:            auth, // if nil, it will be PLAIN
-		Dial: func(network, addr string) (net.Conn, error) {
-			return net.DialTimeout(network, addr, q.Timeout.Duration)
-		},
+func (q *AMQP) routingKey(metric telegraf.Metric) string {
+	if q.RoutingTag != "" {
+		key, ok := metric.GetTag(q.RoutingTag)
+		if ok {
+			return key
+		}
 	}
+	return q.RoutingKey
+}
+
+func (q *AMQP) Write(metrics []telegraf.Metric) error {
+	batches := make(map[string][]telegraf.Metric)
+	if q.ExchangeType == "direct" || q.ExchangeType == "header" {
+		// Since the routing_key is ignored for these exchange types send as a
+		// single batch.
+		batches[""] = metrics
+	} else {
+		for _, metric := range metrics {
+			routingKey := q.routingKey(metric)
+			if _, ok := batches[routingKey]; !ok {
+				batches[routingKey] = make([]telegraf.Metric, 0)
+			}
 
-	var connection *amqp.Connection
-	p := rand.Perm(len(brokers))
-	for _, n := range p {
-		broker := brokers[n]
-		log.Printf("D! Output [amqp] connecting to %q", broker)
-		conn, err := amqp.DialConfig(broker, amqpConf)
-		if err == nil {
-			connection = conn
-			log.Printf("D! Output [amqp] connected to %q", broker)
-			break
+			batches[routingKey] = append(batches[routingKey], metric)
 		}
-		log.Printf("D! Output [amqp] error connecting to %q", broker)
 	}
 
-	if connection == nil {
-		return errors.New("could not connect to any broker")
-	}
+	first := true
+	for key, metrics := range batches {
+		body, err := q.serialize(metrics)
+		if err != nil {
+			return err
+		}
 
-	channel, err := connection.Channel()
-	if err != nil {
-		return fmt.Errorf("Failed to open a channel: %s", err)
+		err = q.publish(key, body)
+		if err != nil {
+			// If this is the first attempt to publish and the connection is
+			// closed, try to reconnect and retry once.
+			if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed {
+				first = false
+				q.client = nil
+				err := q.publish(key, body)
+				if err != nil {
+					return err
+				}
+			} else {
+				q.client = nil
+				return err
+			}
+		}
+		first = false
 	}
 
-	var exchangeDurable = true
-	switch q.ExchangeDurability {
-	case "transient":
-		exchangeDurable = false
-	default:
-		exchangeDurable = true
+	if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 {
+		log.Printf("D! Output [amqp] sent MaxMessages; closing connection")
+		q.client = nil
 	}
 
-	exchangeArgs := make(amqp.Table, len(q.ExchangeArguments))
-	for k, v := range q.ExchangeArguments {
-		exchangeArgs[k] = v
+	return nil
+}
+
+func (q *AMQP) publish(key string, body []byte) error {
+	if q.client == nil {
+		client, err := q.connect(q.config)
+		if err != nil {
+			return err
+		}
+		q.sentMessages = 0
+		q.client = client
 	}
 
-	err = declareExchange(
-		channel,
-		q.Exchange,
-		q.ExchangeType,
-		q.ExchangePassive,
-		exchangeDurable,
-		exchangeArgs)
+	err := q.client.Publish(key, body)
 	if err != nil {
 		return err
 	}
-
-	q.setClient(&client{
-		conn:    connection,
-		channel: channel,
-		headers: headers,
-	})
-
-	go func() {
-		err := <-connection.NotifyClose(make(chan *amqp.Error))
-		if err == nil {
-			return
-		}
-
-		q.setClient(nil)
-
-		log.Printf("I! Closing: %s", err)
-		log.Printf("I! Trying to reconnect")
-		for err := q.Connect(); err != nil; err = q.Connect() {
-			log.Println("E! ", err.Error())
-			time.Sleep(10 * time.Second)
-		}
-	}()
+	q.sentMessages++
 	return nil
 }
 
-func declareExchange(
-	channel *amqp.Channel,
-	exchangeName string,
-	exchangeType string,
-	exchangePassive bool,
-	exchangeDurable bool,
-	exchangeArguments amqp.Table,
-) error {
-	var err error
-	if exchangePassive {
-		err = channel.ExchangeDeclarePassive(
-			exchangeName,
-			exchangeType,
-			exchangeDurable,
-			false, // delete when unused
-			false, // internal
-			false, // no-wait
-			exchangeArguments,
-		)
+func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) {
+	if q.UseBatchFormat {
+		return q.serializer.SerializeBatch(metrics)
 	} else {
-		err = channel.ExchangeDeclare(
-			exchangeName,
-			exchangeType,
-			exchangeDurable,
-			false, // delete when unused
-			false, // internal
-			false, // no-wait
-			exchangeArguments,
-		)
-	}
-	if err != nil {
-		return fmt.Errorf("error declaring exchange: %v", err)
+		var buf bytes.Buffer
+		for _, metric := range metrics {
+			octets, err := q.serializer.Serialize(metric)
+			if err != nil {
+				return nil, err
+			}
+			_, err = buf.Write(octets)
+			if err != nil {
+				return nil, err
+			}
+		}
+		body := buf.Bytes()
+		return body, nil
 	}
-	return nil
 }
 
-func (q *AMQP) Close() error {
-	c := q.getClient()
-	if c == nil {
-		return nil
+func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
+	config := &ClientConfig{
+		exchange:        q.Exchange,
+		exchangeType:    q.ExchangeType,
+		exchangePassive: q.ExchangePassive,
+		timeout:         q.Timeout.Duration,
 	}
 
-	err := c.conn.Close()
-	if err != nil && err != amqp.ErrClosed {
-		log.Printf("E! Error closing AMQP connection: %s", err)
-		return err
+	switch q.ExchangeDurability {
+	case "transient":
+		config.exchangeDurable = false
+	default:
+		config.exchangeDurable = true
 	}
-	return nil
-}
-
-func (q *AMQP) SampleConfig() string {
-	return sampleConfig
-}
-
-func (q *AMQP) Description() string {
-	return "Configuration for the AMQP server to send metrics to"
-}
 
-func (q *AMQP) Write(metrics []telegraf.Metric) error {
-	if len(metrics) == 0 {
-		return nil
+	config.brokers = q.Brokers
+	if len(config.brokers) == 0 {
+		config.brokers = []string{q.URL}
 	}
 
-	c := q.getClient()
-	if c == nil {
-		return fmt.Errorf("connection is not open")
+	switch q.DeliveryMode {
+	case "transient":
+		config.deliveryMode = amqp.Transient
+	case "persistent":
+		config.deliveryMode = amqp.Persistent
+	default:
+		config.deliveryMode = amqp.Transient
 	}
 
-	outbuf := make(map[string][]byte)
-
-	for _, metric := range metrics {
-		var key string
-		if q.RoutingKey != "" {
-			key = q.RoutingKey
+	if len(q.Headers) > 0 {
+		config.headers = make(amqp.Table, len(q.Headers))
+		for k, v := range q.Headers {
+			config.headers[k] = v
 		}
-		if q.RoutingTag != "" {
-			if h, ok := metric.Tags()[q.RoutingTag]; ok {
-				key = h
-			}
+	} else {
+		// Copy deprecated fields into message header
+		config.headers = amqp.Table{
+			"database":         q.Database,
+			"retention_policy": q.RetentionPolicy,
 		}
+	}
 
-		buf, err := q.serializer.Serialize(metric)
-		if err != nil {
-			return err
+	if len(q.ExchangeArguments) > 0 {
+		config.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments))
+		for k, v := range q.ExchangeArguments {
+			config.exchangeArguments[k] = v
 		}
+	}
 
-		outbuf[key] = append(outbuf[key], buf...)
+	tlsConfig, err := q.ClientConfig.TLSConfig()
+	if err != nil {
+		return nil, err
 	}
+	config.tlsConfig = tlsConfig
 
-	for key, buf := range outbuf {
-		// Note that since the channel is not in confirm mode, the absence of
-		// an error does not indicate successful delivery.
-		err := c.channel.Publish(
-			q.Exchange, // exchange
-			key,        // routing key
-			false,      // mandatory
-			false,      // immediate
-			amqp.Publishing{
-				Headers:      c.headers,
-				ContentType:  "text/plain",
-				Body:         buf,
-				DeliveryMode: q.deliveryMode,
-			})
-		if err != nil {
-			return fmt.Errorf("Failed to send AMQP message: %s", err)
+	var auth []amqp.Authentication
+	if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
+		auth = []amqp.Authentication{&externalAuth{}}
+	} else if q.Username != "" || q.Password != "" {
+		auth = []amqp.Authentication{
+			&amqp.PlainAuth{
+				Username: q.Username,
+				Password: q.Password,
+			},
 		}
 	}
-	return nil
-}
+	config.auth = auth
 
-func (q *AMQP) getClient() *client {
-	q.Lock()
-	defer q.Unlock()
-	return q.c
+	return config, nil
 }
 
-func (q *AMQP) setClient(c *client) {
-	q.Lock()
-	q.c = c
-	q.Unlock()
+func connect(config *ClientConfig) (Client, error) {
+	return Connect(config)
 }
 
 func init() {
 	outputs.Add("amqp", func() telegraf.Output {
 		return &AMQP{
-			URL:                DefaultBroker,
-			AuthMethod:         DefaultAuthMethod,
-			ExchangeType:       DefaultExchangeType,
-			ExchangeDurability: DefaultExchangeDurability,
-			Database:           DefaultDatabase,
-			RetentionPolicy:    DefaultRetentionPolicy,
-			Timeout:            internal.Duration{Duration: time.Second * 5},
+			URL:             DefaultURL,
+			ExchangeType:    DefaultExchangeType,
+			AuthMethod:      DefaultAuthMethod,
+			Database:        DefaultDatabase,
+			RetentionPolicy: DefaultRetentionPolicy,
+			Timeout:         internal.Duration{Duration: time.Second * 5},
+			connect:         connect,
 		}
 	})
 }
diff --git a/plugins/outputs/amqp/amqp_test.go b/plugins/outputs/amqp/amqp_test.go
index 66a08262..32a91452 100644
--- a/plugins/outputs/amqp/amqp_test.go
+++ b/plugins/outputs/amqp/amqp_test.go
@@ -2,30 +2,161 @@ package amqp
 
 import (
 	"testing"
+	"time"
 
-	"github.com/influxdata/telegraf/plugins/serializers"
-	"github.com/influxdata/telegraf/testutil"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/streadway/amqp"
 	"github.com/stretchr/testify/require"
 )
 
-func TestConnectAndWrite(t *testing.T) {
-	if testing.Short() {
-		t.Skip("Skipping integration test in short mode")
-	}
+type MockClient struct {
+	PublishF func(key string, body []byte) error
+	CloseF   func() error
+
+	PublishCallCount int
+	CloseCallCount   int
+
+	t *testing.T
+}
 
-	var url = "amqp://" + testutil.GetLocalHost() + ":5672/"
-	s, _ := serializers.NewInfluxSerializer()
-	q := &AMQP{
-		URL:        url,
-		Exchange:   "telegraf_test",
-		serializer: s,
+func (c *MockClient) Publish(key string, body []byte) error {
+	c.PublishCallCount++
+	return c.PublishF(key, body)
+}
+
+func (c *MockClient) Close() error {
+	c.CloseCallCount++
+	return c.CloseF()
+}
+
+func MockConnect(config *ClientConfig) (Client, error) {
+	return &MockClient{}, nil
+}
+
+func NewMockClient() Client {
+	return &MockClient{
+		PublishF: func(key string, body []byte) error {
+			return nil
+		},
+		CloseF: func() error {
+			return nil
+		},
 	}
+}
 
-	// Verify that we can connect to the AMQP broker
-	err := q.Connect()
-	require.NoError(t, err)
+func TestConnect(t *testing.T) {
+	tests := []struct {
+		name    string
+		output  *AMQP
+		errFunc func(t *testing.T, output *AMQP, err error)
+	}{
+		{
+			name: "defaults",
+			output: &AMQP{
+				Brokers:            []string{DefaultURL},
+				ExchangeType:       DefaultExchangeType,
+				ExchangeDurability: "durable",
+				AuthMethod:         DefaultAuthMethod,
+				Database:           DefaultDatabase,
+				RetentionPolicy:    DefaultRetentionPolicy,
+				Timeout:            internal.Duration{Duration: time.Second * 5},
+				connect: func(config *ClientConfig) (Client, error) {
+					return NewMockClient(), nil
+				},
+			},
+			errFunc: func(t *testing.T, output *AMQP, err error) {
+				config := output.config
+				require.Equal(t, []string{DefaultURL}, config.brokers)
+				require.Equal(t, "", config.exchange)
+				require.Equal(t, "topic", config.exchangeType)
+				require.Equal(t, false, config.exchangePassive)
+				require.Equal(t, true, config.exchangeDurable)
+				require.Equal(t, amqp.Table(nil), config.exchangeArguments)
+				require.Equal(t, amqp.Table{
+					"database":         DefaultDatabase,
+					"retention_policy": DefaultRetentionPolicy,
+				}, config.headers)
+				require.Equal(t, amqp.Transient, config.deliveryMode)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name: "headers overrides deprecated dbrp",
+			output: &AMQP{
+				Headers: map[string]string{
+					"foo": "bar",
+				},
+				connect: func(config *ClientConfig) (Client, error) {
+					return NewMockClient(), nil
+				},
+			},
+			errFunc: func(t *testing.T, output *AMQP, err error) {
+				config := output.config
+				require.Equal(t, amqp.Table{
+					"foo": "bar",
+				}, config.headers)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name: "exchange args",
+			output: &AMQP{
+				ExchangeArguments: map[string]string{
+					"foo": "bar",
+				},
+				connect: func(config *ClientConfig) (Client, error) {
+					return NewMockClient(), nil
+				},
+			},
+			errFunc: func(t *testing.T, output *AMQP, err error) {
+				config := output.config
+				require.Equal(t, amqp.Table{
+					"foo": "bar",
+				}, config.exchangeArguments)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name: "username password",
+			output: &AMQP{
+				URL:      "amqp://foo:bar@localhost",
+				Username: "telegraf",
+				Password: "pa$$word",
+				connect: func(config *ClientConfig) (Client, error) {
+					return NewMockClient(), nil
+				},
+			},
+			errFunc: func(t *testing.T, output *AMQP, err error) {
+				config := output.config
+				require.Equal(t, []amqp.Authentication{
+					&amqp.PlainAuth{
+						Username: "telegraf",
+						Password: "pa$$word",
+					},
+				}, config.auth)
 
-	// Verify that we can successfully write data to the amqp broker
-	err = q.Write(testutil.MockMetrics())
-	require.NoError(t, err)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name: "url support",
+			output: &AMQP{
+				URL: DefaultURL,
+				connect: func(config *ClientConfig) (Client, error) {
+					return NewMockClient(), nil
+				},
+			},
+			errFunc: func(t *testing.T, output *AMQP, err error) {
+				config := output.config
+				require.Equal(t, []string{DefaultURL}, config.brokers)
+				require.NoError(t, err)
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tt.output.Connect()
+			tt.errFunc(t, tt.output, err)
+		})
+	}
 }
diff --git a/plugins/outputs/amqp/client.go b/plugins/outputs/amqp/client.go
new file mode 100644
index 00000000..ba4e4516
--- /dev/null
+++ b/plugins/outputs/amqp/client.go
@@ -0,0 +1,134 @@
+package amqp
+
+import (
+	"crypto/tls"
+	"errors"
+	"fmt"
+	"log"
+	"math/rand"
+	"net"
+	"time"
+
+	"github.com/streadway/amqp"
+)
+
+type ClientConfig struct {
+	brokers           []string
+	exchange          string
+	exchangeType      string
+	exchangePassive   bool
+	exchangeDurable   bool
+	exchangeArguments amqp.Table
+	headers           amqp.Table
+	deliveryMode      uint8
+	tlsConfig         *tls.Config
+	timeout           time.Duration
+	auth              []amqp.Authentication
+}
+
+type client struct {
+	conn    *amqp.Connection
+	channel *amqp.Channel
+	config  *ClientConfig
+}
+
+// Connect opens a connection to one of the brokers at random
+func Connect(config *ClientConfig) (*client, error) {
+	client := &client{
+		config: config,
+	}
+
+	p := rand.Perm(len(config.brokers))
+	for _, n := range p {
+		broker := config.brokers[n]
+		log.Printf("D! Output [amqp] connecting to %q", broker)
+		conn, err := amqp.DialConfig(
+			broker, amqp.Config{
+				TLSClientConfig: config.tlsConfig,
+				SASL:            config.auth, // if nil, it will be PLAIN taken from url
+				Dial: func(network, addr string) (net.Conn, error) {
+					return net.DialTimeout(network, addr, config.timeout)
+				},
+			})
+		if err == nil {
+			client.conn = conn
+			log.Printf("D! Output [amqp] connected to %q", broker)
+			break
+		}
+		log.Printf("D! Output [amqp] error connecting to %q", broker)
+	}
+
+	if client.conn == nil {
+		return nil, errors.New("could not connect to any broker")
+	}
+
+	channel, err := client.conn.Channel()
+	if err != nil {
+		return nil, fmt.Errorf("error opening channel: %v", err)
+	}
+	client.channel = channel
+
+	err = client.DeclareExchange()
+	if err != nil {
+		return nil, err
+	}
+
+	return client, nil
+}
+
+func (c *client) DeclareExchange() error {
+	var err error
+	if c.config.exchangePassive {
+		err = c.channel.ExchangeDeclarePassive(
+			c.config.exchange,
+			c.config.exchangeType,
+			c.config.exchangeDurable,
+			false, // delete when unused
+			false, // internal
+			false, // no-wait
+			c.config.exchangeArguments,
+		)
+	} else {
+		err = c.channel.ExchangeDeclare(
+			c.config.exchange,
+			c.config.exchangeType,
+			c.config.exchangeDurable,
+			false, // delete when unused
+			false, // internal
+			false, // no-wait
+			c.config.exchangeArguments,
+		)
+	}
+	if err != nil {
+		return fmt.Errorf("error declaring exchange: %v", err)
+	}
+	return nil
+}
+
+func (c *client) Publish(key string, body []byte) error {
+	// Note that since the channel is not in confirm mode, the absence of
+	// an error does not indicate successful delivery.
+	return c.channel.Publish(
+		c.config.exchange, // exchange
+		key,               // routing key
+		false,             // mandatory
+		false,             // immediate
+		amqp.Publishing{
+			Headers:      c.config.headers,
+			ContentType:  "text/plain",
+			Body:         body,
+			DeliveryMode: c.config.deliveryMode,
+		})
+}
+
+func (c *client) Close() error {
+	if c.conn == nil {
+		return nil
+	}
+
+	err := c.conn.Close()
+	if err != nil && err != amqp.ErrClosed {
+		return err
+	}
+	return nil
+}
-- 
GitLab