From 179bcfdcbb496b2ebdccc8ad954608427c69a7dd Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel.nelson@influxdb.com>
Date: Sun, 3 Jun 2018 17:05:33 -0700
Subject: [PATCH] Use list of brokers in amqp output and amqp_consumer

---
 plugins/inputs/amqp_consumer/README.md        | 14 +++-
 plugins/inputs/amqp_consumer/amqp_consumer.go | 66 +++++++++++++++----
 plugins/outputs/amqp/README.md                | 14 +++-
 plugins/outputs/amqp/amqp.go                  | 66 +++++++++++++++----
 4 files changed, 128 insertions(+), 32 deletions(-)

diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md
index 53d97d08..84661aec 100644
--- a/plugins/inputs/amqp_consumer/README.md
+++ b/plugins/inputs/amqp_consumer/README.md
@@ -15,8 +15,18 @@ The following defaults are known to work with RabbitMQ:
 ```toml
 # AMQP consumer plugin
 [[inputs.amqp_consumer]]
-  ## AMQP url
-  url = "amqp://localhost:5672/influxdb"
+  ## Broker to consume from.
+  ##   deprecated in 1.7; use the brokers option
+  # url = "amqp://localhost:5672/influxdb"
+
+  ## Brokers to consume from.  If multiple brokers are specified a random broker
+  ## will be selected anytime a connection is established.  This can be
+  ## helpful for load balancing when not using a dedicated load balancer.
+  brokers = ["amqp://localhost:5672/influxdb"]
+
+  ## Authentication credentials for the PLAIN auth_method.
+  # username = ""
+  # password = ""
 
   ## Exchange to declare and consume from.
   exchange = "telegraf"
diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go
index ad0e3ce7..d39c995c 100644
--- a/plugins/inputs/amqp_consumer/amqp_consumer.go
+++ b/plugins/inputs/amqp_consumer/amqp_consumer.go
@@ -1,8 +1,10 @@
 package amqp_consumer
 
 import (
+	"errors"
 	"fmt"
 	"log"
+	"math/rand"
 	"strings"
 	"sync"
 	"time"
@@ -17,8 +19,10 @@ import (
 
 // AMQPConsumer is the top level struct for this plugin
 type AMQPConsumer struct {
-	URL string
-
+	URL                string            `toml:"url"` // deprecated in 1.7; use brokers
+	Brokers            []string          `toml:"brokers"`
+	Username           string            `toml:"username"`
+	Password           string            `toml:"password"`
 	Exchange           string            `toml:"exchange"`
 	ExchangeType       string            `toml:"exchange_type"`
 	ExchangeDurability string            `toml:"exchange_durability"`
@@ -55,6 +59,8 @@ func (a *externalAuth) Response() string {
 const (
 	DefaultAuthMethod = "PLAIN"
 
+	DefaultBroker = "amqp://localhost:5672/influxdb"
+
 	DefaultExchangeType       = "topic"
 	DefaultExchangeDurability = "durable"
 
@@ -63,8 +69,18 @@ const (
 
 func (a *AMQPConsumer) SampleConfig() string {
 	return `
-  ## AMQP url
-  url = "amqp://localhost:5672/influxdb"
+  ## Broker to consume from.
+  ##   deprecated in 1.7; use the brokers option
+  # url = "amqp://localhost:5672/influxdb"
+
+  ## Brokers to consume from.  If multiple brokers are specified a random broker
+  ## will be selected anytime a connection is established.  This can be
+  ## helpful for load balancing when not using a dedicated load balancer.
+  brokers = ["amqp://localhost:5672/influxdb"]
+
+  ## Authentication credentials for the PLAIN auth_method.
+  # username = ""
+  # password = ""
 
   ## Exchange to declare and consume from.
   exchange = "telegraf"
@@ -130,16 +146,21 @@ func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
 		return nil, err
 	}
 
-	// parse auth method
-	var sasl []amqp.Authentication // nil by default
-
+	var auth []amqp.Authentication
 	if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
-		sasl = []amqp.Authentication{&externalAuth{}}
+		auth = []amqp.Authentication{&externalAuth{}}
+	} else if a.Username != "" || a.Password != "" {
+		auth = []amqp.Authentication{
+			&amqp.PlainAuth{
+				Username: a.Username,
+				Password: a.Password,
+			},
+		}
 	}
 
 	config := amqp.Config{
 		TLSClientConfig: tls,
-		SASL:            sasl, // if nil, it will be PLAIN
+		SASL:            auth, // if nil, it will be PLAIN
 	}
 	return &config, nil
 }
@@ -187,13 +208,29 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
 }
 
 func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
-	conn, err := amqp.DialConfig(a.URL, *amqpConf)
-	if err != nil {
-		return nil, err
+	brokers := a.Brokers
+	if len(brokers) == 0 {
+		brokers = []string{a.URL}
+	}
+
+	p := rand.Perm(len(brokers))
+	for _, n := range p {
+		broker := brokers[n]
+		log.Printf("D! [amqp_consumer] connecting to %q", broker)
+		conn, err := amqp.DialConfig(broker, *amqpConf)
+		if err == nil {
+			a.conn = conn
+			log.Printf("D! [amqp_consumer] connected to %q", broker)
+			break
+		}
+		log.Printf("D! [amqp_consumer] error connecting to %q", broker)
+	}
+
+	if a.conn == nil {
+		return nil, errors.New("could not connect to any broker")
 	}
-	a.conn = conn
 
-	ch, err := conn.Channel()
+	ch, err := a.conn.Channel()
 	if err != nil {
 		return nil, fmt.Errorf("Failed to open a channel: %s", err)
 	}
@@ -338,6 +375,7 @@ func (a *AMQPConsumer) Stop() {
 func init() {
 	inputs.Add("amqp_consumer", func() telegraf.Input {
 		return &AMQPConsumer{
+			URL:                DefaultBroker,
 			AuthMethod:         DefaultAuthMethod,
 			ExchangeType:       DefaultExchangeType,
 			ExchangeDurability: DefaultExchangeDurability,
diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md
index bb66a030..ebc55514 100644
--- a/plugins/outputs/amqp/README.md
+++ b/plugins/outputs/amqp/README.md
@@ -24,8 +24,18 @@ For an introduction to AMQP see:
 ```toml
 # Configuration for the AMQP server to send metrics to
 [[outputs.amqp]]
-  ## AMQP url
-  url = "amqp://localhost:5672/influxdb"
+  ## Broker to publish to.
+  ##   deprecated in 1.7; use the brokers option
+  # url = "amqp://localhost:5672/influxdb"
+
+  ## Brokers to publish to.  If multiple brokers are specified a random broker
+  ## will be selected anytime a connection is established.  This can be
+  ## helpful for load balancing when not using a dedicated load balancer.
+  brokers = ["amqp://localhost:5672/influxdb"]
+
+  ## Authentication credentials for the PLAIN auth_method.
+  # username = ""
+  # password = ""
 
   ## Exchange to declare and publish to.
   exchange = "telegraf"
diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go
index 8a3bcbd6..dfed5713 100644
--- a/plugins/outputs/amqp/amqp.go
+++ b/plugins/outputs/amqp/amqp.go
@@ -1,8 +1,10 @@
 package amqp
 
 import (
+	"errors"
 	"fmt"
 	"log"
+	"math/rand"
 	"net"
 	"strings"
 	"sync"
@@ -24,9 +26,10 @@ type client struct {
 }
 
 type AMQP struct {
-	// AMQP brokers to send metrics to
-	URL string
-
+	URL                string            `toml:"url"` // deprecated in 1.7; use brokers
+	Brokers            []string          `toml:"brokers"`
+	Username           string            `toml:"username"`
+	Password           string            `toml:"password"`
 	Exchange           string            `toml:"exchange"`
 	ExchangeType       string            `toml:"exchange_type"`
 	ExchangeDurability string            `toml:"exchange_durability"`
@@ -72,6 +75,8 @@ func (a *externalAuth) Response() string {
 const (
 	DefaultAuthMethod = "PLAIN"
 
+	DefaultBroker = "amqp://localhost:5672/influxdb"
+
 	DefaultExchangeType       = "topic"
 	DefaultExchangeDurability = "durable"
 
@@ -80,8 +85,18 @@ const (
 )
 
 var sampleConfig = `
-  ## AMQP url
-  url = "amqp://localhost:5672/influxdb"
+  ## Broker to publish to.
+  ##   deprecated in 1.7; use the brokers option
+  # url = "amqp://localhost:5672/influxdb"
+
+  ## Brokers to publish to.  If multiple brokers are specified a random broker
+  ## will be selected anytime a connection is established.  This can be
+  ## helpful for load balancing when not using a dedicated load balancer.
+  brokers = ["amqp://localhost:5672/influxdb"]
+
+  ## Authentication credentials for the PLAIN auth_method.
+  # username = ""
+  # password = ""
 
   ## Exchange to declare and publish to.
   exchange = "telegraf"
@@ -158,31 +173,53 @@ func (q *AMQP) Connect() error {
 		"retention_policy": q.RetentionPolicy,
 	}
 
-	var connection *amqp.Connection
 	// make new tls config
 	tls, err := q.ClientConfig.TLSConfig()
 	if err != nil {
 		return err
 	}
 
-	// parse auth method
-	var sasl []amqp.Authentication // nil by default
-
+	var auth []amqp.Authentication
 	if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
-		sasl = []amqp.Authentication{&externalAuth{}}
+		auth = []amqp.Authentication{&externalAuth{}}
+	} else if q.Username != "" || q.Password != "" {
+		auth = []amqp.Authentication{
+			&amqp.PlainAuth{
+				Username: q.Username,
+				Password: q.Password,
+			},
+		}
+	}
+
+	brokers := q.Brokers
+	if len(brokers) == 0 {
+		brokers = []string{q.URL}
 	}
 
 	amqpConf := amqp.Config{
 		TLSClientConfig: tls,
-		SASL:            sasl, // if nil, it will be PLAIN
+		SASL:            auth, // if nil, it will be PLAIN
 		Dial: func(network, addr string) (net.Conn, error) {
 			return net.DialTimeout(network, addr, q.Timeout.Duration)
 		},
 	}
 
-	connection, err = amqp.DialConfig(q.URL, amqpConf)
-	if err != nil {
-		return err
+	var connection *amqp.Connection
+	p := rand.Perm(len(brokers))
+	for _, n := range p {
+		broker := brokers[n]
+		log.Printf("D! Output [amqp] connecting to %q", broker)
+		conn, err := amqp.DialConfig(broker, amqpConf)
+		if err == nil {
+			connection = conn
+			log.Printf("D! Output [amqp] connected to %q", broker)
+			break
+		}
+		log.Printf("D! Output [amqp] error connecting to %q", broker)
+	}
+
+	if connection == nil {
+		return errors.New("could not connect to any broker")
 	}
 
 	channel, err := connection.Channel()
@@ -363,6 +400,7 @@ func (q *AMQP) setClient(c *client) {
 func init() {
 	outputs.Add("amqp", func() telegraf.Output {
 		return &AMQP{
+			URL:                DefaultBroker,
 			AuthMethod:         DefaultAuthMethod,
 			ExchangeType:       DefaultExchangeType,
 			ExchangeDurability: DefaultExchangeDurability,
-- 
GitLab