From a093ec1eaae651d606f1c269583ad925897ed548 Mon Sep 17 00:00:00 2001 From: Cameron Sparr <cameronsparr@gmail.com> Date: Tue, 6 Dec 2016 15:38:59 +0000 Subject: [PATCH] Kafka output fixup --- plugins/outputs/kafka/kafka.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 1b350ac6..0bec9281 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -154,26 +154,23 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - values, err := k.serializer.Serialize(metric) + buf, err := k.serializer.Serialize(metric) if err != nil { return err } - var pubErr error - for _, value := range values { - m := &sarama.ProducerMessage{ - Topic: k.Topic, - Value: sarama.StringEncoder(value), - } - if h, ok := metric.Tags()[k.RoutingTag]; ok { - m.Key = sarama.StringEncoder(h) - } - - _, _, pubErr = k.producer.SendMessage(m) + m := &sarama.ProducerMessage{ + Topic: k.Topic, + Value: sarama.ByteEncoder(buf), } + if h, ok := metric.Tags()[k.RoutingTag]; ok { + m.Key = sarama.StringEncoder(h) + } + + _, _, err = k.producer.SendMessage(m) - if pubErr != nil { - return fmt.Errorf("FAILED to send kafka message: %s\n", pubErr) + if err != nil { + return fmt.Errorf("FAILED to send kafka message: %s\n", err) } } return nil -- GitLab