From c313af1b24dd9e32172fe6a100af75bbfe04792b Mon Sep 17 00:00:00 2001
From: Hannu Valtonen <hannu.valtonen@ohmu.fi>
Date: Mon, 11 Jan 2016 14:20:51 +0200
Subject: [PATCH] kafka: Add support for using TLS authentication for the kafka
 output

With the advent of Kafka 0.9.0+ it is possible to set up TLS client
certificate based authentication to limit access to Kafka.

Four new configuration variables are specified for setting up the
authentication. If they're not set the behavior stays the same as
before the change.

closes #541
---
 CHANGELOG.md                   |  1 +
 plugins/outputs/kafka/kafka.go | 67 ++++++++++++++++++++++++++++++++--
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82a447e0..080ce587 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@
 - AMQP SSL support. Thanks @ekini!
 - [#539](https://github.com/influxdata/telegraf/pull/539): Reload config on SIGHUP. Thanks @titilambert!
 - [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain!
+- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod!
 
 ### Bugfixes
 - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!
diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go
index 8e53cc51..55ef35fb 100644
--- a/plugins/outputs/kafka/kafka.go
+++ b/plugins/outputs/kafka/kafka.go
@@ -1,12 +1,14 @@
 package kafka
 
 import (
+	"crypto/tls"
+	"crypto/x509"
 	"errors"
 	"fmt"
-
 	"github.com/Shopify/sarama"
 	"github.com/influxdb/influxdb/client/v2"
 	"github.com/influxdb/telegraf/plugins/outputs"
+	"io/ioutil"
 )
 
 type Kafka struct {
@@ -16,8 +18,17 @@ type Kafka struct {
 	Topic string
 	// Routing Key Tag
 	RoutingTag string `toml:"routing_tag"`
+	// TLS client certificate
+	Certificate string
+	// TLS client key
+	Key string
+	// TLS certificate authority
+	CA string
+	// Verfiy SSL certificate chain
+	VerifySsl bool
 
-	producer sarama.SyncProducer
+	tlsConfig tls.Config
+	producer  sarama.SyncProducer
 }
 
 var sampleConfig = `
@@ -28,10 +39,60 @@ var sampleConfig = `
   # Telegraf tag to use as a routing key
   #  ie, if this tag exists, it's value will be used as the routing key
   routing_tag = "host"
+
+  # Optional TLS configuration:
+  # Client certificate
+  certificate = ""
+  # Client key
+  key = ""
+  # Certificate authority file
+  ca = ""
+  # Verify SSL certificate chain
+  verify_ssl = false
 `
 
+func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) {
+	if k.Certificate != "" && k.Key != "" && k.CA != "" {
+		cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
+		if err != nil {
+			return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s",
+				err))
+		}
+
+		caCert, err := ioutil.ReadFile(k.CA)
+		if err != nil {
+			return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s",
+				err))
+		}
+
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+
+		t = &tls.Config{
+			Certificates:       []tls.Certificate{cert},
+			RootCAs:            caCertPool,
+			InsecureSkipVerify: k.VerifySsl,
+		}
+	}
+	// will be nil by default if nothing is provided
+	return t, nil
+}
+
 func (k *Kafka) Connect() error {
-	producer, err := sarama.NewSyncProducer(k.Brokers, nil)
+	config := sarama.NewConfig()
+	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
+	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
+	tlsConfig, err := createTlsConfiguration(k)
+	if err != nil {
+		return err
+	}
+
+	if tlsConfig != nil {
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Enable = true
+	}
+
+	producer, err := sarama.NewSyncProducer(k.Brokers, config)
 	if err != nil {
 		return err
 	}
-- 
GitLab