Skip to content
Snippets Groups Projects
Commit 37b96c19 authored by Eugene Dementiev's avatar Eugene Dementiev Committed by Cameron Sparr
Browse files

output amqp: Add ssl support

closes #536
parent 8cbdf0f9
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
- [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion. - [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion.
- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek! - [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek!
- [#494](https://github.com/influxdata/telegraf/pull/494): Graphite output plugin. Thanks @titilambert! - [#494](https://github.com/influxdata/telegraf/pull/494): Graphite output plugin. Thanks @titilambert!
- AMQP SSL support. Thanks @ekini!
### Bugfixes ### Bugfixes
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!
......
...@@ -2,7 +2,10 @@ package amqp ...@@ -2,7 +2,10 @@ package amqp
import ( import (
"bytes" "bytes"
"crypto/tls"
"crypto/x509"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"sync" "sync"
"time" "time"
...@@ -17,6 +20,12 @@ type AMQP struct { ...@@ -17,6 +20,12 @@ type AMQP struct {
URL string URL string
// AMQP exchange // AMQP exchange
Exchange string Exchange string
// path to CA file
SslCa string
// path to host cert file
SslCert string
// path to cert key file
SslKey string
// Routing Key Tag // Routing Key Tag
RoutingTag string `toml:"routing_tag"` RoutingTag string `toml:"routing_tag"`
// InfluxDB database // InfluxDB database
...@@ -46,6 +55,11 @@ var sampleConfig = ` ...@@ -46,6 +55,11 @@ var sampleConfig = `
# ie, if this tag exists, it's value will be used as the routing key # ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host" routing_tag = "host"
# Use ssl
#ssl_ca = "/etc/telegraf/ca.pem"
#ssl_cert = "/etc/telegraf/cert.pem"
#ssl_key = "/etc/telegraf/key.pem"
# InfluxDB retention policy # InfluxDB retention policy
#retention_policy = "default" #retention_policy = "default"
# InfluxDB database # InfluxDB database
...@@ -64,7 +78,32 @@ func (q *AMQP) Connect() error { ...@@ -64,7 +78,32 @@ func (q *AMQP) Connect() error {
"retention_policy": q.RetentionPolicy, "retention_policy": q.RetentionPolicy,
} }
connection, err := amqp.Dial(q.URL) var connection *amqp.Connection
var err error
if q.SslCert != "" && q.SslKey != "" {
// make new tls config
cfg := new(tls.Config)
if q.SslCa != "" {
// create ca pool
cfg.RootCAs = x509.NewCertPool()
// add self-signed cert
if ca, err := ioutil.ReadFile(q.SslCa); err == nil {
cfg.RootCAs.AppendCertsFromPEM(ca)
} else {
log.Println(err)
}
}
if cert, err := tls.LoadX509KeyPair(q.SslCert, q.SslKey); err == nil {
cfg.Certificates = append(cfg.Certificates, cert)
} else {
log.Println(err)
}
connection, err = amqp.DialTLS(q.URL, cfg)
} else {
connection, err = amqp.Dial(q.URL)
}
if err != nil { if err != nil {
return err return err
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment