From eb2a4dc724b31d8cdec41c9c9d9e8074ab9a16ce Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Thu, 24 Sep 2015 11:06:11 -0700
Subject: [PATCH] Statsd listener plugin

implement gauges, sets, counters
---
 CONTRIBUTING.md               |   2 -
 agent.go                      |  14 ++
 config.go                     |  52 +++--
 outputs/influxdb/influxdb.go  |  22 +-
 plugins/all/all.go            |   1 +
 plugins/registry.go           |  24 ++
 plugins/statsd/README.md      |  79 +++++++
 plugins/statsd/statsd.go      | 409 ++++++++++++++++++++++++++++++++++
 plugins/statsd/statsd_test.go |  11 +
 9 files changed, 584 insertions(+), 30 deletions(-)
 create mode 100644 plugins/statsd/README.md
 create mode 100644 plugins/statsd/statsd.go
 create mode 100644 plugins/statsd/statsd_test.go

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e8c38456..62f026fe 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -16,8 +16,6 @@ and submit new plugins.
 ### Plugin Guidelines
 
 * A plugin must conform to the `plugins.Plugin` interface.
-* Telegraf promises to run each plugin's Gather function serially. This means
-developers don't have to worry about thread safety within these functions.
 * Each generated metric automatically has the name of the plugin that generated
 it prepended. This is to keep plugins honest.
 * Plugins should call `plugins.Add` in their `init` function to register themselves.
diff --git a/agent.go b/agent.go
index 5af5696e..3610430f 100644
--- a/agent.go
+++ b/agent.go
@@ -361,6 +361,20 @@ func (a *Agent) Run(shutdown chan struct{}) error {
 	var wg sync.WaitGroup
 
 	for _, plugin := range a.plugins {
+
+		// Start service of any ServicePlugins
+		switch p := plugin.plugin.(type) {
+		case plugins.ServicePlugin:
+			if err := p.Start(); err != nil {
+				log.Printf("Service for plugin %s failed to start, exiting\n%s\n",
+					plugin.name, err.Error())
+				return err
+			}
+			defer p.Stop()
+		}
+
+		// Special handling for plugins that have their own collection interval
+		// configured. Default intervals are handled below with crankParallel
 		if plugin.config.Interval != 0 {
 			wg.Add(1)
 			go func(plugin *runningPlugin) {
diff --git a/config.go b/config.go
index d9724e65..99aa9d88 100644
--- a/config.go
+++ b/config.go
@@ -377,18 +377,25 @@ var header = `# Telegraf configuration
 [outputs]
 `
 
-var header2 = `
+var pluginHeader = `
 
 ###############################################################################
 #                                  PLUGINS                                    #
 ###############################################################################
 `
 
+var servicePluginHeader = `
+
+###############################################################################
+#                              SERVICE PLUGINS                                #
+###############################################################################
+`
+
 // PrintSampleConfig prints the sample config
 func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
 	fmt.Printf(header)
 
-	// Print Outputs
+	// Filter outputs
 	var onames []string
 	for oname := range outputs.Outputs {
 		if len(outputFilters) == 0 || sliceContains(oname, outputFilters) {
@@ -397,6 +404,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
 	}
 	sort.Strings(onames)
 
+	// Print Outputs
 	for _, oname := range onames {
 		creator := outputs.Outputs[oname]
 		output := creator()
@@ -411,9 +419,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
 		}
 	}
 
-	fmt.Printf(header2)
-
-	// Print Plugins
+	// Filter plugins
 	var pnames []string
 	for pname := range plugins.Plugins {
 		if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) {
@@ -422,18 +428,36 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
 	}
 	sort.Strings(pnames)
 
+	// Print Plugins
+	fmt.Printf(pluginHeader)
+	servPlugins := make(map[string]plugins.ServicePlugin)
 	for _, pname := range pnames {
 		creator := plugins.Plugins[pname]
 		plugin := creator()
 
-		fmt.Printf("\n# %s\n[%s]", plugin.Description(), pname)
-
-		config := plugin.SampleConfig()
-		if config == "" {
-			fmt.Printf("\n	# no configuration\n")
-		} else {
-			fmt.Printf(config)
+		switch p := plugin.(type) {
+		case plugins.ServicePlugin:
+			servPlugins[pname] = p
+			continue
 		}
+
+		printConfig(pname, plugin)
+	}
+
+	// Print Service Plugins
+	fmt.Printf(servicePluginHeader)
+	for name, plugin := range servPlugins {
+		printConfig(name, plugin)
+	}
+}
+
+func printConfig(name string, plugin plugins.Plugin) {
+	fmt.Printf("\n# %s\n[%s]", plugin.Description(), name)
+	config := plugin.SampleConfig()
+	if config == "" {
+		fmt.Printf("\n	# no configuration\n")
+	} else {
+		fmt.Printf(config)
 	}
 }
 
@@ -449,9 +473,7 @@ func sliceContains(name string, list []string) bool {
 // PrintPluginConfig prints the config usage of a single plugin.
 func PrintPluginConfig(name string) error {
 	if creator, ok := plugins.Plugins[name]; ok {
-		plugin := creator()
-		fmt.Printf("# %s\n[%s]", plugin.Description(), name)
-		fmt.Printf(plugin.SampleConfig())
+		printConfig(name, creator())
 	} else {
 		return errors.New(fmt.Sprintf("Plugin %s not found", name))
 	}
diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go
index c47b6cd8..04428849 100644
--- a/outputs/influxdb/influxdb.go
+++ b/outputs/influxdb/influxdb.go
@@ -28,22 +28,18 @@ type InfluxDB struct {
 
 var sampleConfig = `
 	# The full HTTP endpoint URL for your InfluxDB instance
-	# Multiple urls can be specified for InfluxDB cluster support. Server to
-	# write to will be randomly chosen each interval.
-	urls = ["http://localhost:8086"] # required.
-
-	# The target database for metrics. This database must already exist
-	database = "telegraf" # required.
-
-	# Connection timeout (for the connection with InfluxDB), formatted as a string.
-	# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
-	# If not provided, will default to 0 (no timeout)
+	# Multiple urls can be specified for InfluxDB cluster support.
+	urls = ["http://localhost:8086"] # required
+	# The target database for metrics (telegraf will create it if not exists)
+	database = "telegraf" # required
+
+	# # Connection timeout (for the connection with InfluxDB), formatted as a string.
+	# # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
+	# # If not provided, will default to 0 (no timeout)
 	# timeout = "5s"
-
 	# username = "telegraf"
 	# password = "metricsmetricsmetricsmetrics"
-
-	# Set the user agent for the POSTs (can be useful for log differentiation)
+	# # Set the user agent for the POSTs (can be useful for log differentiation)
 	# user_agent = "telegraf"
 `
 
diff --git a/plugins/all/all.go b/plugins/all/all.go
index db448074..4aca8d3d 100644
--- a/plugins/all/all.go
+++ b/plugins/all/all.go
@@ -22,6 +22,7 @@ import (
 	_ "github.com/influxdb/telegraf/plugins/rabbitmq"
 	_ "github.com/influxdb/telegraf/plugins/redis"
 	_ "github.com/influxdb/telegraf/plugins/rethinkdb"
+	_ "github.com/influxdb/telegraf/plugins/statsd"
 	_ "github.com/influxdb/telegraf/plugins/system"
 	_ "github.com/influxdb/telegraf/plugins/zookeeper"
 )
diff --git a/plugins/registry.go b/plugins/registry.go
index b6c9b798..88a24097 100644
--- a/plugins/registry.go
+++ b/plugins/registry.go
@@ -20,11 +20,35 @@ type Accumulator interface {
 }
 
 type Plugin interface {
+	// SampleConfig returns the default configuration of the Plugin
 	SampleConfig() string
+
+	// Description returns a one-sentence description on the Plugin
 	Description() string
+
+	// Gather takes in an accumulator and adds the metrics that the Plugin
+	// gathers. This is called every "interval"
 	Gather(Accumulator) error
 }
 
+type ServicePlugin interface {
+	// SampleConfig returns the default configuration of the Plugin
+	SampleConfig() string
+
+	// Description returns a one-sentence description on the Plugin
+	Description() string
+
+	// Gather takes in an accumulator and adds the metrics that the Plugin
+	// gathers. This is called every "interval"
+	Gather(Accumulator) error
+
+	// Start starts the ServicePlugin's service, whatever that may be
+	Start() error
+
+	// Stop stops the services and closes any necessary channels and connections
+	Stop()
+}
+
 type Creator func() Plugin
 
 var Plugins = map[string]Creator{}
diff --git a/plugins/statsd/README.md b/plugins/statsd/README.md
new file mode 100644
index 00000000..067d4cee
--- /dev/null
+++ b/plugins/statsd/README.md
@@ -0,0 +1,79 @@
+# Telegraf Service Plugin: statsd
+
+#### Plugin arguments:
+
+- **service_address** string: Address to listen for statsd UDP packets on
+- **delete_gauges** boolean: Delete gauges on every collection interval
+- **delete_counters** boolean: Delete counters on every collection interval
+- **delete_sets** boolean: Delete set counters on every collection interval
+- **allowed_pending_messages** integer: Number of messages allowed to queue up
+on the UDP listener before the next flush. NOTE: gauge, counter, and set
+measurements are aggregated as they arrive, so this is not a straight counter of
+the number of total messages that the listener can handle between flushes.
+
+#### Statsd bucket -> InfluxDB Mapping
+
+By default, statsd buckets are converted to measurement names with the rules:
+- "." -> "_"
+- "-" -> "__"
+
+This plugin also accepts a list of config tables to describe a mapping of a statsd
+bucket to an InfluxDB measurement name and tags.
+
+Each mapping must specify a match glob pattern. It can optionally take a name
+for the measurement and a map of bucket indices to tag names.
+
+For example, the following configuration:
+
+```
+    [[statsd.mappings]]
+    match = "users.current.*.*"
+    name = "current_users"
+    [statsd.mappings.tagmap]
+    unit = 0
+    server = 2
+    service = 3
+
+    [[statsd.mappings]]
+    match = "deploys.*.*"
+    name = "service_deploys"
+    [statsd.mappings.tagmap]
+    service_type = 1
+    service_name = 2
+```
+
+Will map statsd -> influx like so:
+```
+users.current.den001.myapp:32|g
+=> [server="den001" service="myapp" unit="users"] statsd_current_users_gauge value=32
+
+deploys.test.myservice:1|c
+=> [service_name="myservice" service_type="test"] statsd_service_deploys_counter value=1
+
+random.jumping-sheep:10|c
+=> [] statsd_random_jumping__sheep_counter value=10
+```
+
+#### Description
+
+The statsd plugin is a special type of plugin which runs a backgrounded statsd
+listener service while telegraf is running.
+
+The format of the statsd messages was based on the format described in the
+original [etsy statsd](https://github.com/etsy/statsd/blob/master/docs/metric_types.md)
+implementation. In short, the telegraf statsd listener will accept:
+
+- Gauges
+    - `users.current.den001.myapp:32|g` <- standard
+    - `users.current.den001.myapp:+10|g` <- additive
+    - `users.current.den001.myapp:-10|g`
+- Counters
+    - `deploys.test.myservice:1|c` <- increments by 1
+    - `deploys.test.myservice:101|c` <- increments by 101
+    - `deploys.test.myservice:1|c|@0.1` <- sample rate, increments by 10
+- Sets
+    - `users.unique:101|s`
+    - `users.unique:101|s`
+    - `users.unique:102|s` <- would result in a count of 2 for `users.unique`
+- Timers
+    - TODO
diff --git a/plugins/statsd/statsd.go b/plugins/statsd/statsd.go
new file mode 100644
index 00000000..454ad33d
--- /dev/null
+++ b/plugins/statsd/statsd.go
@@ -0,0 +1,409 @@
+package statsd
+
+import (
+	"log"
+	"net"
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/influxdb/telegraf/plugins"
+)
+
+var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
+	"You may want to increase allowed_pending_messages in the config\n"
+
+type Statsd struct {
+	// Address & Port to serve from
+	ServiceAddress string
+
+	// Number of messages allowed to queue up in between calls to Gather. If this
+	// fills up, packets will get dropped until the next Gather interval is ran.
+	AllowedPendingMessages int
+
+	DeleteGauges   bool
+	DeleteCounters bool
+	DeleteSets     bool
+
+	sync.Mutex
+
+	// Channel for all incoming statsd messages
+	in        chan string
+	inmetrics chan metric
+	done      chan struct{}
+
+	// Cache gauges, counters & sets so they can be aggregated as they arrive
+	gauges   map[string]cachedmetric
+	counters map[string]cachedmetric
+	sets     map[string]cachedmetric
+
+	Mappings []struct {
+		Match  string
+		Name   string
+		Tagmap map[string]int
+	}
+}
+
+// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
+type metric struct {
+	name       string
+	bucket     string
+	value      int64
+	mtype      string
+	additive   bool
+	samplerate float64
+	tags       map[string]string
+}
+
+// cachedmetric is a subset of metric used specifically for storing cached
+// gauges and counters, ready for sending to InfluxDB.
+type cachedmetric struct {
+	value int64
+	tags  map[string]string
+	set   map[int64]bool
+}
+
+func (_ *Statsd) Description() string {
+	return "Statsd listener"
+}
+
+const sampleConfig = `
+    # Address and port to host UDP listener on
+    service_address = ":8125"
+    # Delete gauges every interval
+    delete_gauges = false
+    # Delete counters every interval
+    delete_counters = false
+    # Delete sets every interval
+    delete_sets = false
+
+    # Number of messages allowed to queue up, once filled,
+    # the statsd server will start dropping packets
+    allowed_pending_messages = 10000
+`
+
+func (_ *Statsd) SampleConfig() string {
+	return sampleConfig
+}
+
+func (s *Statsd) Gather(acc plugins.Accumulator) error {
+	s.Lock()
+	defer s.Unlock()
+
+	values := make(map[string]int64)
+	items := len(s.inmetrics)
+	for i := 0; i < items; i++ {
+
+		m := <-s.inmetrics
+
+		switch m.mtype {
+		case "c", "g", "s":
+			log.Println("ERROR: Uh oh, this should not have happened")
+		case "ms", "h":
+			// TODO
+		}
+	}
+
+	for name, cmetric := range s.gauges {
+		acc.Add(name, cmetric.value, cmetric.tags)
+	}
+	if s.DeleteGauges {
+		s.gauges = make(map[string]cachedmetric)
+	}
+
+	for name, cmetric := range s.counters {
+		acc.Add(name, cmetric.value, cmetric.tags)
+	}
+	if s.DeleteCounters {
+		s.counters = make(map[string]cachedmetric)
+	}
+
+	for name, cmetric := range s.sets {
+		acc.Add(name, cmetric.value, cmetric.tags)
+	}
+	if s.DeleteSets {
+		s.sets = make(map[string]cachedmetric)
+	}
+
+	for name, value := range values {
+		acc.Add(name, value, nil)
+	}
+	return nil
+}
+
+func (s *Statsd) Start() error {
+	log.Println("Starting up the statsd service")
+
+	// Make data structures
+	s.done = make(chan struct{})
+	s.in = make(chan string, s.AllowedPendingMessages)
+	s.inmetrics = make(chan metric, s.AllowedPendingMessages)
+	s.gauges = make(map[string]cachedmetric)
+	s.counters = make(map[string]cachedmetric)
+	s.sets = make(map[string]cachedmetric)
+
+	// Start the UDP listener
+	go s.udpListen()
+	// Start the line parser
+	go s.parser()
+	return nil
+}
+
+// udpListen starts listening for udp packets on the configured port.
+func (s *Statsd) udpListen() error {
+	address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
+	listener, err := net.ListenUDP("udp", address)
+	if err != nil {
+		log.Fatalf("ERROR: ListenUDP - %s", err)
+	}
+	defer listener.Close()
+	log.Println("Statsd listener listening on: ", listener.LocalAddr().String())
+
+	for {
+		select {
+		case <-s.done:
+			return nil
+		default:
+			buf := make([]byte, 1024)
+			n, _, err := listener.ReadFromUDP(buf)
+			if err != nil {
+				log.Printf("ERROR: %s\n", err.Error())
+			}
+
+			lines := strings.Split(string(buf[:n]), "\n")
+			for _, line := range lines {
+				line = strings.TrimSpace(line)
+				if line != "" {
+					select {
+					case s.in <- line:
+					default:
+						log.Printf(dropwarn, line)
+					}
+				}
+			}
+		}
+	}
+}
+
+// parser monitors the s.in channel, if there is a line ready, it parses the
+// statsd string into a usable metric struct and either aggregates the value
+// or pushes it into the s.inmetrics channel.
+func (s *Statsd) parser() error {
+	for {
+		select {
+		case <-s.done:
+			return nil
+		case line := <-s.in:
+			s.parseStatsdLine(line)
+		}
+	}
+}
+
+// parseStatsdLine will parse the given statsd line, validating it as it goes.
+// If the line is valid, it will be cached for the next call to Gather()
+func (s *Statsd) parseStatsdLine(line string) {
+	s.Lock()
+	defer s.Unlock()
+
+	// Validate splitting the line on "|"
+	m := metric{}
+	parts1 := strings.Split(line, "|")
+	if len(parts1) < 2 {
+		log.Printf("Error splitting '|', Unable to parse metric: %s\n", line)
+		return
+	} else if len(parts1) > 2 {
+		sr := parts1[2]
+		if strings.Contains(sr, "@") && len(sr) > 1 {
+			samplerate, err := strconv.ParseFloat(sr[1:], 64)
+			if err != nil {
+				log.Printf("Error parsing sample rate: %s\n", err.Error())
+			} else {
+				m.samplerate = samplerate
+			}
+		} else {
+			msg := "Error parsing sample rate, it must be in format like: " +
+				"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
+			log.Printf(msg, line)
+		}
+	}
+
+	// Validate metric type
+	switch parts1[1] {
+	case "g", "c", "s", "ms", "h":
+		m.mtype = parts1[1]
+	default:
+		log.Printf("Statsd Metric type %s unsupported", parts1[1])
+		return
+	}
+
+	// Validate splitting the rest of the line on ":"
+	parts2 := strings.Split(parts1[0], ":")
+	if len(parts2) != 2 {
+		log.Printf("Error splitting ':', Unable to parse metric: %s\n", line)
+		return
+	}
+	m.bucket = parts2[0]
+
+	// Parse the value
+	if strings.ContainsAny(parts2[1], "-+") {
+		if m.mtype != "g" {
+			log.Printf("Error: +- values are only supported for gauges: %s\n", line)
+			return
+		}
+		m.additive = true
+	}
+	v, err := strconv.ParseInt(parts2[1], 10, 64)
+	if err != nil {
+		log.Printf("Error: parsing value to int64: %s\n", line)
+		return
+	}
+	// If a sample rate is given with a counter, divide value by the rate
+	if m.samplerate != 0 && m.mtype == "c" {
+		v = int64(float64(v) / m.samplerate)
+	}
+	m.value = v
+
+	// Parse the name
+	m.name, m.tags = s.parseName(m)
+
+	switch m.mtype {
+	// Aggregate gauges, counters and sets as we go
+	case "g", "c", "s":
+		s.aggregate(m)
+	// Timers get processed at flush time
+	default:
+		select {
+		case s.inmetrics <- m:
+		default:
+			log.Printf(dropwarn, line)
+		}
+	}
+}
+
+// parseName parses the given bucket name with the list of bucket maps in the
+// config file. If there is a match, it will parse the name of the metric and
+// map of tags.
+// Return values are (<name>, <tags>)
+func (s *Statsd) parseName(m metric) (string, map[string]string) {
+	var tags map[string]string
+	name := strings.Replace(m.bucket, ".", "_", -1)
+	name = strings.Replace(name, "-", "__", -1)
+
+	for _, bm := range s.Mappings {
+		if bucketglob(bm.Match, m.bucket) {
+			tags = make(map[string]string)
+			bparts := strings.Split(m.bucket, ".")
+			for name, index := range bm.Tagmap {
+				if index >= len(bparts) {
+					log.Printf("ERROR: Index %d out of range for bucket %s\n",
+						index, m.bucket)
+					continue
+				}
+				tags[name] = bparts[index]
+			}
+			if bm.Name != "" {
+				name = bm.Name
+			}
+		}
+	}
+
+	switch m.mtype {
+	case "c":
+		name = name + "_counter"
+	case "g":
+		name = name + "_gauge"
+	case "s":
+		name = name + "_set"
+	case "ms", "h":
+		name = name + "_timer"
+	}
+
+	return name, tags
+}
+
+func bucketglob(pattern, bucket string) bool {
+	pparts := strings.Split(pattern, ".")
+	bparts := strings.Split(bucket, ".")
+	if len(pparts) != len(bparts) {
+		return false
+	}
+
+	for i, _ := range pparts {
+		if pparts[i] == "*" || pparts[i] == bparts[i] {
+			continue
+		} else {
+			return false
+		}
+	}
+	return true
+}
+
+// aggregate takes in a metric of type "counter", "gauge", or "set". It then
+// aggregates and caches the current value. It does not deal with the
+// DeleteCounters, DeleteGauges or DeleteSets options, because those are dealt
+// with in the Gather function.
+func (s *Statsd) aggregate(m metric) {
+	switch m.mtype {
+	case "c":
+		cached, ok := s.counters[m.name]
+		if !ok {
+			s.counters[m.name] = cachedmetric{
+				value: m.value,
+				tags:  m.tags,
+			}
+		} else {
+			cached.value += m.value
+			cached.tags = m.tags
+			s.counters[m.name] = cached
+		}
+	case "g":
+		cached, ok := s.gauges[m.name]
+		if !ok {
+			s.gauges[m.name] = cachedmetric{
+				value: m.value,
+				tags:  m.tags,
+			}
+		} else {
+			if m.additive {
+				cached.value = cached.value + m.value
+			} else {
+				cached.value = m.value
+			}
+			cached.tags = m.tags
+			s.gauges[m.name] = cached
+		}
+	case "s":
+		cached, ok := s.sets[m.name]
+		if !ok {
+			// Completely new metric (initialize with count of 1)
+			s.sets[m.name] = cachedmetric{
+				value: 1,
+				tags:  m.tags,
+				set:   map[int64]bool{m.value: true},
+			}
+		} else {
+			_, ok := s.sets[m.name].set[m.value]
+			if !ok {
+				// Metric exists, but value has not been counted
+				cached.value += 1
+				cached.set[m.value] = true
+				s.sets[m.name] = cached
+			}
+		}
+	}
+}
+
+func (s *Statsd) Stop() {
+	s.Lock()
+	defer s.Unlock()
+	log.Println("Stopping the statsd service")
+	close(s.done)
+	close(s.in)
+	close(s.inmetrics)
+}
+
+func init() {
+	plugins.Add("statsd", func() plugins.Plugin {
+		return &Statsd{}
+	})
+}
diff --git a/plugins/statsd/statsd_test.go b/plugins/statsd/statsd_test.go
new file mode 100644
index 00000000..159192c4
--- /dev/null
+++ b/plugins/statsd/statsd_test.go
@@ -0,0 +1,11 @@
+package statsd
+
+import (
+	"testing"
+)
+
+func TestListen(t *testing.T) {
+	if false {
+		t.Errorf("Test failed!")
+	}
+}
-- 
GitLab