Skip to content
Snippets Groups Projects
Commit 512d9822 authored by Dragostin Yanev (netixen)'s avatar Dragostin Yanev (netixen) Committed by Cameron Sparr
Browse files

Add NATS consumer input plugin.

parent d003ca46
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,7 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3
github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df
github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
......
......@@ -22,6 +22,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
......
# NATS Consumer
The [NATS](http://www.nats.io/about/) consumer plugin reads from
specified NATS subjects and adds messages to InfluxDB. The plugin expects messages
in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).
A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/)
is used when subscribing to subjects so multiple instances of telegraf can read
from a NATS cluster in parallel.
## Configuration
```
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
### urls of NATS servers
servers = ["nats://localhost:4222"]
### Use Transport Layer Security
secure = false
### subject(s) to consume
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```
## Testing
To run tests:
```
go test
```
\ No newline at end of file
package natsconsumer
import (
"fmt"
"log"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nats-io/nats"
)
type natsError struct {
conn *nats.Conn
sub *nats.Subscription
err error
}
func (e natsError) Error() string {
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
}
type natsConsumer struct {
QueueGroup string
Subjects []string
Servers []string
Secure bool
PointBuffer int
parser parsers.Parser
sync.Mutex
Conn *nats.Conn
Subs []*nats.Subscription
// channel for all incoming NATS messages
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
// channel for all incoming parsed points
metricC chan telegraf.Metric
done chan struct{}
}
var sampleConfig = `
### urls of NATS servers
servers = ["nats://localhost:4222"]
### Use Transport Layer Security
secure = false
### subject(s) to consume
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
`
func (n *natsConsumer) SampleConfig() string {
return sampleConfig
}
func (n *natsConsumer) Description() string {
return "Read metrics from NATS subject(s)"
}
func (n *natsConsumer) SetParser(parser parsers.Parser) {
n.parser = parser
}
func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
select {
case n.errs <- natsError{conn: c, sub: s, err: e}:
default:
return
}
}
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start() error {
n.Lock()
defer n.Unlock()
var connectErr error
opts := nats.DefaultOptions
opts.Servers = n.Servers
opts.Secure = n.Secure
if n.Conn == nil || n.Conn.IsClosed() {
n.Conn, connectErr = opts.Connect()
if connectErr != nil {
return connectErr
}
// Setup message and error channels
n.errs = make(chan error)
n.Conn.SetErrorHandler(n.natsErrHandler)
n.in = make(chan *nats.Msg)
for _, subj := range n.Subjects {
sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in)
if err != nil {
return err
}
n.Subs = append(n.Subs, sub)
}
}
n.done = make(chan struct{})
if n.PointBuffer == 0 {
n.PointBuffer = 100000
}
n.metricC = make(chan telegraf.Metric, n.PointBuffer)
// Start the message reader
go n.receiver()
log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
return nil
}
// receiver() reads all incoming messages from NATS, and parses them into
// influxdb metric points.
func (n *natsConsumer) receiver() {
defer n.clean()
for {
select {
case <-n.done:
return
case err := <-n.errs:
log.Printf("error reading from %s\n", err.Error())
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
log.Printf("subject: %s, error: %s", msg.Subject, err.Error())
}
for _, metric := range metrics {
select {
case n.metricC <- metric:
continue
default:
log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
}
}
}
}
}
func (n *natsConsumer) clean() {
n.Lock()
defer n.Unlock()
close(n.in)
close(n.metricC)
close(n.errs)
for _, sub := range n.Subs {
if err := sub.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing from subject %s in queue %s: %s\n",
sub.Subject, sub.Queue, err.Error())
}
}
if n.Conn != nil && !n.Conn.IsClosed() {
n.Conn.Close()
}
}
func (n *natsConsumer) Stop() {
n.Lock()
close(n.done)
n.Unlock()
}
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
npoints := len(n.metricC)
for i := 0; i < npoints; i++ {
point := <-n.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
}
return nil
}
func init() {
inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{}
})
}
package natsconsumer
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/nats-io/nats"
)
const (
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257"
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5
)
func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, pointBuffer)
n := &natsConsumer{
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
PointBuffer: pointBuffer,
in: in,
errs: make(chan error, pointBuffer),
done: make(chan struct{}),
metricC: make(chan telegraf.Metric, pointBuffer),
}
return n, in
}
// Test that the parser parses NATS messages into points
func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- natsMsg(testMsg)
time.Sleep(time.Millisecond)
if a := len(n.metricC); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- natsMsg(invalidMsg)
time.Sleep(time.Millisecond)
if a := len(n.metricC); a != 0 {
t.Errorf("got %v, expected %v", a, 0)
}
}
// Test that points are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
for i := 0; i < pointBuffer+1; i++ {
in <- natsMsg(testMsg)
}
time.Sleep(time.Millisecond)
if a := len(n.metricC); a != pointBuffer {
t.Errorf("got %v, expected %v", a, pointBuffer)
}
}
// Test that the parser parses nats messages into points
func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- natsMsg(testMsg)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses nats messages into points
func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go n.receiver()
in <- natsMsg(testMsgGraphite)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses nats messages into points
func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
go n.receiver()
in <- natsMsg(testMsgJSON)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}
func natsMsg(val string) *nats.Msg {
return &nats.Msg{
Subject: "telegraf",
Data: []byte(val),
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment