From e3f1d289084edf167978a305814690cba9e71a37 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel.nelson@influxdb.com>
Date: Sun, 3 Jun 2018 16:31:11 -0700
Subject: [PATCH] Allow configuration of amqp exchange type, durability, and
 arguments

---
 plugins/inputs/amqp_consumer/README.md        |  27 ++--
 plugins/inputs/amqp_consumer/amqp_consumer.go | 119 +++++++++++++-----
 plugins/outputs/amqp/README.md                |  21 +++-
 plugins/outputs/amqp/amqp.go                  | 109 +++++++++++++---
 4 files changed, 217 insertions(+), 59 deletions(-)

diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md
index c10a4410..53d97d08 100644
--- a/plugins/inputs/amqp_consumer/README.md
+++ b/plugins/inputs/amqp_consumer/README.md
@@ -17,23 +17,36 @@ The following defaults are known to work with RabbitMQ:
 [[inputs.amqp_consumer]]
   ## AMQP url
   url = "amqp://localhost:5672/influxdb"
-  ## AMQP exchange
+
+  ## Exchange to declare and consume from.
   exchange = "telegraf"
-  ## Exchange passive mode
-  exchange_passive = false
+
+  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
+  # exchange_type = "topic"
+
+  ## If true, exchange will be passively declared.
+  # exchange_passive = false
+
+  ## Exchange durability can be either "transient" or "durable".
+  # exchange_durability = "durable"
+
+  ## Additional exchange arguments.
+  # exchange_args = { }
+  # exchange_args = {"hash_propery" = "timestamp"}
+
   ## AMQP queue name
   queue = "telegraf"
   ## Binding Key
   binding_key = "#"
 
-  ## Controls how many messages the server will try to keep on the network
-  ## for consumers before receiving delivery acks.
-  #prefetch_count = 50
+  ## Maximum number of messages server should give to the worker.
+  prefetch_count = 50
 
-  ## Auth method. PLAIN and EXTERNAL are supported.
+  ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
   ## described here: https://www.rabbitmq.com/plugins.html
   # auth_method = "PLAIN"
+
   ## Optional TLS Config
   # tls_ca = "/etc/telegraf/ca.pem"
   # tls_cert = "/etc/telegraf/cert.pem"
diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go
index 6ed79f6c..ad0e3ce7 100644
--- a/plugins/inputs/amqp_consumer/amqp_consumer.go
+++ b/plugins/inputs/amqp_consumer/amqp_consumer.go
@@ -18,10 +18,13 @@ import (
 // AMQPConsumer is the top level struct for this plugin
 type AMQPConsumer struct {
 	URL string
-	// AMQP exchange
-	Exchange string
-	// Exchange passive mode
-	ExchangePassive bool
+
+	Exchange           string            `toml:"exchange"`
+	ExchangeType       string            `toml:"exchange_type"`
+	ExchangeDurability string            `toml:"exchange_durability"`
+	ExchangePassive    bool              `toml:"exchange_passive"`
+	ExchangeArguments  map[string]string `toml:"exchange_arguments"`
+
 	// Queue Name
 	Queue string
 	// Binding Key
@@ -50,7 +53,11 @@ func (a *externalAuth) Response() string {
 }
 
 const (
-	DefaultAuthMethod    = "PLAIN"
+	DefaultAuthMethod = "PLAIN"
+
+	DefaultExchangeType       = "topic"
+	DefaultExchangeDurability = "durable"
+
 	DefaultPrefetchCount = 50
 )
 
@@ -58,10 +65,23 @@ func (a *AMQPConsumer) SampleConfig() string {
 	return `
   ## AMQP url
   url = "amqp://localhost:5672/influxdb"
-  ## AMQP exchange
+
+  ## Exchange to declare and consume from.
   exchange = "telegraf"
-  ## Exchange passive mode
-  exchange_passive = false
+
+  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
+  # exchange_type = "topic"
+
+  ## If true, exchange will be passively declared.
+  # exchange_passive = false
+
+  ## Exchange durability can be either "transient" or "durable".
+  # exchange_durability = "durable"
+
+  ## Additional exchange arguments.
+  # exchange_args = { }
+  # exchange_args = {"hash_propery" = "timestamp"}
+
   ## AMQP queue name
   queue = "telegraf"
   ## Binding Key
@@ -178,29 +198,28 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
 		return nil, fmt.Errorf("Failed to open a channel: %s", err)
 	}
 
-	if a.ExchangePassive == true {
-		err = ch.ExchangeDeclarePassive(
-			a.Exchange, // name
-			"topic",    // type
-			true,       // durable
-			false,      // auto-deleted
-			false,      // internal
-			false,      // no-wait
-			nil,        // arguments
-		)
-	} else {
-		err = ch.ExchangeDeclare(
-			a.Exchange, // name
-			"topic",    // type
-			true,       // durable
-			false,      // auto-deleted
-			false,      // internal
-			false,      // no-wait
-			nil,        // arguments
-		)
+	var exchangeDurable = true
+	switch a.ExchangeDurability {
+	case "transient":
+		exchangeDurable = false
+	default:
+		exchangeDurable = true
 	}
+
+	exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
+	for k, v := range a.ExchangeArguments {
+		exchangeArgs[k] = v
+	}
+
+	err = declareExchange(
+		ch,
+		a.Exchange,
+		a.ExchangeType,
+		a.ExchangePassive,
+		exchangeDurable,
+		exchangeArgs)
 	if err != nil {
-		return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
+		return nil, err
 	}
 
 	q, err := ch.QueueDeclare(
@@ -252,6 +271,42 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
 	return msgs, err
 }
 
+func declareExchange(
+	channel *amqp.Channel,
+	exchangeName string,
+	exchangeType string,
+	exchangePassive bool,
+	exchangeDurable bool,
+	exchangeArguments amqp.Table,
+) error {
+	var err error
+	if exchangePassive {
+		err = channel.ExchangeDeclarePassive(
+			exchangeName,
+			exchangeType,
+			exchangeDurable,
+			false, // delete when unused
+			false, // internal
+			false, // no-wait
+			exchangeArguments,
+		)
+	} else {
+		err = channel.ExchangeDeclare(
+			exchangeName,
+			exchangeType,
+			exchangeDurable,
+			false, // delete when unused
+			false, // internal
+			false, // no-wait
+			exchangeArguments,
+		)
+	}
+	if err != nil {
+		return fmt.Errorf("error declaring exchange: %v", err)
+	}
+	return nil
+}
+
 // Read messages from queue and add them to the Accumulator
 func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
 	defer a.wg.Done()
@@ -283,8 +338,10 @@ func (a *AMQPConsumer) Stop() {
 func init() {
 	inputs.Add("amqp_consumer", func() telegraf.Input {
 		return &AMQPConsumer{
-			AuthMethod:    DefaultAuthMethod,
-			PrefetchCount: DefaultPrefetchCount,
+			AuthMethod:         DefaultAuthMethod,
+			ExchangeType:       DefaultExchangeType,
+			ExchangeDurability: DefaultExchangeDurability,
+			PrefetchCount:      DefaultPrefetchCount,
 		}
 	})
 }
diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md
index 52a4ccbd..bb66a030 100644
--- a/plugins/outputs/amqp/README.md
+++ b/plugins/outputs/amqp/README.md
@@ -21,13 +21,28 @@ For an introduction to AMQP see:
 
 ### Configuration:
 
-```
+```toml
 # Configuration for the AMQP server to send metrics to
 [[outputs.amqp]]
   ## AMQP url
   url = "amqp://localhost:5672/influxdb"
-  ## AMQP exchange
+
+  ## Exchange to declare and publish to.
   exchange = "telegraf"
+
+  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
+  # exchange_type = "topic"
+
+  ## If true, exchange will be passively declared.
+  # exchange_passive = false
+
+  ## Exchange durability can be either "transient" or "durable".
+  # exchange_durability = "durable"
+
+  ## Additional exchange arguments.
+  # exchange_args = { }
+  # exchange_args = {"hash_propery" = "timestamp"}
+
   ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
   ## described here: https://www.rabbitmq.com/plugins.html
@@ -40,7 +55,7 @@ For an introduction to AMQP see:
   routing_tag = "host"
   ## Delivery Mode controls if a published message is persistent
   ## Valid options are "transient" and "persistent". default: "transient"
-  # delivery_mode = "transient"
+  delivery_mode = "transient"
 
   ## InfluxDB retention policy
   # retention_policy = "default"
diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go
index bd3068eb..8a3bcbd6 100644
--- a/plugins/outputs/amqp/amqp.go
+++ b/plugins/outputs/amqp/amqp.go
@@ -26,8 +26,13 @@ type client struct {
 type AMQP struct {
 	// AMQP brokers to send metrics to
 	URL string
-	// AMQP exchange
-	Exchange string
+
+	Exchange           string            `toml:"exchange"`
+	ExchangeType       string            `toml:"exchange_type"`
+	ExchangeDurability string            `toml:"exchange_durability"`
+	ExchangePassive    bool              `toml:"exchange_passive"`
+	ExchangeArguments  map[string]string `toml:"exchange_arguments"`
+
 	// AMQP Auth method
 	AuthMethod string
 	// Routing Key (static)
@@ -65,7 +70,11 @@ func (a *externalAuth) Response() string {
 }
 
 const (
-	DefaultAuthMethod      = "PLAIN"
+	DefaultAuthMethod = "PLAIN"
+
+	DefaultExchangeType       = "topic"
+	DefaultExchangeDurability = "durable"
+
 	DefaultRetentionPolicy = "default"
 	DefaultDatabase        = "telegraf"
 )
@@ -73,8 +82,23 @@ const (
 var sampleConfig = `
   ## AMQP url
   url = "amqp://localhost:5672/influxdb"
-  ## AMQP exchange
+
+  ## Exchange to declare and publish to.
   exchange = "telegraf"
+
+  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
+  # exchange_type = "topic"
+
+  ## If true, exchange will be passively declared.
+  # exchange_passive = false
+
+  ## Exchange durability can be either "transient" or "durable".
+  # exchange_durability = "durable"
+
+  ## Additional exchange arguments.
+  # exchange_args = { }
+  # exchange_args = {"hash_propery" = "timestamp"}
+
   ## Auth method. PLAIN and EXTERNAL are supported
   ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
   ## described here: https://www.rabbitmq.com/plugins.html
@@ -166,17 +190,28 @@ func (q *AMQP) Connect() error {
 		return fmt.Errorf("Failed to open a channel: %s", err)
 	}
 
-	err = channel.ExchangeDeclare(
-		q.Exchange, // name
-		"topic",    // type
-		true,       // durable
-		false,      // delete when unused
-		false,      // internal
-		false,      // no-wait
-		nil,        // arguments
-	)
+	var exchangeDurable = true
+	switch q.ExchangeDurability {
+	case "transient":
+		exchangeDurable = false
+	default:
+		exchangeDurable = true
+	}
+
+	exchangeArgs := make(amqp.Table, len(q.ExchangeArguments))
+	for k, v := range q.ExchangeArguments {
+		exchangeArgs[k] = v
+	}
+
+	err = declareExchange(
+		channel,
+		q.Exchange,
+		q.ExchangeType,
+		q.ExchangePassive,
+		exchangeDurable,
+		exchangeArgs)
 	if err != nil {
-		return fmt.Errorf("Failed to declare an exchange: %s", err)
+		return err
 	}
 
 	q.setClient(&client{
@@ -203,6 +238,42 @@ func (q *AMQP) Connect() error {
 	return nil
 }
 
+func declareExchange(
+	channel *amqp.Channel,
+	exchangeName string,
+	exchangeType string,
+	exchangePassive bool,
+	exchangeDurable bool,
+	exchangeArguments amqp.Table,
+) error {
+	var err error
+	if exchangePassive {
+		err = channel.ExchangeDeclarePassive(
+			exchangeName,
+			exchangeType,
+			exchangeDurable,
+			false, // delete when unused
+			false, // internal
+			false, // no-wait
+			exchangeArguments,
+		)
+	} else {
+		err = channel.ExchangeDeclare(
+			exchangeName,
+			exchangeType,
+			exchangeDurable,
+			false, // delete when unused
+			false, // internal
+			false, // no-wait
+			exchangeArguments,
+		)
+	}
+	if err != nil {
+		return fmt.Errorf("error declaring exchange: %v", err)
+	}
+	return nil
+}
+
 func (q *AMQP) Close() error {
 	c := q.getClient()
 	if c == nil {
@@ -292,10 +363,12 @@ func (q *AMQP) setClient(c *client) {
 func init() {
 	outputs.Add("amqp", func() telegraf.Output {
 		return &AMQP{
-			AuthMethod:      DefaultAuthMethod,
-			Database:        DefaultDatabase,
-			RetentionPolicy: DefaultRetentionPolicy,
-			Timeout:         internal.Duration{Duration: time.Second * 5},
+			AuthMethod:         DefaultAuthMethod,
+			ExchangeType:       DefaultExchangeType,
+			ExchangeDurability: DefaultExchangeDurability,
+			Database:           DefaultDatabase,
+			RetentionPolicy:    DefaultRetentionPolicy,
+			Timeout:            internal.Duration{Duration: time.Second * 5},
 		}
 	})
 }
-- 
GitLab