From f32916a5bdbb3d0babb5edc411514012e96a9d0a Mon Sep 17 00:00:00 2001
From: Jason Roelofs <jasongroelofs@gmail.com>
Date: Mon, 2 May 2016 10:58:29 -0400
Subject: [PATCH] Output stats to the Instrumental TCP Collector

closes #1139
---
 CHANGELOG.md                                  |   1 +
 README.md                                     |   1 +
 plugins/outputs/all/all.go                    |   1 +
 plugins/outputs/instrumental/README.md        |  25 +++
 plugins/outputs/instrumental/instrumental.go  | 192 ++++++++++++++++++
 .../outputs/instrumental/instrumental_test.go | 114 +++++++++++
 6 files changed, 334 insertions(+)
 create mode 100644 plugins/outputs/instrumental/README.md
 create mode 100644 plugins/outputs/instrumental/instrumental.go
 create mode 100644 plugins/outputs/instrumental/instrumental_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a8a75617..63c4403c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### Features
 
 - [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek!
+- [#1139](https://github.com/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs!
 
 ### Bugfixes
 
diff --git a/README.md b/README.md
index 8d325f6d..b6cc9153 100644
--- a/README.md
+++ b/README.md
@@ -245,6 +245,7 @@ want to add support for another service or third-party API.
 * [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog)
 * [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file)
 * [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite)
+* [instrumental](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/instrumental)
 * [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka)
 * [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato)
 * [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt)
diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go
index 18fb1c92..5b223529 100644
--- a/plugins/outputs/all/all.go
+++ b/plugins/outputs/all/all.go
@@ -8,6 +8,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/outputs/file"
 	_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
 	_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
+	_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
 	_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
 	_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
 	_ "github.com/influxdata/telegraf/plugins/outputs/librato"
diff --git a/plugins/outputs/instrumental/README.md b/plugins/outputs/instrumental/README.md
new file mode 100644
index 00000000..128599ee
--- /dev/null
+++ b/plugins/outputs/instrumental/README.md
@@ -0,0 +1,25 @@
+# Instrumental Output Plugin
+
+This plugin writes to the [Instrumental Collector API](https://instrumentalapp.com/docs/tcp-collector)
+and requires a Project-specific API token.
+
+Instrumental accepts stats in a format very close to Graphite, with the only difference being that
+the type of stat (gauge, increment) is the first token, separated from the metric itself
+by whitespace. The `increment` type is only used if the metric comes in as a counter through `[[input.statsd]]`.
+
+## Configuration:
+
+```toml
+[[outputs.instrumental]]
+  ## Project API Token (required)
+  api_token = "API Token"  # required
+  ## Prefix the metrics with a given name
+  prefix = ""
+  ## Stats output template (Graphite formatting)
+  ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
+  template = "host.tags.measurement.field"
+  ## Timeout in seconds to connect
+  timeout = "2s"
+  ## Debug true - Print communcation to Instrumental
+  debug = false
+```
diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go
new file mode 100644
index 00000000..461ba9d9
--- /dev/null
+++ b/plugins/outputs/instrumental/instrumental.go
@@ -0,0 +1,192 @@
+package instrumental
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"regexp"
+	"strings"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/outputs"
+	"github.com/influxdata/telegraf/plugins/serializers"
+	"github.com/influxdata/telegraf/plugins/serializers/graphite"
+)
+
+type Instrumental struct {
+	Host       string
+	ApiToken   string
+	Prefix     string
+	DataFormat string
+	Template   string
+	Timeout    internal.Duration
+	Debug      bool
+
+	conn net.Conn
+}
+
+const (
+	DefaultHost = "collector.instrumentalapp.com"
+	AuthFormat  = "hello version go/telegraf/1.0\nauthenticate %s\n"
+)
+
+var (
+	StatIncludesBadChar = regexp.MustCompile("[^[:alnum:][:blank:]-_.]")
+)
+
+var sampleConfig = `
+  ## Project API Token (required)
+  api_token = "API Token" # required
+  ## Prefix the metrics with a given name
+  prefix = ""
+  ## Stats output template (Graphite formatting)
+  ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
+  template = "host.tags.measurement.field"
+  ## Timeout in seconds to connect
+  timeout = "2s"
+  ## Display Communcation to Instrumental
+  debug = false
+`
+
+func (i *Instrumental) Connect() error {
+	connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
+	if err != nil {
+		i.conn = nil
+		return err
+	}
+
+	err = i.authenticate(connection)
+	if err != nil {
+		i.conn = nil
+		return err
+	}
+
+	return nil
+}
+
+func (i *Instrumental) Close() error {
+	i.conn.Close()
+	i.conn = nil
+	return nil
+}
+
+func (i *Instrumental) Write(metrics []telegraf.Metric) error {
+	if i.conn == nil {
+		err := i.Connect()
+		if err != nil {
+			return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err)
+		}
+	}
+
+	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
+	if err != nil {
+		return err
+	}
+
+	var points []string
+	var metricType string
+	var toSerialize telegraf.Metric
+	var newTags map[string]string
+
+	for _, metric := range metrics {
+		// Pull the metric_type out of the metric's tags. We don't want the type
+		// to show up with the other tags pulled from the system, as they go in the
+		// beginning of the line instead.
+		// e.g we want:
+		//
+		//  increment some_prefix.host.tag1.tag2.tag3.field value timestamp
+		//
+		// vs
+		//
+		//  increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
+		//
+		newTags = metric.Tags()
+		metricType = newTags["metric_type"]
+		delete(newTags, "metric_type")
+
+		toSerialize, _ = telegraf.NewMetric(
+			metric.Name(),
+			newTags,
+			metric.Fields(),
+			metric.Time(),
+		)
+
+		stats, err := s.Serialize(toSerialize)
+		if err != nil {
+			log.Printf("Error serializing a metric to Instrumental: %s", err)
+		}
+
+		switch metricType {
+		case "counter":
+			fallthrough
+		case "histogram":
+			metricType = "increment"
+		default:
+			metricType = "gauge"
+		}
+
+		for _, stat := range stats {
+			if !StatIncludesBadChar.MatchString(stat) {
+				points = append(points, fmt.Sprintf("%s %s", metricType, stat))
+			} else if i.Debug {
+				log.Printf("Unable to send bad stat: %s", stat)
+			}
+		}
+	}
+
+	allPoints := strings.Join(points, "\n") + "\n"
+	_, err = fmt.Fprintf(i.conn, allPoints)
+
+	if i.Debug {
+		log.Println(allPoints)
+	}
+
+	if err != nil {
+		if err == io.EOF {
+			i.Close()
+		}
+
+		return err
+	}
+
+	return nil
+}
+
+func (i *Instrumental) Description() string {
+	return "Configuration for sending metrics to an Instrumental project"
+}
+
+func (i *Instrumental) SampleConfig() string {
+	return sampleConfig
+}
+
+func (i *Instrumental) authenticate(conn net.Conn) error {
+	_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
+	if err != nil {
+		return err
+	}
+
+	// The response here will either be two "ok"s or an error message.
+	responses := make([]byte, 512)
+	if _, err = conn.Read(responses); err != nil {
+		return err
+	}
+
+	if string(responses)[:6] != "ok\nok\n" {
+		return fmt.Errorf("Authentication failed: %s", responses)
+	}
+
+	i.conn = conn
+	return nil
+}
+
+func init() {
+	outputs.Add("instrumental", func() telegraf.Output {
+		return &Instrumental{
+			Host:     DefaultHost,
+			Template: graphite.DEFAULT_TEMPLATE,
+		}
+	})
+}
diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go
new file mode 100644
index 00000000..ceb53bac
--- /dev/null
+++ b/plugins/outputs/instrumental/instrumental_test.go
@@ -0,0 +1,114 @@
+package instrumental
+
+import (
+	"bufio"
+	"net"
+	"net/textproto"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWrite(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go TCPServer(t, &wg)
+	// Give the fake TCP server some time to start:
+	time.Sleep(time.Millisecond * 100)
+
+	i := Instrumental{
+		Host:     "127.0.0.1",
+		ApiToken: "abc123token",
+		Prefix:   "my.prefix",
+	}
+	i.Connect()
+
+	// Default to gauge
+	m1, _ := telegraf.NewMetric(
+		"mymeasurement",
+		map[string]string{"host": "192.168.0.1"},
+		map[string]interface{}{"myfield": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	m2, _ := telegraf.NewMetric(
+		"mymeasurement",
+		map[string]string{"host": "192.168.0.1", "metric_type": "set"},
+		map[string]interface{}{"value": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+
+	// Simulate a connection close and reconnect.
+	metrics := []telegraf.Metric{m1, m2}
+	i.Write(metrics)
+	i.Close()
+
+	// Counter and Histogram are increments
+	m3, _ := telegraf.NewMetric(
+		"my_histogram",
+		map[string]string{"host": "192.168.0.1", "metric_type": "histogram"},
+		map[string]interface{}{"value": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	// We will drop metrics that simply won't be accepted by Instrumental
+	m4, _ := telegraf.NewMetric(
+		"bad_values",
+		map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
+		map[string]interface{}{"value": "\" 3:30\""},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	m5, _ := telegraf.NewMetric(
+		"my_counter",
+		map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
+		map[string]interface{}{"value": float64(3.14)},
+		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+
+	metrics = []telegraf.Metric{m3, m4, m5}
+	i.Write(metrics)
+
+	wg.Wait()
+	i.Close()
+}
+
+func TCPServer(t *testing.T, wg *sync.WaitGroup) {
+	tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
+	defer wg.Done()
+	conn, _ := tcpServer.Accept()
+	conn.SetDeadline(time.Now().Add(1 * time.Second))
+	reader := bufio.NewReader(conn)
+	tp := textproto.NewReader(reader)
+
+	hello, _ := tp.ReadLine()
+	assert.Equal(t, "hello version go/telegraf/1.0", hello)
+	auth, _ := tp.ReadLine()
+	assert.Equal(t, "authenticate abc123token", auth)
+
+	conn.Write([]byte("ok\nok\n"))
+
+	data1, _ := tp.ReadLine()
+	assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
+	data2, _ := tp.ReadLine()
+	assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
+
+	conn, _ = tcpServer.Accept()
+	conn.SetDeadline(time.Now().Add(1 * time.Second))
+	reader = bufio.NewReader(conn)
+	tp = textproto.NewReader(reader)
+
+	hello, _ = tp.ReadLine()
+	assert.Equal(t, "hello version go/telegraf/1.0", hello)
+	auth, _ = tp.ReadLine()
+	assert.Equal(t, "authenticate abc123token", auth)
+
+	conn.Write([]byte("ok\nok\n"))
+
+	data3, _ := tp.ReadLine()
+	assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
+	data4, _ := tp.ReadLine()
+	assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data4)
+
+	conn.Close()
+}
-- 
GitLab