From 2c98e5ae66127c87d44f5a2ac5b819879a4b76e5 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <danielnelson@users.noreply.github.com>
Date: Wed, 12 Apr 2017 10:41:26 -0700
Subject: [PATCH] Add collectd parser (#2654)

---
 CHANGELOG.md                                  |   1 +
 Godeps                                        |   1 +
 README.md                                     |  10 +
 docs/DATA_FORMATS_INPUT.md                    |  41 +++
 docs/LICENSE_OF_DEPENDENCIES.md               |   2 +-
 internal/config/config.go                     |  31 ++
 logger/logger.go                              |  11 +-
 logger/logger_test.go                         |  13 +
 .../inputs/socket_listener/socket_listener.go |   4 +-
 plugins/parsers/collectd/parser.go            | 165 ++++++++++
 plugins/parsers/collectd/parser_test.go       | 298 ++++++++++++++++++
 plugins/parsers/registry.go                   |  19 ++
 12 files changed, 592 insertions(+), 4 deletions(-)
 create mode 100644 plugins/parsers/collectd/parser.go
 create mode 100644 plugins/parsers/collectd/parser_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 10934f7f..a2d9fc68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -67,6 +67,7 @@ be deprecated eventually.
 - [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
 - [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
 - [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input
+- [#1100](https://github.com/influxdata/telegraf/issues/1100): Add collectd parser
 
 ### Bugfixes
 
diff --git a/Godeps b/Godeps
index 2e04c0cd..a41d028c 100644
--- a/Godeps
+++ b/Godeps
@@ -1,3 +1,4 @@
+collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
 github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3
 github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
 github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
diff --git a/README.md b/README.md
index 55154e36..f46c2e29 100644
--- a/README.md
+++ b/README.md
@@ -195,6 +195,16 @@ Telegraf can also collect metrics via the following service plugins:
   * [mandrill](./plugins/inputs/webhooks/mandrill)
   * [rollbar](./plugins/inputs/webhooks/rollbar)
 
+Telegraf is able to parse the following input data formats into metrics, these
+formats may be used with input plugins supporting the `data_format` option:
+
+* [InfluxDB Line Protocol](./docs/DATA_FORMATS_INPUT.md#influx)
+* [JSON](./docs/DATA_FORMATS_INPUT.md#json)
+* [Graphite](./docs/DATA_FORMATS_INPUT.md#graphite)
+* [Value](./docs/DATA_FORMATS_INPUT.md#value)
+* [Nagios](./docs/DATA_FORMATS_INPUT.md#nagios)
+* [Collectd](./docs/DATA_FORMATS_INPUT.md#collectd)
+
 ## Processor Plugins
 
 * [printer](./plugins/processors/printer)
diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md
index f2a635d8..59287e4a 100644
--- a/docs/DATA_FORMATS_INPUT.md
+++ b/docs/DATA_FORMATS_INPUT.md
@@ -7,6 +7,7 @@ Telegraf is able to parse the following input data formats into metrics:
 1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
 1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
 1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
+1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
 
 Telegraf metrics, like InfluxDB
 [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@@ -438,3 +439,43 @@ Note: Nagios Input Data Formats is only supported in `exec` input plugin.
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "nagios"
 ```
+
+# Collectd:
+
+The collectd format parses the collectd binary network protocol.  Tags are
+created for host, instance, type, and type instance.  All collectd values are
+added as float64 fields.
+
+For more information about the binary network protocol see
+[here](https://collectd.org/wiki/index.php/Binary_protocol).
+
+You can control the cryptographic settings with parser options.  Create an
+authentication file and set `collectd_auth_file` to the path of the file, then
+set the desired security level in `collectd_security_level`.
+
+Additional information including client setup can be found
+[here](https://collectd.org/wiki/index.php/Networking_introduction#Cryptographic_setup).
+
+You can also change the path to the typesdb or add additional typesdb using
+`collectd_typesdb`.
+
+#### Collectd Configuration:
+
+```toml
+[[inputs.socket_listener]]
+  service_address = "udp://127.0.0.1:25826"
+  name_prefix = "collectd_"
+
+  ## Data format to consume.
+  ## Each data format has it's own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "collectd"
+
+  ## Authentication file for cryptographic security levels
+  collectd_auth_file = "/etc/collectd/auth_file"
+  ## One of none (default), sign, or encrypt
+  collectd_security_level = "encrypt"
+  ## Path of to TypesDB specifications
+  collectd_typesdb = ["/usr/share/collectd/types.db"]
+```
diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md
index 5bb1bd03..a367aa7f 100644
--- a/docs/LICENSE_OF_DEPENDENCIES.md
+++ b/docs/LICENSE_OF_DEPENDENCIES.md
@@ -1,4 +1,5 @@
 # List
+- collectd.org [MIT LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
 - github.com/Shopify/sarama [MIT LICENSE](https://github.com/Shopify/sarama/blob/master/MIT-LICENSE)
 - github.com/Sirupsen/logrus [MIT LICENSE](https://github.com/Sirupsen/logrus/blob/master/LICENSE)
 - github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE)
@@ -30,4 +31,3 @@
 - gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE)
 - gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
 - golang.org/x/crypto/ [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
-
diff --git a/internal/config/config.go b/internal/config/config.go
index 013e81c1..f8c30417 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1230,6 +1230,34 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 		}
 	}
 
+	if node, ok := tbl.Fields["collectd_auth_file"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if str, ok := kv.Value.(*ast.String); ok {
+				c.CollectdAuthFile = str.Value
+			}
+		}
+	}
+
+	if node, ok := tbl.Fields["collectd_security_level"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if str, ok := kv.Value.(*ast.String); ok {
+				c.CollectdSecurityLevel = str.Value
+			}
+		}
+	}
+
+	if node, ok := tbl.Fields["collectd_typesdb"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if ary, ok := kv.Value.(*ast.Array); ok {
+				for _, elem := range ary.Value {
+					if str, ok := elem.(*ast.String); ok {
+						c.CollectdTypesDB = append(c.CollectdTypesDB, str.Value)
+					}
+				}
+			}
+		}
+	}
+
 	c.MetricName = name
 
 	delete(tbl.Fields, "data_format")
@@ -1237,6 +1265,9 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 	delete(tbl.Fields, "templates")
 	delete(tbl.Fields, "tag_keys")
 	delete(tbl.Fields, "data_type")
+	delete(tbl.Fields, "collectd_auth_file")
+	delete(tbl.Fields, "collectd_security_level")
+	delete(tbl.Fields, "collectd_typesdb")
 
 	return parsers.NewParser(c)
 }
diff --git a/logger/logger.go b/logger/logger.go
index 49613c27..7ad1c806 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -4,11 +4,14 @@ import (
 	"io"
 	"log"
 	"os"
+	"regexp"
 	"time"
 
 	"github.com/influxdata/wlog"
 )
 
+var prefixRegex = regexp.MustCompile("^[DIWE]!")
+
 // newTelegrafWriter returns a logging-wrapped writer.
 func newTelegrafWriter(w io.Writer) io.Writer {
 	return &telegrafLog{
@@ -21,7 +24,13 @@ type telegrafLog struct {
 }
 
 func (t *telegrafLog) Write(b []byte) (n int, err error) {
-	return t.writer.Write(append([]byte(time.Now().UTC().Format(time.RFC3339)+" "), b...))
+	var line []byte
+	if !prefixRegex.Match(b) {
+		line = append([]byte(time.Now().UTC().Format(time.RFC3339)+" I! "), b...)
+	} else {
+		line = append([]byte(time.Now().UTC().Format(time.RFC3339)+" "), b...)
+	}
+	return t.writer.Write(line)
 }
 
 // SetupLogging configures the logging output.
diff --git a/logger/logger_test.go b/logger/logger_test.go
index 8c0826e6..09c7c82e 100644
--- a/logger/logger_test.go
+++ b/logger/logger_test.go
@@ -51,6 +51,19 @@ func TestErrorWriteLogToFile(t *testing.T) {
 	assert.Equal(t, f[19:], []byte("Z E! TEST\n"))
 }
 
+func TestAddDefaultLogLevel(t *testing.T) {
+	tmpfile, err := ioutil.TempFile("", "")
+	assert.NoError(t, err)
+	defer func() { os.Remove(tmpfile.Name()) }()
+
+	SetupLogging(true, false, tmpfile.Name())
+	log.Printf("TEST")
+
+	f, err := ioutil.ReadFile(tmpfile.Name())
+	assert.NoError(t, err)
+	assert.Equal(t, f[19:], []byte("Z I! TEST\n"))
+}
+
 func BenchmarkTelegrafLogWrite(b *testing.B) {
 	var msg = []byte("test")
 	var buf bytes.Buffer
diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go
index b5c0202c..4a9a470a 100644
--- a/plugins/inputs/socket_listener/socket_listener.go
+++ b/plugins/inputs/socket_listener/socket_listener.go
@@ -71,7 +71,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
 	for scnr.Scan() {
 		metrics, err := ssl.Parse(scnr.Bytes())
 		if err != nil {
-			ssl.AddError(fmt.Errorf("unable to parse incoming line"))
+			ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
 			//TODO rate limit
 			continue
 		}
@@ -105,7 +105,7 @@ func (psl *packetSocketListener) listen() {
 
 		metrics, err := psl.Parse(buf[:n])
 		if err != nil {
-			psl.AddError(fmt.Errorf("unable to parse incoming packet"))
+			psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
 			//TODO rate limit
 			continue
 		}
diff --git a/plugins/parsers/collectd/parser.go b/plugins/parsers/collectd/parser.go
new file mode 100644
index 00000000..20525610
--- /dev/null
+++ b/plugins/parsers/collectd/parser.go
@@ -0,0 +1,165 @@
+package collectd
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"os"
+
+	"collectd.org/api"
+	"collectd.org/network"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+)
+
+const (
+	DefaultAuthFile = "/etc/collectd/auth_file"
+)
+
+type CollectdParser struct {
+	// DefaultTags will be added to every parsed metric
+	DefaultTags map[string]string
+
+	popts network.ParseOpts
+}
+
+func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
+	p.popts = *popts
+}
+
+func NewCollectdParser(
+	authFile string,
+	securityLevel string,
+	typesDB []string,
+) (*CollectdParser, error) {
+	popts := network.ParseOpts{}
+
+	switch securityLevel {
+	case "none":
+		popts.SecurityLevel = network.None
+	case "sign":
+		popts.SecurityLevel = network.Sign
+	case "encrypt":
+		popts.SecurityLevel = network.Encrypt
+	default:
+		popts.SecurityLevel = network.None
+	}
+
+	if authFile == "" {
+		authFile = DefaultAuthFile
+	}
+	popts.PasswordLookup = network.NewAuthFile(authFile)
+
+	for _, path := range typesDB {
+		db, err := LoadTypesDB(path)
+		if err != nil {
+			return nil, err
+		}
+
+		if popts.TypesDB != nil {
+			popts.TypesDB.Merge(db)
+		} else {
+			popts.TypesDB = db
+		}
+	}
+
+	parser := CollectdParser{popts: popts}
+	return &parser, nil
+}
+
+func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
+	valueLists, err := network.Parse(buf, p.popts)
+	if err != nil {
+		return nil, fmt.Errorf("Collectd parser error: %s", err)
+	}
+
+	metrics := []telegraf.Metric{}
+	for _, valueList := range valueLists {
+		metrics = append(metrics, UnmarshalValueList(valueList)...)
+	}
+
+	if len(p.DefaultTags) > 0 {
+		for _, m := range metrics {
+			for k, v := range p.DefaultTags {
+				// only set the default tag if it doesn't already exist:
+				if !m.HasTag(k) {
+					m.AddTag(k, v)
+				}
+			}
+		}
+	}
+
+	return metrics, nil
+}
+
+func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
+	metrics, err := p.Parse([]byte(line))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(metrics) != 1 {
+		return nil, errors.New("Line contains multiple metrics")
+	}
+
+	return metrics[0], nil
+}
+
+func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
+	p.DefaultTags = tags
+}
+
+// UnmarshalValueList translates a ValueList into a Telegraf metric.
+func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric {
+	timestamp := vl.Time.UTC()
+
+	var metrics []telegraf.Metric
+	for i := range vl.Values {
+		var name string
+		name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
+		tags := make(map[string]string)
+		fields := make(map[string]interface{})
+
+		// Convert interface back to actual type, then to float64
+		switch value := vl.Values[i].(type) {
+		case api.Gauge:
+			fields["value"] = float64(value)
+		case api.Derive:
+			fields["value"] = float64(value)
+		case api.Counter:
+			fields["value"] = float64(value)
+		}
+
+		if vl.Identifier.Host != "" {
+			tags["host"] = vl.Identifier.Host
+		}
+		if vl.Identifier.PluginInstance != "" {
+			tags["instance"] = vl.Identifier.PluginInstance
+		}
+		if vl.Identifier.Type != "" {
+			tags["type"] = vl.Identifier.Type
+		}
+		if vl.Identifier.TypeInstance != "" {
+			tags["type_instance"] = vl.Identifier.TypeInstance
+		}
+
+		// Drop invalid points
+		m, err := metric.New(name, tags, fields, timestamp)
+		if err != nil {
+			log.Printf("E! Dropping metric %v: %v", name, err)
+			continue
+		}
+
+		metrics = append(metrics, m)
+	}
+	return metrics
+}
+
+func LoadTypesDB(path string) (*api.TypesDB, error) {
+	reader, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	return api.NewTypesDB(reader)
+}
diff --git a/plugins/parsers/collectd/parser_test.go b/plugins/parsers/collectd/parser_test.go
new file mode 100644
index 00000000..3aad0401
--- /dev/null
+++ b/plugins/parsers/collectd/parser_test.go
@@ -0,0 +1,298 @@
+package collectd
+
+import (
+	"context"
+	"testing"
+
+	"collectd.org/api"
+	"collectd.org/network"
+	"github.com/stretchr/testify/require"
+
+	"github.com/influxdata/telegraf"
+)
+
+type AuthMap struct {
+	Passwd map[string]string
+}
+
+func (p *AuthMap) Password(user string) (string, error) {
+	return p.Passwd[user], nil
+}
+
+type metricData struct {
+	name   string
+	tags   map[string]string
+	fields map[string]interface{}
+}
+
+type testCase struct {
+	vl       []api.ValueList
+	expected []metricData
+}
+
+var singleMetric = testCase{
+	[]api.ValueList{
+		api.ValueList{
+			Identifier: api.Identifier{
+				Host:           "xyzzy",
+				Plugin:         "cpu",
+				PluginInstance: "1",
+				Type:           "cpu",
+				TypeInstance:   "user",
+			},
+			Values: []api.Value{
+				api.Counter(42),
+			},
+			DSNames: []string(nil),
+		},
+	},
+	[]metricData{
+		metricData{
+			"cpu_value",
+			map[string]string{
+				"type_instance": "user",
+				"host":          "xyzzy",
+				"instance":      "1",
+				"type":          "cpu",
+			},
+			map[string]interface{}{
+				"value": float64(42),
+			},
+		},
+	},
+}
+
+var multiMetric = testCase{
+	[]api.ValueList{
+		api.ValueList{
+			Identifier: api.Identifier{
+				Host:           "xyzzy",
+				Plugin:         "cpu",
+				PluginInstance: "0",
+				Type:           "cpu",
+				TypeInstance:   "user",
+			},
+			Values: []api.Value{
+				api.Derive(42),
+				api.Gauge(42),
+			},
+			DSNames: []string(nil),
+		},
+	},
+	[]metricData{
+		metricData{
+			"cpu_0",
+			map[string]string{
+				"type_instance": "user",
+				"host":          "xyzzy",
+				"instance":      "0",
+				"type":          "cpu",
+			},
+			map[string]interface{}{
+				"value": float64(42),
+			},
+		},
+		metricData{
+			"cpu_1",
+			map[string]string{
+				"type_instance": "user",
+				"host":          "xyzzy",
+				"instance":      "0",
+				"type":          "cpu",
+			},
+			map[string]interface{}{
+				"value": float64(42),
+			},
+		},
+	},
+}
+
+func TestNewCollectdParser(t *testing.T) {
+	parser, err := NewCollectdParser("", "", []string{})
+	require.Nil(t, err)
+	require.Equal(t, parser.popts.SecurityLevel, network.None)
+	require.NotNil(t, parser.popts.PasswordLookup)
+	require.Nil(t, parser.popts.TypesDB)
+}
+
+func TestParse(t *testing.T) {
+	cases := []testCase{singleMetric, multiMetric}
+
+	for _, tc := range cases {
+		buf, err := writeValueList(tc.vl)
+		require.Nil(t, err)
+		bytes, err := buf.Bytes()
+		require.Nil(t, err)
+
+		parser := &CollectdParser{}
+		require.Nil(t, err)
+		metrics, err := parser.Parse(bytes)
+		require.Nil(t, err)
+
+		assertEqualMetrics(t, tc.expected, metrics)
+	}
+}
+
+func TestParse_DefaultTags(t *testing.T) {
+	buf, err := writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	bytes, err := buf.Bytes()
+	require.Nil(t, err)
+
+	parser := &CollectdParser{}
+	parser.SetDefaultTags(map[string]string{
+		"foo": "bar",
+	})
+	require.Nil(t, err)
+	metrics, err := parser.Parse(bytes)
+	require.Nil(t, err)
+
+	require.Equal(t, "bar", metrics[0].Tags()["foo"])
+}
+
+func TestParse_SignSecurityLevel(t *testing.T) {
+	parser := &CollectdParser{}
+	popts := &network.ParseOpts{
+		SecurityLevel: network.Sign,
+		PasswordLookup: &AuthMap{
+			map[string]string{
+				"user0": "bar",
+			},
+		},
+	}
+	parser.SetParseOpts(popts)
+
+	// Signed data
+	buf, err := writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	buf.Sign("user0", "bar")
+	bytes, err := buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err := parser.Parse(bytes)
+	require.Nil(t, err)
+	assertEqualMetrics(t, singleMetric.expected, metrics)
+
+	// Encrypted data
+	buf, err = writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	buf.Encrypt("user0", "bar")
+	bytes, err = buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err = parser.Parse(bytes)
+	require.Nil(t, err)
+	assertEqualMetrics(t, singleMetric.expected, metrics)
+
+	// Plain text data skipped
+	buf, err = writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	bytes, err = buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err = parser.Parse(bytes)
+	require.Nil(t, err)
+	require.Equal(t, []telegraf.Metric{}, metrics)
+
+	// Wrong password error
+	buf, err = writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	buf.Sign("x", "y")
+	bytes, err = buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err = parser.Parse(bytes)
+	require.NotNil(t, err)
+}
+
+func TestParse_EncryptSecurityLevel(t *testing.T) {
+	parser := &CollectdParser{}
+	popts := &network.ParseOpts{
+		SecurityLevel: network.Encrypt,
+		PasswordLookup: &AuthMap{
+			map[string]string{
+				"user0": "bar",
+			},
+		},
+	}
+	parser.SetParseOpts(popts)
+
+	// Signed data skipped
+	buf, err := writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	buf.Sign("user0", "bar")
+	bytes, err := buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err := parser.Parse(bytes)
+	require.Nil(t, err)
+	require.Equal(t, []telegraf.Metric{}, metrics)
+
+	// Encrypted data
+	buf, err = writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	buf.Encrypt("user0", "bar")
+	bytes, err = buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err = parser.Parse(bytes)
+	require.Nil(t, err)
+	assertEqualMetrics(t, singleMetric.expected, metrics)
+
+	// Plain text data skipped
+	buf, err = writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	bytes, err = buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err = parser.Parse(bytes)
+	require.Nil(t, err)
+	require.Equal(t, []telegraf.Metric{}, metrics)
+
+	// Wrong password error
+	buf, err = writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	buf.Sign("x", "y")
+	bytes, err = buf.Bytes()
+	require.Nil(t, err)
+
+	metrics, err = parser.Parse(bytes)
+	require.NotNil(t, err)
+}
+
+func TestParseLine(t *testing.T) {
+	buf, err := writeValueList(singleMetric.vl)
+	require.Nil(t, err)
+	bytes, err := buf.Bytes()
+	require.Nil(t, err)
+
+	parser, err := NewCollectdParser("", "", []string{})
+	require.Nil(t, err)
+	metric, err := parser.ParseLine(string(bytes))
+	require.Nil(t, err)
+
+	assertEqualMetrics(t, singleMetric.expected, []telegraf.Metric{metric})
+}
+
+func writeValueList(valueLists []api.ValueList) (*network.Buffer, error) {
+	buffer := network.NewBuffer(0)
+
+	ctx := context.Background()
+	for _, vl := range valueLists {
+		err := buffer.Write(ctx, &vl)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return buffer, nil
+}
+
+func assertEqualMetrics(t *testing.T, expected []metricData, received []telegraf.Metric) {
+	require.Equal(t, len(expected), len(received))
+	for i, m := range received {
+		require.Equal(t, expected[i].name, m.Name())
+		require.Equal(t, expected[i].tags, m.Tags())
+		require.Equal(t, expected[i].fields, m.Fields())
+	}
+}
diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go
index 360d795b..bda6aeba 100644
--- a/plugins/parsers/registry.go
+++ b/plugins/parsers/registry.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/influxdata/telegraf"
 
+	"github.com/influxdata/telegraf/plugins/parsers/collectd"
 	"github.com/influxdata/telegraf/plugins/parsers/graphite"
 	"github.com/influxdata/telegraf/plugins/parsers/influx"
 	"github.com/influxdata/telegraf/plugins/parsers/json"
@@ -53,6 +54,13 @@ type Config struct {
 	// MetricName applies to JSON & value. This will be the name of the measurement.
 	MetricName string
 
+	// Authentication file for collectd
+	CollectdAuthFile string
+	// One of none (default), sign, or encrypt
+	CollectdSecurityLevel string
+	// Dataset specification for collectd
+	CollectdTypesDB []string
+
 	// DataType only applies to value, this will be the type to parse value to
 	DataType string
 
@@ -78,6 +86,9 @@ func NewParser(config *Config) (Parser, error) {
 	case "graphite":
 		parser, err = NewGraphiteParser(config.Separator,
 			config.Templates, config.DefaultTags)
+	case "collectd":
+		parser, err = NewCollectdParser(config.CollectdAuthFile,
+			config.CollectdSecurityLevel, config.CollectdTypesDB)
 	default:
 		err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
 	}
@@ -124,3 +135,11 @@ func NewValueParser(
 		DefaultTags: defaultTags,
 	}, nil
 }
+
+func NewCollectdParser(
+	authFile string,
+	securityLevel string,
+	typesDB []string,
+) (Parser, error) {
+	return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
+}
-- 
GitLab