Skip to content
Snippets Groups Projects
Commit bd9c5b69 authored by Cameron Sparr's avatar Cameron Sparr
Browse files

mqtt output: cleanup, implement TLS

Also normalize TLS config across all output plugins and normalize
comment strings as well.
parent b941d270
No related branches found
No related tags found
No related merge requests found
Showing
with 235 additions and 193 deletions
## v0.10.2 [unreleased]
## v0.10.3 [unreleased]
### Release Notes
### Features
### Bugfixes
## v0.10.2 [2016-02-04]
### Release Notes
- Statsd timing measurements are now aggregated into a single measurement with
fields.
- Graphite output now inserts tags into the bucket in alphabetical order.
- Normalized TLS/SSL support for output plugins: MQTT, AMQP, Kafka
- `verify_ssl` config option was removed from Kafka because it was actually
doing the opposite of what it claimed to do (yikes). It's been replaced by
`insecure_skip_verify`
### Features
- [#575](https://github.com/influxdata/telegraf/pull/575): Support for collecting Windows Performance Counters. Thanks @TheFlyingCorpse!
- [#564](https://github.com/influxdata/telegraf/issues/564): features for plugin writing simplification. Internal metric data type.
......
......@@ -2,14 +2,20 @@ package internal
import (
"bufio"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"time"
)
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
// Duration just wraps time.Duration
type Duration struct {
Duration time.Duration
......@@ -105,6 +111,57 @@ func ReadLinesOffsetN(filename string, offset uint, n int) ([]string, error) {
return ret, nil
}
// RandomString returns a random string of alpha-numeric characters
func RandomString(n int) string {
var bytes = make([]byte, n)
rand.Read(bytes)
for i, b := range bytes {
bytes[i] = alphanum[b%byte(len(alphanum))]
}
return string(bytes)
}
// GetTLSConfig gets a tls.Config object from the given certs, key, and CA files.
// you must give the full path to the files.
// If all files are blank and InsecureSkipVerify=false, returns a nil pointer.
func GetTLSConfig(
SSLCert, SSLKey, SSLCA string,
InsecureSkipVerify bool,
) (*tls.Config, error) {
t := &tls.Config{}
if SSLCert != "" && SSLKey != "" && SSLCA != "" {
cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey)
if err != nil {
return nil, errors.New(fmt.Sprintf(
"Could not load TLS client key/certificate: %s",
err))
}
caCert, err := ioutil.ReadFile(SSLCA)
if err != nil {
return nil, errors.New(fmt.Sprintf("Could not load TLS CA: %s",
err))
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: InsecureSkipVerify,
}
} else {
if InsecureSkipVerify {
t.InsecureSkipVerify = true
} else {
return nil, nil
}
}
// will be nil by default if nothing is provided
return t, nil
}
// Glob will test a string pattern, potentially containing globs, against a
// subject string. The result is a simple true/false, determining whether or
// not the glob pattern matched the subject text.
......
......@@ -22,13 +22,13 @@ type Amon struct {
}
var sampleConfig = `
# Amon Server Key
### Amon Server Key
server_key = "my-server-key" # required.
# Amon Instance URL
### Amon Instance URL
amon_instance = "https://youramoninstance" # required
# Connection timeout.
### Connection timeout.
# timeout = "5s"
`
......
......@@ -2,15 +2,13 @@ package amqp
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/streadway/amqp"
)
......@@ -20,12 +18,6 @@ type AMQP struct {
URL string
// AMQP exchange
Exchange string
// path to CA file
SslCa string
// path to host cert file
SslCert string
// path to cert key file
SslKey string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
......@@ -35,6 +27,15 @@ type AMQP struct {
// InfluxDB precision
Precision string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
channel *amqp.Channel
sync.Mutex
headers amqp.Table
......@@ -47,25 +48,27 @@ const (
)
var sampleConfig = `
# AMQP url
### AMQP url
url = "amqp://localhost:5672/influxdb"
# AMQP exchange
### AMQP exchange
exchange = "telegraf"
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
### Telegraf tag to use as a routing key
### ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
# Use ssl
#ssl_ca = "/etc/telegraf/ca.pem"
#ssl_cert = "/etc/telegraf/cert.pem"
#ssl_key = "/etc/telegraf/key.pem"
# InfluxDB retention policy
#retention_policy = "default"
# InfluxDB database
#database = "telegraf"
# InfluxDB precision
#precision = "s"
### InfluxDB retention policy
# retention_policy = "default"
### InfluxDB database
# database = "telegraf"
### InfluxDB precision
# precision = "s"
### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
func (q *AMQP) Connect() error {
......@@ -79,28 +82,15 @@ func (q *AMQP) Connect() error {
}
var connection *amqp.Connection
var err error
if q.SslCert != "" && q.SslKey != "" {
// make new tls config
cfg := new(tls.Config)
if q.SslCa != "" {
// create ca pool
cfg.RootCAs = x509.NewCertPool()
// add self-signed cert
if ca, err := ioutil.ReadFile(q.SslCa); err == nil {
cfg.RootCAs.AppendCertsFromPEM(ca)
} else {
log.Println(err)
}
}
if cert, err := tls.LoadX509KeyPair(q.SslCert, q.SslKey); err == nil {
cfg.Certificates = append(cfg.Certificates, cert)
} else {
log.Println(err)
}
connection, err = amqp.DialTLS(q.URL, cfg)
// make new tls config
tls, err := internal.GetTLSConfig(
q.SSLCert, q.SSLKey, q.SSLCA, q.InsecureSkipVerify)
if err != nil {
return err
}
if tls != nil {
connection, err = amqp.DialTLS(q.URL, tls)
} else {
connection, err = amqp.Dial(q.URL)
}
......
......@@ -25,10 +25,10 @@ type CloudWatch struct {
}
var sampleConfig = `
# Amazon REGION
### Amazon REGION
region = 'us-east-1'
# Namespace for the CloudWatch MetricDatums
### Namespace for the CloudWatch MetricDatums
namespace = 'InfluxData/Telegraf'
`
......
......@@ -24,10 +24,10 @@ type Datadog struct {
}
var sampleConfig = `
# Datadog API key
### Datadog API key
apikey = "my-secret-key" # required.
# Connection timeout.
### Connection timeout.
# timeout = "5s"
`
......
......@@ -22,11 +22,11 @@ type Graphite struct {
}
var sampleConfig = `
# TCP endpoint for your graphite instance.
### TCP endpoint for your graphite instance.
servers = ["localhost:2003"]
# Prefix metrics name
### Prefix metrics name
prefix = ""
# timeout in seconds for the write connection to graphite
### timeout in seconds for the write connection to graphite
timeout = 2
`
......
......@@ -32,25 +32,25 @@ type InfluxDB struct {
}
var sampleConfig = `
# The full HTTP or UDP endpoint URL for your InfluxDB instance.
# Multiple urls can be specified but it is assumed that they are part of the same
# cluster, this means that only ONE of the urls will be written to each interval.
### The full HTTP or UDP endpoint URL for your InfluxDB instance.
### Multiple urls can be specified but it is assumed that they are part of the same
### cluster, this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
### The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
### Precision of writes, valid values are n, u, ms, s, m, and h
### note: using second precision greatly helps InfluxDB compression
precision = "s"
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# If not provided, will default to 0 (no timeout)
### Connection timeout (for the connection with InfluxDB), formatted as a string.
### If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for HTTP POSTs (can be useful for log differentiation)
### Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
### Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
`
......
......@@ -2,13 +2,14 @@ package kafka
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"io/ioutil"
"github.com/Shopify/sarama"
)
type Kafka struct {
......@@ -18,71 +19,62 @@ type Kafka struct {
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Verfiy SSL certificate chain
VerifySsl bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Skip SSL verification
InsecureSkipVerify bool
tlsConfig tls.Config
producer sarama.SyncProducer
}
var sampleConfig = `
# URLs of kafka brokers
### URLs of kafka brokers
brokers = ["localhost:9092"]
# Kafka topic for producer messages
### Kafka topic for producer messages
topic = "telegraf"
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
### Telegraf tag to use as a routing key
### ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
# Optional TLS configuration:
# Client certificate
certificate = ""
# Client key
key = ""
# Certificate authority file
ca = ""
# Verify SSL certificate chain
verify_ssl = false
### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) {
if k.Certificate != "" && k.Key != "" && k.CA != "" {
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
if err != nil {
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s",
err))
}
caCert, err := ioutil.ReadFile(k.CA)
if err != nil {
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s",
err))
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: k.VerifySsl,
}
}
// will be nil by default if nothing is provided
return t, nil
}
func (k *Kafka) Connect() error {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
tlsConfig, err := createTlsConfiguration(k)
// Wait for all in-sync replicas to ack the message
config.Producer.RequiredAcks = sarama.WaitForAll
// Retry up to 10 times to produce the message
config.Producer.Retry.Max = 10
// Legacy support ssl config
if k.Certificate != "" {
k.SSLCert = k.Certificate
k.SSLCA = k.CA
k.SSLKey = k.Key
}
tlsConfig, err := internal.GetTLSConfig(
k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
if err != nil {
return err
}
......
......@@ -28,16 +28,16 @@ type KinesisOutput struct {
}
var sampleConfig = `
# Amazon REGION of kinesis endpoint.
### Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
# Kinesis StreamName must exist prior to starting telegraf.
### Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
# PartitionKey as used for sharding data.
### PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
# format of the Data payload in the kinesis PutRecord, supported
# String and Custom.
### format of the Data payload in the kinesis PutRecord, supported
### String and Custom.
format = "string"
# debug will show upstream aws messages.
### debug will show upstream aws messages.
debug = false
`
......
......@@ -23,20 +23,20 @@ type Librato struct {
}
var sampleConfig = `
# Librator API Docs
# http://dev.librato.com/v1/metrics-authentication
### Librator API Docs
### http://dev.librato.com/v1/metrics-authentication
# Librato API user
### Librato API user
api_user = "telegraf@influxdb.com" # required.
# Librato API token
### Librato API token
api_token = "my-secret-token" # required.
# Tag Field to populate source attribute (optional)
# This is typically the _hostname_ from which the metric was obtained.
### Tag Field to populate source attribute (optional)
### This is typically the _hostname_ from which the metric was obtained.
source_tag = "hostname"
# Connection timeout.
### Connection timeout.
# timeout = "5s"
`
......
package mqtt
import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"strings"
"sync"
......@@ -15,7 +11,6 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
)
const MaxClientIdLen = 8
const MaxRetryCount = 3
const ClientIdPrefix = "telegraf"
......@@ -27,22 +22,39 @@ type MQTT struct {
Timeout internal.Duration
TopicPrefix string
Client *paho.Client
Opts *paho.ClientOptions
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
client *paho.Client
opts *paho.ClientOptions
sync.Mutex
}
var sampleConfig = `
servers = ["localhost:1883"] # required.
# MQTT outputs send metrics to this topic format
# "<topic_prefix>/host/<hostname>/<pluginname>/"
# ex: prefix/host/web01.example.com/mem/available
# topic_prefix = "prefix"
### MQTT outputs send metrics to this topic format
### "<topic_prefix>/<hostname>/<pluginname>/"
### ex: prefix/host/web01.example.com/mem
topic_prefix = "telegraf"
# username and password to connect MQTT server.
### username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
func (m *MQTT) Connect() error {
......@@ -50,13 +62,13 @@ func (m *MQTT) Connect() error {
m.Lock()
defer m.Unlock()
m.Opts, err = m.CreateOpts()
m.opts, err = m.createOpts()
if err != nil {
return err
}
m.Client = paho.NewClient(m.Opts)
if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
m.client = paho.NewClient(m.opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
......@@ -64,8 +76,8 @@ func (m *MQTT) Connect() error {
}
func (m *MQTT) Close() error {
if m.Client.IsConnected() {
m.Client.Disconnect(20)
if m.client.IsConnected() {
m.client.Disconnect(20)
}
return nil
}
......@@ -94,12 +106,11 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
if m.TopicPrefix != "" {
t = append(t, m.TopicPrefix)
}
tm := strings.Split(p.Name(), "_")
if len(tm) < 2 {
tm = []string{p.Name(), "stat"}
if hostname != "" {
t = append(t, hostname)
}
t = append(t, "host", hostname, tm[0], tm[1])
t = append(t, p.Name())
topic := strings.Join(t, "/")
value := p.String()
......@@ -113,7 +124,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
}
func (m *MQTT) publish(topic, body string) error {
token := m.Client.Publish(topic, 0, false, body)
token := m.client.Publish(topic, 0, false, body)
token.Wait()
if token.Error() != nil {
return token.Error()
......@@ -121,25 +132,22 @@ func (m *MQTT) publish(topic, body string) error {
return nil
}
func (m *MQTT) CreateOpts() (*paho.ClientOptions, error) {
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions()
clientId := getRandomClientId()
opts.SetClientID(clientId)
opts.SetClientID("Telegraf-Output-" + internal.RandomString(5))
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
if err != nil {
return nil, err
}
TLSConfig := &tls.Config{InsecureSkipVerify: false}
ca := "" // TODO
scheme := "tcp"
if ca != "" {
if tlsCfg != nil {
scheme = "ssl"
certPool, err := getCertPool(ca)
if err != nil {
return nil, err
}
TLSConfig.RootCAs = certPool
opts.SetTLSConfig(tlsCfg)
}
TLSConfig.InsecureSkipVerify = true // TODO
opts.SetTLSConfig(TLSConfig)
user := m.Username
if user == "" {
......@@ -162,27 +170,6 @@ func (m *MQTT) CreateOpts() (*paho.ClientOptions, error) {
return opts, nil
}
func getRandomClientId() string {
const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
var bytes = make([]byte, MaxClientIdLen)
rand.Read(bytes)
for i, b := range bytes {
bytes[i] = alphanum[b%byte(len(alphanum))]
}
return ClientIdPrefix + "-" + string(bytes)
}
func getCertPool(pemPath string) (*x509.CertPool, error) {
certs := x509.NewCertPool()
pemData, err := ioutil.ReadFile(pemPath)
if err != nil {
return nil, err
}
certs.AppendCertsFromPEM(pemData)
return certs, nil
}
func init() {
outputs.Add("mqtt", func() telegraf.Output {
return &MQTT{}
......
......@@ -14,9 +14,9 @@ type NSQ struct {
}
var sampleConfig = `
# Location of nsqd instance listening on TCP
### Location of nsqd instance listening on TCP
server = "localhost:4150"
# NSQ topic for producer messages
### NSQ topic for producer messages
topic = "telegraf"
`
......
......@@ -22,17 +22,17 @@ type OpenTSDB struct {
}
var sampleConfig = `
# prefix for metrics keys
### prefix for metrics keys
prefix = "my.specific.prefix."
## Telnet Mode ##
# DNS name of the OpenTSDB server in telnet mode
### DNS name of the OpenTSDB server in telnet mode
host = "opentsdb.example.com"
# Port of the OpenTSDB server in telnet mode
### Port of the OpenTSDB server in telnet mode
port = 4242
# Debug true - Prints OpenTSDB communication
### Debug true - Prints OpenTSDB communication
debug = false
`
......
......@@ -16,7 +16,7 @@ type PrometheusClient struct {
}
var sampleConfig = `
# Address to listen on
### Address to listen on
# listen = ":9126"
`
......
......@@ -17,7 +17,8 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
pTesting = &PrometheusClient{Listen: "localhost:9127"}
pTesting.Start()
err := pTesting.Start()
require.NoError(t, err)
defer pTesting.Stop()
p := &prometheus.Prometheus{
......
......@@ -18,9 +18,9 @@ type Riemann struct {
}
var sampleConfig = `
# URL of server
### URL of server
url = "localhost:5555"
# transport protocol to use either tcp or udp
### transport protocol to use either tcp or udp
transport = "tcp"
`
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment