From fd22b1ef1ffe82ec5d13c6e4fdd056976ec81dba Mon Sep 17 00:00:00 2001
From: Arkady Emelyanov <arkady-emelyanov@users.noreply.github.com>
Date: Wed, 23 May 2018 00:10:41 +0300
Subject: [PATCH] Add burrow input plugin (#3489)

---
 README.md                                     |   1 +
 plugins/inputs/all/all.go                     |   1 +
 plugins/inputs/burrow/README.md               |  98 ++++
 plugins/inputs/burrow/burrow.go               | 485 ++++++++++++++++++
 plugins/inputs/burrow/burrow_test.go          | 285 ++++++++++
 plugins/inputs/burrow/testdata/error.json     |  11 +
 plugins/inputs/burrow/testdata/v3_kafka.json  |  11 +
 .../v3_kafka_clustername1_consumer.json       |  11 +
 ...afka_clustername1_consumer_group1_lag.json |  90 ++++
 .../testdata/v3_kafka_clustername1_topic.json |  11 +
 .../v3_kafka_clustername1_topic_topicA.json   |  13 +
 11 files changed, 1017 insertions(+)
 create mode 100644 plugins/inputs/burrow/README.md
 create mode 100644 plugins/inputs/burrow/burrow.go
 create mode 100644 plugins/inputs/burrow/burrow_test.go
 create mode 100644 plugins/inputs/burrow/testdata/error.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json

diff --git a/README.md b/README.md
index d4d19db0..8ae242b5 100644
--- a/README.md
+++ b/README.md
@@ -132,6 +132,7 @@ configuration options.
 * [bcache](./plugins/inputs/bcache)
 * [bond](./plugins/inputs/bond)
+* [burrow](./plugins/inputs/burrow)
 * [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2))
 * [ceph](./plugins/inputs/ceph)
 * [cgroup](./plugins/inputs/cgroup)
 * [chrony](./plugins/inputs/chrony)
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 80db99bf..239cf6e1 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -7,6 +7,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bond"
+	_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
 	_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
diff --git a/plugins/inputs/burrow/README.md b/plugins/inputs/burrow/README.md
new file mode 100644
index 00000000..039cff8c
--- /dev/null
+++ b/plugins/inputs/burrow/README.md
@@ -0,0 +1,98 @@
+# Telegraf Plugin: Burrow
+
+Collects Kafka topic, consumer, and partition status via the
+[Burrow](https://github.com/linkedin/Burrow) HTTP [API](https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint).
+
+Supported Burrow version: `1.x`
+
+### Configuration
+
+```
+[[inputs.burrow]]
+  ## Burrow API endpoints in format "scheme://host:port".
+  ## Default is "http://localhost:8000".
+  servers = ["http://localhost:8000"]
+
+  ## Override Burrow API prefix.
+  ## Useful when Burrow is behind a reverse proxy.
+  # api_prefix = "/v3/kafka"
+
+  ## Maximum time to receive response.
+  # response_timeout = "5s"
+
+  ## Limit per-server concurrent connections.
+  ## Useful when there is a large number of topics or consumer groups.
+  # concurrent_connections = 20
+
+  ## Filter clusters, default is no filtering.
+  ## Values can be specified as glob patterns.
+  # clusters_include = []
+  # clusters_exclude = []
+
+  ## Filter consumer groups, default is no filtering.
+  ## Values can be specified as glob patterns.
+  # groups_include = []
+  # groups_exclude = []
+
+  ## Filter topics, default is no filtering.
+  ## Values can be specified as glob patterns.
+  # topics_include = []
+  # topics_exclude = []
+
+  ## Credentials for basic HTTP authentication.
+  # username = ""
+  # password = ""
+
+  ## Optional SSL config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  # insecure_skip_verify = false
+```
+
+### Partition Status mappings
+
+* `OK` = 1
+* `NOT_FOUND` = 2
+* `WARN` = 3
+* `ERR` = 4
+* `STOP` = 5
+* `STALL` = 6
+
+> Unknown status values are mapped to 0.
+
+### Fields
+
+* `burrow_group` (one metric per consumer group)
+  - status (string, see Partition Status mappings)
+  - status_code (int, `1..6`, see Partition Status mappings)
+  - partition_count (int, `number of partitions`)
+  - total_lag (int64, `totallag`)
+  - lag (int64, `maxlag.current_lag || 0`)
+  - offset (int64, `maximum of end.offset across partitions`)
+  - timestamp (int64, `maximum of end.timestamp across partitions`)
+
+* `burrow_partition` (one metric per topic partition)
+  - status (string, see Partition Status mappings)
+  - status_code (int, `1..6`, see Partition Status mappings)
+  - lag (int64, `current_lag || 0`)
+  - offset (int64, `end.offset`)
+  - timestamp (int64, `end.timestamp`)
+
+* `burrow_topic` (one metric per topic partition)
+  - offset (int64)
+
+### Tags
+
+* `burrow_group`
+  - cluster (string)
+  - group (string)
+
+* `burrow_partition`
+  - cluster (string)
+  - group (string)
+  - topic (string)
+  - partition (int)
+
+* `burrow_topic`
+  - cluster (string)
+  - topic (string)
+  - partition (int)
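+
+### Example Output
+
+Scraping the mock API used by the plugin tests (the fixtures under
+`testdata/`) yields output roughly like the following; the exact ordering
+of tags and fields may differ:
+
+```
+burrow_group,cluster=clustername1,group=group1 lag=0i,offset=431323195i,partition_count=3i,status="OK",status_code=1i,timestamp=1515609490008i,total_lag=0i
+burrow_partition,cluster=clustername1,group=group1,partition=0,topic=topicA lag=0i,offset=431323195i,status="OK",status_code=1i,timestamp=1515609490008i
+burrow_topic,cluster=clustername1,partition=0,topic=topicA offset=459178195i
+```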
diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go
new file mode 100644
index 00000000..88fdb4b7
--- /dev/null
+++ b/plugins/inputs/burrow/burrow.go
@@ -0,0 +1,485 @@
+package burrow
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/filter"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/internal/tls"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+const (
+	defaultBurrowPrefix          = "/v3/kafka"
+	defaultConcurrentConnections = 20
+	defaultResponseTimeout       = time.Second * 5
+	defaultServer                = "http://localhost:8000"
+)
+
+const configSample = `
+  ## Burrow API endpoints in format "scheme://host:port".
+  ## Default is "http://localhost:8000".
+  servers = ["http://localhost:8000"]
+
+  ## Override Burrow API prefix.
+  ## Useful when Burrow is behind a reverse proxy.
+  # api_prefix = "/v3/kafka"
+
+  ## Maximum time to receive response.
+  # response_timeout = "5s"
+
+  ## Limit per-server concurrent connections.
+  ## Useful when there is a large number of topics or consumer groups.
+  # concurrent_connections = 20
+
+  ## Filter clusters, default is no filtering.
+  ## Values can be specified as glob patterns.
+  # clusters_include = []
+  # clusters_exclude = []
+
+  ## Filter consumer groups, default is no filtering.
+  ## Values can be specified as glob patterns.
+  # groups_include = []
+  # groups_exclude = []
+
+  ## Filter topics, default is no filtering.
+  ## Values can be specified as glob patterns.
+  # topics_include = []
+  # topics_exclude = []
+
+  ## Credentials for basic HTTP authentication.
+  # username = ""
+  # password = ""
+
+  ## Optional SSL config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  # insecure_skip_verify = false
+`
+
+type (
+	burrow struct {
+		tls.ClientConfig
+
+		Servers               []string
+		Username              string
+		Password              string
+		ResponseTimeout       internal.Duration
+		ConcurrentConnections int
+
+		APIPrefix       string `toml:"api_prefix"`
+		ClustersExclude []string
+		ClustersInclude []string
+		GroupsExclude   []string
+		GroupsInclude   []string
+		TopicsExclude   []string
+		TopicsInclude   []string
+
+		client         *http.Client
+		filterClusters filter.Filter
+		filterGroups   filter.Filter
+		filterTopics   filter.Filter
+	}
+
+	// response
+	apiResponse struct {
+		Clusters []string          `json:"clusters"`
+		Groups   []string          `json:"consumers"`
+		Topics   []string          `json:"topics"`
+		Offsets  []int64           `json:"offsets"`
+		Status   apiStatusResponse `json:"status"`
+	}
+
+	// response: status field
+	apiStatusResponse struct {
+		Partitions     []apiStatusResponseLag `json:"partitions"`
+		Status         string                 `json:"status"`
+		PartitionCount int                    `json:"partition_count"`
+		Maxlag         *apiStatusResponseLag  `json:"maxlag"`
+		TotalLag       int64                  `json:"totallag"`
+	}
+
+	// response: lag field
+	apiStatusResponseLag struct {
+		Topic      string                   `json:"topic"`
+		Partition  int32                    `json:"partition"`
+		Status     string                   `json:"status"`
+		Start      apiStatusResponseLagItem `json:"start"`
+		End        apiStatusResponseLagItem `json:"end"`
+		CurrentLag int64                    `json:"current_lag"`
+	}
+
+	// response: lag field item
+	apiStatusResponseLagItem struct {
+		Offset    int64 `json:"offset"`
+		Timestamp int64 `json:"timestamp"`
+		Lag       int64 `json:"lag"`
+	}
+)
+
+func init() {
+	inputs.Add("burrow", func() telegraf.Input {
+		return &burrow{}
+	})
+}
+
+func (b *burrow) SampleConfig() string {
+	return configSample
+}
+
+func (b *burrow) Description() string {
+	return "Collect Kafka topic, consumer and partition status from Burrow HTTP API."
+}
+
+func (b *burrow) Gather(acc telegraf.Accumulator) error {
+	var wg sync.WaitGroup
+
+	if len(b.Servers) == 0 {
+		b.Servers = []string{defaultServer}
+	}
+
+	if b.client == nil {
+		b.setDefaults()
+		if err := b.compileGlobs(); err != nil {
+			return err
+		}
+		c, err := b.createClient()
+		if err != nil {
+			return err
+		}
+		b.client = c
+	}
+
+	for _, addr := range b.Servers {
+		u, err := url.Parse(addr)
+		if err != nil {
+			acc.AddError(fmt.Errorf("unable to parse address '%s': %s", addr, err))
+			continue
+		}
+		if u.Path == "" {
+			u.Path = b.APIPrefix
+		}
+
+		wg.Add(1)
+		go func(u *url.URL) {
+			defer wg.Done()
+			acc.AddError(b.gatherServer(u, acc))
+		}(u)
+	}
+
+	wg.Wait()
+	return nil
+}
+
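+// setDefaults fills in configuration options that are unset or out of range.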
+func (b *burrow) setDefaults() {
+	if b.APIPrefix == "" {
+		b.APIPrefix = defaultBurrowPrefix
+	}
+	if b.ConcurrentConnections < 1 {
+		b.ConcurrentConnections = defaultConcurrentConnections
+	}
+	if b.ResponseTimeout.Duration < time.Second {
+		b.ResponseTimeout = internal.Duration{
+			Duration: defaultResponseTimeout,
+		}
+	}
+}
+
+func (b *burrow) compileGlobs() error {
+	var err error
+
+	// compile glob patterns
+	b.filterClusters, err = filter.NewIncludeExcludeFilter(b.ClustersInclude, b.ClustersExclude)
+	if err != nil {
+		return err
+	}
+	b.filterGroups, err = filter.NewIncludeExcludeFilter(b.GroupsInclude, b.GroupsExclude)
+	if err != nil {
+		return err
+	}
+	b.filterTopics, err = filter.NewIncludeExcludeFilter(b.TopicsInclude, b.TopicsExclude)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
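+// createClient builds the shared HTTP client, honoring the optional TLS
+// settings and the configured response timeout.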
+func (b *burrow) createClient() (*http.Client, error) {
+	tlsCfg, err := b.ClientConfig.TLSConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	client := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: tlsCfg,
+		},
+		Timeout: b.ResponseTimeout.Duration,
+	}
+
+	return client, nil
+}
+
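+// getResponse issues a GET request to the Burrow API (adding basic auth
+// credentials when configured) and decodes the JSON response.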
+func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) {
+	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+	if b.Username != "" {
+		req.SetBasicAuth(b.Username, b.Password)
+	}
+
+	res, err := b.client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	defer res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("endpoint responded with status %d", res.StatusCode)
+	}
+
+	ares := &apiResponse{}
+	dec := json.NewDecoder(res.Body)
+
+	return ares, dec.Decode(ares)
+}
+
+func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {
+	var wg sync.WaitGroup
+
+	r, err := b.getResponse(src)
+	if err != nil {
+		return err
+	}
+
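+	// guard is a buffered channel acting as a semaphore: it caps the number
+	// of concurrent requests issued against a single Burrow server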
+	guard := make(chan struct{}, b.ConcurrentConnections)
+	for _, cluster := range r.Clusters {
+		if !b.filterClusters.Match(cluster) {
+			continue
+		}
+
+		wg.Add(1)
+		go func(cluster string) {
+			defer wg.Done()
+
+			// fetch topic list
+			// endpoint: <api_prefix>/(cluster)/topic
+			ut := appendPathToURL(src, cluster, "topic")
+			b.gatherTopics(guard, ut, cluster, acc)
+		}(cluster)
+
+		wg.Add(1)
+		go func(cluster string) {
+			defer wg.Done()
+
+			// fetch consumer group list
+			// endpoint: <api_prefix>/(cluster)/consumer
+			uc := appendPathToURL(src, cluster, "consumer")
+			b.gatherGroups(guard, uc, cluster, acc)
+		}(cluster)
+	}
+
+	wg.Wait()
+	return nil
+}
+
+func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
+	var wg sync.WaitGroup
+
+	r, err := b.getResponse(src)
+	if err != nil {
+		acc.AddError(err)
+		return
+	}
+
+	for _, topic := range r.Topics {
+		if !b.filterTopics.Match(topic) {
+			continue
+		}
+
+		guard <- struct{}{}
+		wg.Add(1)
+
+		go func(topic string) {
+			defer func() {
+				<-guard
+				wg.Done()
+			}()
+
+			// fetch topic offsets
+			// endpoint: <api_prefix>/<cluster>/topic/<topic>
+			tu := appendPathToURL(src, topic)
+			tr, err := b.getResponse(tu)
+			if err != nil {
+				acc.AddError(err)
+				return
+			}
+
+			b.genTopicMetrics(tr, cluster, topic, acc)
+		}(topic)
+	}
+
+	wg.Wait()
+}
+
+func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) {
+	for i, offset := range r.Offsets {
+		tags := map[string]string{
+			"cluster":   cluster,
+			"topic":     topic,
+			"partition": strconv.Itoa(i),
+		}
+
+		acc.AddFields(
+			"burrow_topic",
+			map[string]interface{}{
+				"offset": offset,
+			},
+			tags,
+		)
+	}
+}
+
+func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
+	var wg sync.WaitGroup
+
+	r, err := b.getResponse(src)
+	if err != nil {
+		acc.AddError(err)
+		return
+	}
+
+	for _, group := range r.Groups {
+		if !b.filterGroups.Match(group) {
+			continue
+		}
+
+		guard <- struct{}{}
+		wg.Add(1)
+
+		go func(group string) {
+			defer func() {
+				<-guard
+				wg.Done()
+			}()
+
+			// fetch consumer group status
+			// endpoint: <api_prefix>/<cluster>/consumer/<group>/lag
+			gl := appendPathToURL(src, group, "lag")
+			gr, err := b.getResponse(gl)
+			if err != nil {
+				acc.AddError(err)
+				return
+			}
+
+			b.genGroupStatusMetrics(gr, cluster, group, acc)
+			b.genGroupLagMetrics(gr, cluster, group, acc)
+		}(group)
+	}
+
+	wg.Wait()
+}
+
+func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
+	partitionCount := r.Status.PartitionCount
+	if partitionCount == 0 {
+		partitionCount = len(r.Status.Partitions)
+	}
+
+	// get max timestamp and offset from partitions list
+	offset := int64(0)
+	timestamp := int64(0)
+	for _, partition := range r.Status.Partitions {
+		if partition.End.Offset > offset {
+			offset = partition.End.Offset
+		}
+		if partition.End.Timestamp > timestamp {
+			timestamp = partition.End.Timestamp
+		}
+	}
+
+	lag := int64(0)
+	if r.Status.Maxlag != nil {
+		lag = r.Status.Maxlag.CurrentLag
+	}
+
+	acc.AddFields(
+		"burrow_group",
+		map[string]interface{}{
+			"status":          r.Status.Status,
+			"status_code":     mapStatusToCode(r.Status.Status),
+			"partition_count": partitionCount,
+			"total_lag":       r.Status.TotalLag,
+			"lag":             lag,
+			"offset":          offset,
+			"timestamp":       timestamp,
+		},
+		map[string]string{
+			"cluster": cluster,
+			"group":   group,
+		},
+	)
+}
+
+func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
+	for _, partition := range r.Status.Partitions {
+		acc.AddFields(
+			"burrow_partition",
+			map[string]interface{}{
+				"status":      partition.Status,
+				"status_code": mapStatusToCode(partition.Status),
+				"lag":         partition.CurrentLag,
+				"offset":      partition.End.Offset,
+				"timestamp":   partition.End.Timestamp,
+			},
+			map[string]string{
+				"cluster":   cluster,
+				"group":     group,
+				"topic":     partition.Topic,
+				"partition": strconv.FormatInt(int64(partition.Partition), 10),
+			},
+		)
+	}
+}
+
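+// appendPathToURL returns a copy of src with the escaped parts appended to
+// its path; the original URL is left untouched.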
+func appendPathToURL(src *url.URL, parts ...string) *url.URL {
+	dst := new(url.URL)
+	*dst = *src
+
+	for i, part := range parts {
+		parts[i] = url.PathEscape(part)
+	}
+
+	ext := strings.Join(parts, "/")
+	dst.Path = fmt.Sprintf("%s/%s", src.Path, ext)
+	return dst
+}
+
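+// mapStatusToCode converts a Burrow status string to its numeric code;
+// unknown values map to 0 (see "Partition Status mappings" in README.md).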
+func mapStatusToCode(src string) int {
+	switch src {
+	case "OK":
+		return 1
+	case "NOT_FOUND":
+		return 2
+	case "WARN":
+		return 3
+	case "ERR":
+		return 4
+	case "STOP":
+		return 5
+	case "STALL":
+		return 6
+	default:
+		return 0
+	}
+}
diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go
new file mode 100644
index 00000000..9b3f4a0a
--- /dev/null
+++ b/plugins/inputs/burrow/burrow_test.go
@@ -0,0 +1,285 @@
+package burrow
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+// remap URI to a JSON file, e.g. /v3/kafka -> ./testdata/v3_kafka.json
+func getResponseJSON(requestURI string) ([]byte, int) {
+	uri := strings.TrimLeft(requestURI, "/")
+	mappedFile := strings.Replace(uri, "/", "_", -1)
+	jsonFile := fmt.Sprintf("./testdata/%s.json", mappedFile)
+
+	code := 200
+	_, err := os.Stat(jsonFile)
+	if err != nil {
+		code = 404
+		jsonFile = "./testdata/error.json"
+	}
+
+	// respond with file
+	b, _ := ioutil.ReadFile(jsonFile)
+	return b, code
+}
+
+// return mocked HTTP server
+func getHTTPServer() *httptest.Server {
+	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		body, code := getResponseJSON(r.RequestURI)
+		// headers must be set before WriteHeader, otherwise they are ignored
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(code)
+		w.Write(body)
+	}))
+}
+
+// return mocked HTTP server with basic auth
+func getHTTPServerBasicAuth() *httptest.Server {
+	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
+
+		username, password, authOK := r.BasicAuth()
+		if !authOK {
+			http.Error(w, "Not authorized", http.StatusUnauthorized)
+			return
+		}
+
+		// reject the request unless both credentials match
+		if username != "test" || password != "test" {
+			http.Error(w, "Not authorized", http.StatusUnauthorized)
+			return
+		}
+
+		// ok, continue; headers must be set before WriteHeader
+		body, code := getResponseJSON(r.RequestURI)
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(code)
+		w.Write(body)
+	}))
+}
+
+// test burrow_topic measurement
+func TestBurrowTopic(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{Servers: []string{s.URL}}
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	fields := []map[string]interface{}{
+		// topicA
+		{"offset": int64(459178195)},
+		{"offset": int64(459178022)},
+		{"offset": int64(456491598)},
+	}
+	tags := []map[string]string{
+		// topicA
+		{"cluster": "clustername1", "topic": "topicA", "partition": "0"},
+		{"cluster": "clustername1", "topic": "topicA", "partition": "1"},
+		{"cluster": "clustername1", "topic": "topicA", "partition": "2"},
+	}
+
+	require.Empty(t, acc.Errors)
+	require.True(t, acc.HasMeasurement("burrow_topic"))
+	for i := 0; i < len(fields); i++ {
+		acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i])
+	}
+}
+
+// test burrow_partition measurement
+func TestBurrowPartition(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers: []string{s.URL},
+	}
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	fields := []map[string]interface{}{
+		{
+			"status":      "OK",
+			"status_code": 1,
+			"lag":         int64(0),
+			"offset":      int64(431323195),
+			"timestamp":   int64(1515609490008),
+		},
+		{
+			"status":      "OK",
+			"status_code": 1,
+			"lag":         int64(0),
+			"offset":      int64(431322962),
+			"timestamp":   int64(1515609490008),
+		},
+		{
+			"status":      "OK",
+			"status_code": 1,
+			"lag":         int64(0),
+			"offset":      int64(428636563),
+			"timestamp":   int64(1515609490008),
+		},
+	}
+	tags := []map[string]string{
+		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0"},
+		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1"},
+		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2"},
+	}
+
+	require.Empty(t, acc.Errors)
+	require.True(t, acc.HasMeasurement("burrow_partition"))
+
+	for i := 0; i < len(fields); i++ {
+		acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i])
+	}
+}
+
+// burrow_group
+func TestBurrowGroup(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers: []string{s.URL},
+	}
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	fields := []map[string]interface{}{
+		{
+			"status":          "OK",
+			"status_code":     1,
+			"partition_count": 3,
+			"total_lag":       int64(0),
+			"lag":             int64(0),
+			"offset":          int64(431323195),
+			"timestamp":       int64(1515609490008),
+		},
+	}
+
+	tags := []map[string]string{
+		{"cluster": "clustername1", "group": "group1"},
+	}
+
+	require.Empty(t, acc.Errors)
+	require.True(t, acc.HasMeasurement("burrow_group"))
+
+	for i := 0; i < len(fields); i++ {
+		acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i])
+	}
+}
+
+// collect from multiple servers
+func TestMultipleServers(t *testing.T) {
+	s1 := getHTTPServer()
+	defer s1.Close()
+
+	s2 := getHTTPServer()
+	defer s2.Close()
+
+	plugin := &burrow{
+		Servers: []string{s1.URL, s2.URL},
+	}
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	require.Exactly(t, 14, len(acc.Metrics))
+	require.Empty(t, acc.Errors)
+}
+
+// collect multiple times
+func TestMultipleRuns(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers: []string{s.URL},
+	}
+	for i := 0; i < 4; i++ {
+		acc := &testutil.Accumulator{}
+		plugin.Gather(acc)
+
+		require.Exactly(t, 7, len(acc.Metrics))
+		require.Empty(t, acc.Errors)
+	}
+}
+
+// collect from http basic auth server
+func TestBasicAuthConfig(t *testing.T) {
+	s := getHTTPServerBasicAuth()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers:  []string{s.URL},
+		Username: "test",
+		Password: "test",
+	}
+
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	require.Exactly(t, 7, len(acc.Metrics))
+	require.Empty(t, acc.Errors)
+}
+
+// collect from whitelisted clusters
+func TestFilterClusters(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers:         []string{s.URL},
+		ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
+	}
+
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	// no match by cluster
+	require.Exactly(t, 0, len(acc.Metrics))
+	require.Empty(t, acc.Errors)
+}
+
+// collect from whitelisted groups
+func TestFilterGroups(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers:       []string{s.URL},
+		GroupsInclude: []string{"group?"}, // group1 -> match
+		TopicsExclude: []string{"*"},      // exclude all
+	}
+
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	require.Exactly(t, 4, len(acc.Metrics))
+	require.Empty(t, acc.Errors)
+}
+
+// collect from whitelisted topics
+func TestFilterTopics(t *testing.T) {
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers:       []string{s.URL},
+		TopicsInclude: []string{"topic?"}, // topicA -> match
+		GroupsExclude: []string{"*"},      // exclude all
+	}
+
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	require.Exactly(t, 3, len(acc.Metrics))
+	require.Empty(t, acc.Errors)
+}
diff --git a/plugins/inputs/burrow/testdata/error.json b/plugins/inputs/burrow/testdata/error.json
new file mode 100644
index 00000000..f70b863e
--- /dev/null
+++ b/plugins/inputs/burrow/testdata/error.json
@@ -0,0 +1,11 @@
+{
+  "error": true,
+  "message": "Detailed error message",
+  "request": {
+    "uri": "/invalid/request",
+    "host": "responding.host.example.com",
+    "cluster": "",
+    "group": "",
+    "topic": ""
+  }
+}
diff --git a/plugins/inputs/burrow/testdata/v3_kafka.json b/plugins/inputs/burrow/testdata/v3_kafka.json
new file mode 100644
index 00000000..dfc4d044
--- /dev/null
+++ b/plugins/inputs/burrow/testdata/v3_kafka.json
@@ -0,0 +1,11 @@
+{
+  "error": false,
+  "message": "cluster list returned",
+  "clusters": [
+    "clustername1"
+  ],
+  "request": {
+    "url": "/v3/kafka",
+    "host": "example.com"
+  }
+}
diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json
new file mode 100644
index 00000000..f1622644
--- /dev/null
+++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json
@@ -0,0 +1,11 @@
+{
+  "error": false,
+  "message": "consumer list returned",
+  "consumers": [
+    "group1"
+  ],
+  "request": {
+    "url": "/v3/kafka/clustername1/consumer",
+    "host": "example.com"
+  }
+}
diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
new file mode 100644
index 00000000..21205a66
--- /dev/null
+++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
@@ -0,0 +1,90 @@
+{
+  "error": false,
+  "message": "consumer status returned",
+  "status": {
+    "cluster": "clustername1",
+    "group": "group1",
+    "status": "OK",
+    "complete": 1,
+    "partitions": [
+      {
+        "topic": "topicA",
+        "partition": 0,
+        "owner": "kafka",
+        "status": "OK",
+        "start": {
+          "offset": 431323195,
+          "timestamp": 1515609445004,
+          "lag": 0
+        },
+        "end": {
+          "offset": 431323195,
+          "timestamp": 1515609490008,
+          "lag": 0
+        },
+        "current_lag": 0,
+        "complete": 1
+      },
+      {
+        "topic": "topicA",
+        "partition": 1,
+        "owner": "kafka",
+        "status": "OK",
+        "start": {
+          "offset": 431322962,
+          "timestamp": 1515609445004,
+          "lag": 0
+        },
+        "end": {
+          "offset": 431322962,
+          "timestamp": 1515609490008,
+          "lag": 0
+        },
+        "current_lag": 0,
+        "complete": 1
+      },
+      {
+        "topic": "topicA",
+        "partition": 2,
+        "owner": "kafka",
+        "status": "OK",
+        "start": {
+          "offset": 428636563,
+          "timestamp": 1515609445004,
+          "lag": 0
+        },
+        "end": {
+          "offset": 428636563,
+          "timestamp": 1515609490008,
+          "lag": 0
+        },
+        "current_lag": 0,
+        "complete": 1
+      }
+    ],
+    "partition_count": 3,
+    "maxlag": {
+      "topic": "topicA",
+      "partition": 0,
+      "owner": "kafka",
+      "status": "OK",
+      "start": {
+        "offset": 431323195,
+        "timestamp": 1515609445004,
+        "lag": 0
+      },
+      "end": {
+        "offset": 431323195,
+        "timestamp": 1515609490008,
+        "lag": 0
+      },
+      "current_lag": 0,
+      "complete": 1
+    },
+    "totallag": 0
+  },
+  "request": {
+    "url": "/v3/kafka/clustername1/consumer/group1/lag",
+    "host": "example.com"
+  }
+}
diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json
new file mode 100644
index 00000000..9bd21a14
--- /dev/null
+++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json
@@ -0,0 +1,11 @@
+{
+  "error": false,
+  "message": "topic list returned",
+  "topics": [
+    "topicA"
+  ],
+  "request": {
+    "url": "/v3/kafka/clustername1/topic",
+    "host": "example.com"
+  }
+}
diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json
new file mode 100644
index 00000000..38a3cee0
--- /dev/null
+++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json
@@ -0,0 +1,13 @@
+{
+  "error": false,
+  "message": "topic offsets returned",
+  "offsets": [
+    459178195,
+    459178022,
+    456491598
+  ],
+  "request": {
+    "url": "/v3/kafka/clustername1/topic/topicA",
+    "host": "example.com"
+  }
+}
-- 
GitLab