From cd103c85dbbd80917040f29ff6e28cb1be6d6aeb Mon Sep 17 00:00:00 2001
From: Seuf <seuf76@gmail.com>
Date: Thu, 27 Apr 2017 20:50:25 +0200
Subject: [PATCH] Added SASL options for ouput kafka plugin (#2721)

---
 CHANGELOG.md                    |  1 +
 plugins/outputs/kafka/README.md |  4 ++++
 plugins/outputs/kafka/kafka.go  | 15 +++++++++++++++
 3 files changed, 20 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1b2651e3..71764252 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -41,6 +41,7 @@ be deprecated eventually.
 
 ### Features
 
+- [#2721](https://github.com/influxdata/telegraf/pull/2721): Added SASL options for kafka output plugin.
 - [#2723](https://github.com/influxdata/telegraf/pull/2723): Added SSL configuration for input haproxy.
 - [#2494](https://github.com/influxdata/telegraf/pull/2494): Add interrupts input plugin.
 - [#2094](https://github.com/influxdata/telegraf/pull/2094): Add generic socket listener & writer.
diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md
index 8f50f181..abd9c492 100644
--- a/plugins/outputs/kafka/README.md
+++ b/plugins/outputs/kafka/README.md
@@ -45,6 +45,10 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
   ## Use SSL but skip chain & host verification
   # insecure_skip_verify = false
 
+  ## Optional SASL Config
+  # sasl_username = "kafka"
+  # sasl_password = "secret"
+
   data_format = "influx"
 ```
 
diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go
index bdc93eae..421b5c2a 100644
--- a/plugins/outputs/kafka/kafka.go
+++ b/plugins/outputs/kafka/kafka.go
@@ -44,6 +44,11 @@ type Kafka struct {
 	// Skip SSL verification
 	InsecureSkipVerify bool
 
+	// SASL Username
+	SASLUsername string `toml:"sasl_username"`
+	// SASL Password
+	SASLPassword string `toml:"sasl_password"`
+
 	tlsConfig tls.Config
 	producer  sarama.SyncProducer
 
@@ -92,6 +97,10 @@ var sampleConfig = `
   ## Use SSL but skip chain & host verification
   # insecure_skip_verify = false
 
+  ## Optional SASL Config
+  # sasl_username = "kafka"
+  # sasl_password = "secret"
+
   ## Data format to output.
   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
@@ -129,6 +138,12 @@ func (k *Kafka) Connect() error {
 		config.Net.TLS.Enable = true
 	}
 
+	if k.SASLUsername != "" && k.SASLPassword != "" {
+		config.Net.SASL.User = k.SASLUsername
+		config.Net.SASL.Password = k.SASLPassword
+		config.Net.SASL.Enable = true
+	}
+
 	producer, err := sarama.NewSyncProducer(k.Brokers, config)
 	if err != nil {
 		return err
-- 
GitLab