From 41534c73f0af3512465d85a8606566225bcd2b14 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Mon, 7 Mar 2016 13:56:10 +0100
Subject: [PATCH] mqtt_consumer: option to set persistent session and client ID

closes #797
---
 CHANGELOG.md                                  |  1 +
 plugins/inputs/mqtt_consumer/mqtt_consumer.go | 22 ++++++++-
 .../mqtt_consumer/mqtt_consumer_test.go       | 48 +++++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe87e41d..2640d3f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@
 - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
 - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
 - [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.
+- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
 
 ### Bugfixes
 - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
index 42cadfd6..e3688970 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
@@ -26,6 +26,9 @@ type MQTTConsumer struct {
 	// Legacy metric buffer support
 	MetricBuffer int
 
+	PersistentSession bool
+	ClientID          string `toml:"client_id"`
+
 	// Path to CA file
 	SSLCA string `toml:"ssl_ca"`
 	// Path to host cert file
@@ -57,6 +60,13 @@ var sampleConfig = `
     "sensors/#",
   ]
 
+  # if true, messages that can't be delivered while the subscriber is offline
+  # will be delivered when it comes back (such as on service restart).
+  # NOTE: if true, client_id MUST be set
+  persistent_session = false
+  # If empty, a random client ID will be generated.
+  client_id = ""
+
   ## username and password to connect MQTT server.
   # username = "telegraf"
   # password = "metricsmetricsmetricsmetrics"
@@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
 	m.Lock()
 	defer m.Unlock()
 
+	if m.PersistentSession && m.ClientID == "" {
+		return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
+			" = true, you MUST also set client_id")
+	}
+
 	m.acc = acc
 	if m.QoS > 2 || m.QoS < 0 {
 		return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
@@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
 func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
 	opts := mqtt.NewClientOptions()
 
-	opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
+	if m.ClientID == "" {
+		opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
+	} else {
+		opts.SetClientID(m.ClientID)
+	}
 
 	tlsCfg, err := internal.GetTLSConfig(
 		m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
@@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
 	}
 	opts.SetAutoReconnect(true)
 	opts.SetKeepAlive(time.Second * 60)
+	opts.SetCleanSession(!m.PersistentSession)
 	return opts, nil
 }
 
diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
index b1dd59bc..e926ebbb 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
@@ -7,6 +7,8 @@ import (
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
 
+	"github.com/stretchr/testify/assert"
+
 	"git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
 )
 
@@ -28,6 +30,52 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
 	return n, in
 }
 
+// Test that default client has random ID
+func TestRandomClientID(t *testing.T) {
+	m1 := &MQTTConsumer{
+		Servers: []string{"localhost:1883"}}
+	opts, err := m1.createOpts()
+	assert.NoError(t, err)
+
+	m2 := &MQTTConsumer{
+		Servers: []string{"localhost:1883"}}
+	opts2, err2 := m2.createOpts()
+	assert.NoError(t, err2)
+
+	assert.NotEqual(t, opts.ClientID, opts2.ClientID)
+}
+
+// Test that default client has random ID
+func TestClientID(t *testing.T) {
+	m1 := &MQTTConsumer{
+		Servers:  []string{"localhost:1883"},
+		ClientID: "telegraf-test",
+	}
+	opts, err := m1.createOpts()
+	assert.NoError(t, err)
+
+	m2 := &MQTTConsumer{
+		Servers:  []string{"localhost:1883"},
+		ClientID: "telegraf-test",
+	}
+	opts2, err2 := m2.createOpts()
+	assert.NoError(t, err2)
+
+	assert.Equal(t, "telegraf-test", opts2.ClientID)
+	assert.Equal(t, "telegraf-test", opts.ClientID)
+}
+
+// Test that Start() fails if client ID is not set but persistent is
+func TestPersistentClientIDFail(t *testing.T) {
+	m1 := &MQTTConsumer{
+		Servers:           []string{"localhost:1883"},
+		PersistentSession: true,
+	}
+	acc := testutil.Accumulator{}
+	err := m1.Start(&acc)
+	assert.Error(t, err)
+}
+
 // Test that the parser parses NATS messages into metrics
 func TestRunParser(t *testing.T) {
 	n, in := newTestMQTTConsumer()
-- 
GitLab