diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 1b350ac6c75a46e8e37783a8aef6bfd97166856a..0bec928127f22de05a2226fb5276390af89883e2 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -154,26 +154,23 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - values, err := k.serializer.Serialize(metric) + buf, err := k.serializer.Serialize(metric) if err != nil { return err } - var pubErr error - for _, value := range values { - m := &sarama.ProducerMessage{ - Topic: k.Topic, - Value: sarama.StringEncoder(value), - } - if h, ok := metric.Tags()[k.RoutingTag]; ok { - m.Key = sarama.StringEncoder(h) - } - - _, _, pubErr = k.producer.SendMessage(m) + m := &sarama.ProducerMessage{ + Topic: k.Topic, + Value: sarama.ByteEncoder(buf), } + if h, ok := metric.Tags()[k.RoutingTag]; ok { + m.Key = sarama.StringEncoder(h) + } + + _, _, err = k.producer.SendMessage(m) - if pubErr != nil { - return fmt.Errorf("FAILED to send kafka message: %s\n", pubErr) + if err != nil { + return fmt.Errorf("FAILED to send kafka message: %s\n", err) } } return nil