From 44320a542135a57a8a6ba18fc135dfac6b92af39 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Thu, 30 Nov 2017 18:40:12 -0800
Subject: [PATCH] Add option to amqp output to publish persistent messages
 (#3528)

---
 plugins/outputs/amqp/README.md |  3 +++
 plugins/outputs/amqp/amqp.go   | 28 ++++++++++++++++++++++++----
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md
index d79af597..83407443 100644
--- a/plugins/outputs/amqp/README.md
+++ b/plugins/outputs/amqp/README.md
@@ -29,6 +29,9 @@ For an introduction to AMQP see:
   ## Telegraf tag to use as a routing key
   ##  ie, if this tag exists, its value will be used as the routing key
   routing_tag = "host"
+  ## Delivery Mode controls if a published message is persistent
+  ## Valid options are "transient" and "persistent". default: "transient"
+  # delivery_mode = "transient"
 
   ## InfluxDB retention policy
   # retention_policy = "default"
diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go
index 75fe4c71..fed1edfe 100644
--- a/plugins/outputs/amqp/amqp.go
+++ b/plugins/outputs/amqp/amqp.go
@@ -39,6 +39,9 @@ type AMQP struct {
 	Precision string
 	// Connection timeout
 	Timeout internal.Duration
+	// Delivery Mode controls if a published message is persistent
+	// Valid options are "transient" and "persistent". default: "transient"
+	DeliveryMode string
 
 	// Path to CA file
 	SSLCA string `toml:"ssl_ca"`
@@ -52,7 +55,8 @@ type AMQP struct {
 	sync.Mutex
 	c *client
 
-	serializer serializers.Serializer
+	deliveryMode uint8
+	serializer   serializers.Serializer
 }
 
 type externalAuth struct{}
@@ -82,6 +86,9 @@ var sampleConfig = `
   ## Telegraf tag to use as a routing key
   ##  ie, if this tag exists, its value will be used as the routing key
   routing_tag = "host"
+  ## Delivery Mode controls if a published message is persistent
+  ## Valid options are "transient" and "persistent". default: "transient"
+  delivery_mode = "transient"
 
   ## InfluxDB retention policy
   # retention_policy = "default"
@@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
 }
 
 func (q *AMQP) Connect() error {
+	switch q.DeliveryMode {
+	case "transient":
+		q.deliveryMode = amqp.Transient
+		break
+	case "persistent":
+		q.deliveryMode = amqp.Persistent
+		break
+	default:
+		q.deliveryMode = amqp.Transient
+		break
+	}
+
 	headers := amqp.Table{
 		"database":         q.Database,
 		"retention_policy": q.RetentionPolicy,
@@ -245,9 +264,10 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
 			false,      // mandatory
 			false,      // immediate
 			amqp.Publishing{
-				Headers:     c.headers,
-				ContentType: "text/plain",
-				Body:        buf,
+				Headers:      c.headers,
+				ContentType:  "text/plain",
+				Body:         buf,
+				DeliveryMode: q.deliveryMode,
 			})
 		if err != nil {
 			return fmt.Errorf("Failed to send AMQP message: %s", err)
-- 
GitLab