From 1392e73125a3d08869ab16379f0ebb4bc12ca365 Mon Sep 17 00:00:00 2001
From: Matt O'Hara <mhohara@users.noreply.github.com>
Date: Tue, 20 Dec 2016 10:30:03 -0600
Subject: [PATCH] Add clusterstats to elasticsearch plugin (#1979)

* add clusterstats to elasticsearch input plugin

* responses to requested changes

* remove unnecessary recommendation
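
For reference, a minimal configuration enabling the new option looks roughly
like the following (the server URL is illustrative; the other settings mirror
the sample config added in this patch):

    [[inputs.elasticsearch]]
      servers = ["http://localhost:9200"]
      http_timeout = "5s"
      local = true
      cluster_health = false
      ## gather /_cluster/stats; only the elected master node reports these
      cluster_stats = true
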
---
 etc/telegraf.conf                             |  11 +-
 plugins/inputs/elasticsearch/README.md        |  14 +-
 plugins/inputs/elasticsearch/elasticsearch.go | 167 ++++++++--
 .../elasticsearch/elasticsearch_test.go       | 140 +++++++--
 plugins/inputs/elasticsearch/testdata_test.go | 292 +++++++++++++++++-
 plugins/parsers/json/parser.go                |  33 +-
 6 files changed, 582 insertions(+), 75 deletions(-)

diff --git a/etc/telegraf.conf b/etc/telegraf.conf
index a7b90338..a6058434 100644
--- a/etc/telegraf.conf
+++ b/etc/telegraf.conf
@@ -784,13 +784,18 @@
 #   ## Timeout for HTTP requests to the elastic search server(s)
 #   http_timeout = "5s"
 #
-#   ## set local to false when you want to read the indices stats from all nodes
-#   ## within the cluster
+#   ## When local is true (the default), the node will read only its own stats.
+#   ## Set local to false when you want to read the node stats from all nodes
+#   ## of the cluster.
 #   local = true
 #
-#   ## set cluster_health to true when you want to also obtain cluster level stats
+#   ## Set cluster_health to true when you want to also obtain cluster health stats
 #   cluster_health = false
 #
+#   ## Set cluster_stats to true when you want to also obtain cluster stats from the
+#   ## master node.
+#   cluster_stats = false
+#
 #   ## Optional SSL Config
 #   # ssl_ca = "/etc/telegraf/ca.pem"
 #   # ssl_cert = "/etc/telegraf/cert.pem"
diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md
index 2cf6f4d7..9cf9b9b0 100644
--- a/plugins/inputs/elasticsearch/README.md
+++ b/plugins/inputs/elasticsearch/README.md
@@ -2,7 +2,8 @@
 
 The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain
 [node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
-and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) stats.
+and optionally [cluster-health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html)
+or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html) metrics.
 
 ### Configuration:
 
@@ -14,13 +15,18 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference
   ## Timeout for HTTP requests to the elastic search server(s)
   http_timeout = "5s"
 
-  ## set local to false when you want to read the indices stats from all nodes
-  ## within the cluster
+  ## When local is true (the default), the node will read only its own stats.
+  ## Set local to false when you want to read the node stats from all nodes
+  ## of the cluster.
   local = true
 
-  ## set cluster_health to true when you want to also obtain cluster level stats
+  ## Set cluster_health to true when you want to also obtain cluster health stats
   cluster_health = false
 
+  ## Set cluster_stats to true when you want to also obtain cluster stats from the
+  ## master node.
+  cluster_stats = false
+
   ## Optional SSL Config
   # ssl_ca = "/etc/telegraf/ca.pem"
   # ssl_cert = "/etc/telegraf/cert.pem"
diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go
index cce3d94f..5d5d6490 100644
--- a/plugins/inputs/elasticsearch/elasticsearch.go
+++ b/plugins/inputs/elasticsearch/elasticsearch.go
@@ -12,13 +12,15 @@ import (
 	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
+	"io/ioutil"
+	"strings"
 )
 
+// Node stats are always gathered, so simply define constants for these endpoints
 const statsPath = "/_nodes/stats"
 const statsPathLocal = "/_nodes/_local/stats"
-const healthPath = "/_cluster/health"
 
-type node struct {
+type nodeStat struct {
 	Host       string            `json:"host"`
 	Name       string            `json:"name"`
 	Attributes map[string]string `json:"attributes"`
@@ -58,6 +60,20 @@ type indexHealth struct {
 	UnassignedShards    int    `json:"unassigned_shards"`
 }
 
+type clusterStats struct {
+	NodeName    string      `json:"node_name"`
+	ClusterName string      `json:"cluster_name"`
+	Status      string      `json:"status"`
+	Indices     interface{} `json:"indices"`
+	Nodes       interface{} `json:"nodes"`
+}
+
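+// catMaster describes the columns returned by the /_cat/master endpoint:
+// the elected master's node id, ip and name.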
+type catMaster struct {
+	NodeID   string `json:"id"`
+	NodeIP   string `json:"ip"`
+	NodeName string `json:"node"`
+}
+
 const sampleConfig = `
   ## specify a list of one or more Elasticsearch servers
   # you can add username and password to your url to use basic authentication:
@@ -67,13 +83,18 @@ const sampleConfig = `
   ## Timeout for HTTP requests to the elastic search server(s)
   http_timeout = "5s"
 
-  ## set local to false when you want to read the indices stats from all nodes
-  ## within the cluster
+  ## When local is true (the default), the node will read only its own stats.
+  ## Set local to false when you want to read the node stats from all nodes
+  ## of the cluster.
   local = true
 
-  ## set cluster_health to true when you want to also obtain cluster level stats
+  ## Set cluster_health to true when you want to also obtain cluster health stats
   cluster_health = false
 
+  ## Set cluster_stats to true when you want to also obtain cluster stats from the
+  ## master node.
+  cluster_stats = false
+
   ## Optional SSL Config
   # ssl_ca = "/etc/telegraf/ca.pem"
   # ssl_cert = "/etc/telegraf/cert.pem"
@@ -85,15 +106,18 @@ const sampleConfig = `
 // Elasticsearch is a plugin to read stats from one or many Elasticsearch
 // servers.
 type Elasticsearch struct {
-	Local              bool
-	Servers            []string
-	HttpTimeout        internal.Duration
-	ClusterHealth      bool
-	SSLCA              string `toml:"ssl_ca"`   // Path to CA file
-	SSLCert            string `toml:"ssl_cert"` // Path to host cert file
-	SSLKey             string `toml:"ssl_key"`  // Path to cert key file
-	InsecureSkipVerify bool   // Use SSL but skip chain & host verification
-	client             *http.Client
+	Local                   bool
+	Servers                 []string
+	HttpTimeout             internal.Duration
+	ClusterHealth           bool
+	ClusterStats            bool
+	SSLCA                   string `toml:"ssl_ca"`   // Path to CA file
+	SSLCert                 string `toml:"ssl_cert"` // Path to host cert file
+	SSLKey                  string `toml:"ssl_key"`  // Path to cert key file
+	InsecureSkipVerify      bool   // Use SSL but skip chain & host verification
+	client                  *http.Client
+	catMasterResponseTokens []string // whitespace-split response of /_cat/master
+	isMaster                bool     // true when this node is the cluster's elected master
 }
 
 // NewElasticsearch return a new instance of Elasticsearch
@@ -138,12 +162,27 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
 			} else {
 				url = s + statsPath
 			}
+			e.isMaster = false
+
+			if e.ClusterStats {
+				// Get /_cat/master information first so gatherNodeStats can determine
+				// whether this node is the current master.
+				if err := e.setCatMaster(s + "/_cat/master"); err != nil {
+					errChan.C <- err
+					return
+				}
+			}
+
+			// Always gather node stats
 			if err := e.gatherNodeStats(url, acc); err != nil {
 				errChan.C <- err
 				return
 			}
+
 			if e.ClusterHealth {
-				e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
+				url = s + "/_cluster/health?level=indices"
+				e.gatherClusterHealth(url, acc)
+			}
+
+			if e.ClusterStats && e.isMaster {
+				if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
+					errChan.C <- err
+					return
+				}
 			}
 		}(serv, acc)
 	}
@@ -171,12 +210,13 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
 
 func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
 	nodeStats := &struct {
-		ClusterName string           `json:"cluster_name"`
-		Nodes       map[string]*node `json:"nodes"`
+		ClusterName string               `json:"cluster_name"`
+		Nodes       map[string]*nodeStat `json:"nodes"`
 	}{}
-	if err := e.gatherData(url, nodeStats); err != nil {
+	if err := e.gatherJsonData(url, nodeStats); err != nil {
 		return err
 	}
+
 	for id, n := range nodeStats.Nodes {
 		tags := map[string]string{
 			"node_id":      id,
@@ -185,6 +225,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
 			"cluster_name": nodeStats.ClusterName,
 		}
 
+		if e.ClusterStats {
+			// determine whether this node is the elected master
+			e.isMaster = (id == e.catMasterResponseTokens[0])
+		}
+
 		for k, v := range n.Attributes {
 			tags["node_attribute_"+k] = v
 		}
@@ -204,6 +249,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
 		now := time.Now()
 		for p, s := range stats {
 			f := jsonparser.JSONFlattener{}
+			// flatten the JSON stats, ignoring strings and bools
 			err := f.FlattenJSON("", s)
 			if err != nil {
 				return err
@@ -214,31 +260,31 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
 	return nil
 }
 
-func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
-	clusterStats := &clusterHealth{}
-	if err := e.gatherData(url, clusterStats); err != nil {
+func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error {
+	healthStats := &clusterHealth{}
+	if err := e.gatherJsonData(url, healthStats); err != nil {
 		return err
 	}
 	measurementTime := time.Now()
 	clusterFields := map[string]interface{}{
-		"status":                clusterStats.Status,
-		"timed_out":             clusterStats.TimedOut,
-		"number_of_nodes":       clusterStats.NumberOfNodes,
-		"number_of_data_nodes":  clusterStats.NumberOfDataNodes,
-		"active_primary_shards": clusterStats.ActivePrimaryShards,
-		"active_shards":         clusterStats.ActiveShards,
-		"relocating_shards":     clusterStats.RelocatingShards,
-		"initializing_shards":   clusterStats.InitializingShards,
-		"unassigned_shards":     clusterStats.UnassignedShards,
+		"status":                healthStats.Status,
+		"timed_out":             healthStats.TimedOut,
+		"number_of_nodes":       healthStats.NumberOfNodes,
+		"number_of_data_nodes":  healthStats.NumberOfDataNodes,
+		"active_primary_shards": healthStats.ActivePrimaryShards,
+		"active_shards":         healthStats.ActiveShards,
+		"relocating_shards":     healthStats.RelocatingShards,
+		"initializing_shards":   healthStats.InitializingShards,
+		"unassigned_shards":     healthStats.UnassignedShards,
 	}
 	acc.AddFields(
 		"elasticsearch_cluster_health",
 		clusterFields,
-		map[string]string{"name": clusterStats.ClusterName},
+		map[string]string{"name": healthStats.ClusterName},
 		measurementTime,
 	)
 
-	for name, health := range clusterStats.Indices {
+	for name, health := range healthStats.Indices {
 		indexFields := map[string]interface{}{
 			"status":                health.Status,
 			"number_of_shards":      health.NumberOfShards,
@@ -259,7 +305,60 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
 	return nil
 }
 
-func (e *Elasticsearch) gatherData(url string, v interface{}) error {
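+// gatherClusterStats fetches /_cluster/stats and flattens the "nodes" and
+// "indices" sections into elasticsearch_clusterstats_* measurement fields.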
+func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
+	clusterStats := &clusterStats{}
+	if err := e.gatherJsonData(url, clusterStats); err != nil {
+		return err
+	}
+	now := time.Now()
+	tags := map[string]string{
+		"node_name":    clusterStats.NodeName,
+		"cluster_name": clusterStats.ClusterName,
+		"status":       clusterStats.Status,
+	}
+
+	stats := map[string]interface{}{
+		"nodes":   clusterStats.Nodes,
+		"indices": clusterStats.Indices,
+	}
+
+	for p, s := range stats {
+		f := jsonparser.JSONFlattener{}
+		// flatten the JSON data, including bools and strings
+		err := f.FullFlattenJSON("", s, true, true)
+		if err != nil {
+			return err
+		}
+		acc.AddFields("elasticsearch_clusterstats_"+p, f.Fields, tags, now)
+	}
+
+	return nil
+}
+
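+// setCatMaster retrieves the elected master from the plain-text /_cat/master
+// endpoint and stores its whitespace-separated tokens; the first token is the
+// master's node id, which gatherNodeStats compares against each node id.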
+func (e *Elasticsearch) setCatMaster(url string) error {
+	r, err := e.client.Get(url)
+	if err != nil {
+		return err
+	}
+	defer r.Body.Close()
+	if r.StatusCode != http.StatusOK {
+		// NOTE: we do not read/discard r.Body here; we prefer to let the underlying
+		// transport close the connection and re-establish a new one for future calls.
+		return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK)
+	}
+	response, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		return err
+	}
+
+	e.catMasterResponseTokens = strings.Split(string(response), " ")
+
+	return nil
+}
+
+func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
 	r, err := e.client.Get(url)
 	if err != nil {
 		return err
@@ -272,9 +371,11 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
 		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
 			r.StatusCode, http.StatusOK)
 	}
+
 	if err = json.NewDecoder(r.Body).Decode(v); err != nil {
 		return err
 	}
+
 	return nil
 }
 
diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go
index 760ac921..59caa430 100644
--- a/plugins/inputs/elasticsearch/elasticsearch_test.go
+++ b/plugins/inputs/elasticsearch/elasticsearch_test.go
@@ -8,6 +8,8 @@ import (
 
 	"github.com/influxdata/telegraf/testutil"
 
+	"fmt"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -37,43 +39,70 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
 func (t *transportMock) CancelRequest(_ *http.Request) {
 }
 
-func TestElasticsearch(t *testing.T) {
+func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) {
+	if es.isMaster != expected {
+		msg := fmt.Sprintf("IsMaster set incorrectly")
+		assert.Fail(t, msg)
+	}
+}
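+
+// checkNodeStatsResult asserts the node-level measurements gathered from the
+// canned nodeStatsResponse testdata.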
+func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
+	tags := map[string]string{
+		"cluster_name":          "es-testcluster",
+		"node_attribute_master": "true",
+		"node_id":               "SDFsfSDFsdfFSDSDfSFDSDF",
+		"node_name":             "test.host.com",
+		"node_host":             "test",
+	}
+
+	acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
+}
+
+func TestGather(t *testing.T) {
 	es := newElasticsearchWithClient()
 	es.Servers = []string{"http://example.com:9200"}
-	es.client.Transport = newTransportMock(http.StatusOK, statsResponse)
+	es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
 
 	var acc testutil.Accumulator
 	if err := es.Gather(&acc); err != nil {
 		t.Fatal(err)
 	}
 
-	tags := map[string]string{
-		"cluster_name":          "es-testcluster",
-		"node_attribute_master": "true",
-		"node_id":               "SDFsfSDFsdfFSDSDfSFDSDF",
-		"node_name":             "test.host.com",
-		"node_host":             "test",
+	checkIsMaster(es, false, t)
+	checkNodeStatsResult(t, &acc)
+}
+
+func TestGatherNodeStats(t *testing.T) {
+	es := newElasticsearchWithClient()
+	es.Servers = []string{"http://example.com:9200"}
+	es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
+
+	var acc testutil.Accumulator
+	if err := es.gatherNodeStats("junk", &acc); err != nil {
+		t.Fatal(err)
 	}
 
-	acc.AssertContainsTaggedFields(t, "elasticsearch_indices", indicesExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_os", osExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_process", processExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", jvmExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", threadPoolExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_fs", fsExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_transport", transportExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_http", httpExpected, tags)
-	acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", breakersExpected, tags)
+	checkIsMaster(es, false, t)
+	checkNodeStatsResult(t, &acc)
 }
 
-func TestGatherClusterStats(t *testing.T) {
+func TestGatherClusterHealth(t *testing.T) {
 	es := newElasticsearchWithClient()
 	es.Servers = []string{"http://example.com:9200"}
 	es.ClusterHealth = true
-	es.client.Transport = newTransportMock(http.StatusOK, clusterResponse)
+	es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
 
 	var acc testutil.Accumulator
-	require.NoError(t, es.Gather(&acc))
+	require.NoError(t, es.gatherClusterHealth("junk", &acc))
+
+	checkIsMaster(es, false, t)
 
 	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
 		clusterHealthExpected,
@@ -88,6 +117,77 @@ func TestGatherClusterStats(t *testing.T) {
 		map[string]string{"index": "v2"})
 }
 
+func TestGatherClusterStatsMaster(t *testing.T) {
+	// This needs multiple steps to replicate the multiple calls internally.
+	es := newElasticsearchWithClient()
+	es.ClusterStats = true
+	es.Servers = []string{"http://example.com:9200"}
+
+	// first get catMaster
+	es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult)
+	require.NoError(t, es.setCatMaster("junk"))
+
+	IsMasterResultTokens := strings.Split(string(IsMasterResult), " ")
+	if es.catMasterResponseTokens[0] != IsMasterResultTokens[0] {
+		msg := fmt.Sprintf("catmaster is incorrect")
+		assert.Fail(t, msg)
+	}
+
+	// now get node status, which determines whether we're master
+	var acc testutil.Accumulator
+	es.Local = true
+	es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
+	if err := es.gatherNodeStats("junk", &acc); err != nil {
+		t.Fatal(err)
+	}
+
+	checkIsMaster(es, true, t)
+	checkNodeStatsResult(t, &acc)
+
+	// now test the clusterstats method
+	es.client.Transport = newTransportMock(http.StatusOK, clusterStatsResponse)
+	require.NoError(t, es.gatherClusterStats("junk", &acc))
+
+	tags := map[string]string{
+		"cluster_name": "es-testcluster",
+		"node_name":    "test.host.com",
+		"status":       "red",
+	}
+
+	acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags)
+}
+
+func TestGatherClusterStatsNonMaster(t *testing.T) {
+	// This needs multiple steps to replicate the multiple calls internally.
+	es := newElasticsearchWithClient()
+	es.ClusterStats = true
+	es.Servers = []string{"http://example.com:9200"}
+
+	// first get catMaster
+	es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult)
+	require.NoError(t, es.setCatMaster("junk"))
+
+	IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ")
+	if es.catMasterResponseTokens[0] != IsNotMasterResultTokens[0] {
+		msg := fmt.Sprintf("catmaster is incorrect")
+		assert.Fail(t, msg)
+	}
+
+	// now get node status, which determines whether we're master
+	var acc testutil.Accumulator
+	es.Local = true
+	es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
+	if err := es.gatherNodeStats("junk", &acc); err != nil {
+		t.Fatal(err)
+	}
+
+	// ensure the flag is clear so cluster stats will not be gathered
+	checkIsMaster(es, false, t)
+	checkNodeStatsResult(t, &acc)
+}
+
 func newElasticsearchWithClient() *Elasticsearch {
 	es := NewElasticsearch()
 	es.client = &http.Client{}
diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go
index bca1f9e4..19ebb3bf 100644
--- a/plugins/inputs/elasticsearch/testdata_test.go
+++ b/plugins/inputs/elasticsearch/testdata_test.go
@@ -1,6 +1,6 @@
 package elasticsearch
 
-const clusterResponse = `
+const clusterHealthResponse = `
 {
    "cluster_name": "elasticsearch_telegraf",
    "status": "green",
@@ -71,7 +71,7 @@ var v2IndexExpected = map[string]interface{}{
 	"unassigned_shards":     20,
 }
 
-const statsResponse = `
+const nodeStatsResponse = `
 {
   "cluster_name": "es-testcluster",
   "nodes": {
@@ -489,7 +489,7 @@ const statsResponse = `
 }
 `
 
-var indicesExpected = map[string]interface{}{
+var nodestatsIndicesExpected = map[string]interface{}{
 	"id_cache_memory_size_in_bytes":             float64(0),
 	"completion_size_in_bytes":                  float64(0),
 	"suggest_total":                             float64(0),
@@ -561,7 +561,7 @@ var indicesExpected = map[string]interface{}{
 	"segments_fixed_bit_set_memory_in_bytes":    float64(0),
 }
 
-var osExpected = map[string]interface{}{
+var nodestatsOsExpected = map[string]interface{}{
 	"load_average_0":           float64(0.01),
 	"load_average_1":           float64(0.04),
 	"load_average_2":           float64(0.05),
@@ -576,7 +576,7 @@ var osExpected = map[string]interface{}{
 	"mem_used_in_bytes":        float64(1621868544),
 }
 
-var processExpected = map[string]interface{}{
+var nodestatsProcessExpected = map[string]interface{}{
 	"mem_total_virtual_in_bytes": float64(4747890688),
 	"timestamp":                  float64(1436460392945),
 	"open_file_descriptors":      float64(160),
@@ -586,7 +586,7 @@ var processExpected = map[string]interface{}{
 	"cpu_user_in_millis":         float64(13610),
 }
 
-var jvmExpected = map[string]interface{}{
+var nodestatsJvmExpected = map[string]interface{}{
 	"timestamp":                                     float64(1436460392945),
 	"uptime_in_millis":                              float64(202245),
 	"mem_non_heap_used_in_bytes":                    float64(39634576),
@@ -621,7 +621,7 @@ var jvmExpected = map[string]interface{}{
 	"buffer_pools_mapped_total_capacity_in_bytes":   float64(0),
 }
 
-var threadPoolExpected = map[string]interface{}{
+var nodestatsThreadPoolExpected = map[string]interface{}{
 	"merge_threads":                 float64(6),
 	"merge_queue":                   float64(4),
 	"merge_active":                  float64(5),
@@ -726,7 +726,7 @@ var threadPoolExpected = map[string]interface{}{
 	"flush_completed":               float64(3),
 }
 
-var fsExpected = map[string]interface{}{
+var nodestatsFsExpected = map[string]interface{}{
 	"data_0_total_in_bytes":     float64(19507089408),
 	"data_0_free_in_bytes":      float64(16909316096),
 	"data_0_available_in_bytes": float64(15894814720),
@@ -736,7 +736,7 @@ var fsExpected = map[string]interface{}{
 	"total_total_in_bytes":      float64(19507089408),
 }
 
-var transportExpected = map[string]interface{}{
+var nodestatsTransportExpected = map[string]interface{}{
 	"server_open":      float64(13),
 	"rx_count":         float64(6),
 	"rx_size_in_bytes": float64(1380),
@@ -744,12 +744,12 @@ var transportExpected = map[string]interface{}{
 	"tx_size_in_bytes": float64(1380),
 }
 
-var httpExpected = map[string]interface{}{
+var nodestatsHttpExpected = map[string]interface{}{
 	"current_open": float64(3),
 	"total_opened": float64(3),
 }
 
-var breakersExpected = map[string]interface{}{
+var nodestatsBreakersExpected = map[string]interface{}{
 	"fielddata_estimated_size_in_bytes": float64(0),
 	"fielddata_overhead":                float64(1.03),
 	"fielddata_tripped":                 float64(0),
@@ -763,3 +763,273 @@ var breakersExpected = map[string]interface{}{
 	"parent_limit_size_in_bytes":        float64(727213670),
 	"parent_estimated_size_in_bytes":    float64(0),
 }
+
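+// clusterStatsResponse is a sample /_cluster/stats document used by the
+// cluster stats tests.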
+const clusterStatsResponse = `
+{
+   "host":"ip-10-0-1-214",
+   "log_type":"metrics",
+   "timestamp":1475767451229,
+   "log_level":"INFO",
+   "node_name":"test.host.com",
+   "cluster_name":"es-testcluster",
+   "status":"red",
+   "indices":{
+      "count":1,
+      "shards":{
+         "total":4,
+         "primaries":4,
+         "replication":0.0,
+         "index":{
+            "shards":{
+               "min":4,
+               "max":4,
+               "avg":4.0
+            },
+            "primaries":{
+               "min":4,
+               "max":4,
+               "avg":4.0
+            },
+            "replication":{
+               "min":0.0,
+               "max":0.0,
+               "avg":0.0
+            }
+         }
+      },
+      "docs":{
+         "count":4,
+         "deleted":0
+      },
+      "store":{
+         "size_in_bytes":17084,
+         "throttle_time_in_millis":0
+      },
+      "fielddata":{
+         "memory_size_in_bytes":0,
+         "evictions":0
+      },
+      "query_cache":{
+         "memory_size_in_bytes":0,
+         "total_count":0,
+         "hit_count":0,
+         "miss_count":0,
+         "cache_size":0,
+         "cache_count":0,
+         "evictions":0
+      },
+      "completion":{
+         "size_in_bytes":0
+      },
+      "segments":{
+         "count":4,
+         "memory_in_bytes":11828,
+         "terms_memory_in_bytes":8932,
+         "stored_fields_memory_in_bytes":1248,
+         "term_vectors_memory_in_bytes":0,
+         "norms_memory_in_bytes":1280,
+         "doc_values_memory_in_bytes":368,
+         "index_writer_memory_in_bytes":0,
+         "index_writer_max_memory_in_bytes":2048000,
+         "version_map_memory_in_bytes":0,
+         "fixed_bit_set_memory_in_bytes":0
+      },
+      "percolate":{
+         "total":0,
+         "time_in_millis":0,
+         "current":0,
+         "memory_size_in_bytes":-1,
+         "memory_size":"-1b",
+         "queries":0
+      }
+   },
+   "nodes":{
+      "count":{
+         "total":1,
+         "master_only":0,
+         "data_only":0,
+         "master_data":1,
+         "client":0
+      },
+      "versions":[
+         {
+         "version": "2.3.3"
+         }
+      ],
+      "os":{
+         "available_processors":1,
+         "allocated_processors":1,
+         "mem":{
+            "total_in_bytes":593301504
+         },
+         "names":[
+            {
+               "name":"Linux",
+               "count":1
+            }
+         ]
+      },
+      "process":{
+         "cpu":{
+            "percent":0
+         },
+         "open_file_descriptors":{
+            "min":145,
+            "max":145,
+            "avg":145
+         }
+      },
+      "jvm":{
+         "max_uptime_in_millis":11580527,
+         "versions":[
+            {
+               "version":"1.8.0_101",
+               "vm_name":"OpenJDK 64-Bit Server VM",
+               "vm_version":"25.101-b13",
+               "vm_vendor":"Oracle Corporation",
+               "count":1
+            }
+         ],
+         "mem":{
+            "heap_used_in_bytes":70550288,
+            "heap_max_in_bytes":1065025536
+         },
+         "threads":30
+      },
+      "fs":{
+         "total_in_bytes":8318783488,
+         "free_in_bytes":6447439872,
+         "available_in_bytes":6344785920
+      },
+      "plugins":[
+         {
+            "name":"cloud-aws",
+            "version":"2.3.3",
+            "description":"The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.",
+            "jvm":true,
+            "classname":"org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin",
+            "isolated":true,
+            "site":false
+         },
+         {
+            "name":"kopf",
+            "version":"2.0.1",
+            "description":"kopf - simple web administration tool for Elasticsearch",
+            "url":"/_plugin/kopf/",
+            "jvm":false,
+            "site":true
+         },
+         {
+            "name":"tr-metrics",
+            "version":"7bd5b4b",
+            "description":"Logs cluster and node stats for performance monitoring.",
+            "jvm":true,
+            "classname":"com.trgr.elasticsearch.plugin.metrics.MetricsPlugin",
+            "isolated":true,
+            "site":false
+         }
+      ]
+   }
+}
+`
+
+var clusterstatsIndicesExpected = map[string]interface{}{
+	"completion_size_in_bytes":                  float64(0),
+	"count":                                     float64(1),
+	"docs_count":                                float64(4),
+	"docs_deleted":                              float64(0),
+	"fielddata_evictions":                       float64(0),
+	"fielddata_memory_size_in_bytes":            float64(0),
+	"percolate_current":                         float64(0),
+	"percolate_memory_size_in_bytes":            float64(-1),
+	"percolate_queries":                         float64(0),
+	"percolate_time_in_millis":                  float64(0),
+	"percolate_total":                           float64(0),
+	"percolate_memory_size":                     "-1b",
+	"query_cache_cache_count":                   float64(0),
+	"query_cache_cache_size":                    float64(0),
+	"query_cache_evictions":                     float64(0),
+	"query_cache_hit_count":                     float64(0),
+	"query_cache_memory_size_in_bytes":          float64(0),
+	"query_cache_miss_count":                    float64(0),
+	"query_cache_total_count":                   float64(0),
+	"segments_count":                            float64(4),
+	"segments_doc_values_memory_in_bytes":       float64(368),
+	"segments_fixed_bit_set_memory_in_bytes":    float64(0),
+	"segments_index_writer_max_memory_in_bytes": float64(2.048e+06),
+	"segments_index_writer_memory_in_bytes":     float64(0),
+	"segments_memory_in_bytes":                  float64(11828),
+	"segments_norms_memory_in_bytes":            float64(1280),
+	"segments_stored_fields_memory_in_bytes":    float64(1248),
+	"segments_term_vectors_memory_in_bytes":     float64(0),
+	"segments_terms_memory_in_bytes":            float64(8932),
+	"segments_version_map_memory_in_bytes":      float64(0),
+	"shards_index_primaries_avg":                float64(4),
+	"shards_index_primaries_max":                float64(4),
+	"shards_index_primaries_min":                float64(4),
+	"shards_index_replication_avg":              float64(0),
+	"shards_index_replication_max":              float64(0),
+	"shards_index_replication_min":              float64(0),
+	"shards_index_shards_avg":                   float64(4),
+	"shards_index_shards_max":                   float64(4),
+	"shards_index_shards_min":                   float64(4),
+	"shards_primaries":                          float64(4),
+	"shards_replication":                        float64(0),
+	"shards_total":                              float64(4),
+	"store_size_in_bytes":                       float64(17084),
+	"store_throttle_time_in_millis":             float64(0),
+}
+
+var clusterstatsNodesExpected = map[string]interface{}{
+	"count_client":                      float64(0),
+	"count_data_only":                   float64(0),
+	"count_master_data":                 float64(1),
+	"count_master_only":                 float64(0),
+	"count_total":                       float64(1),
+	"fs_available_in_bytes":             float64(6.34478592e+09),
+	"fs_free_in_bytes":                  float64(6.447439872e+09),
+	"fs_total_in_bytes":                 float64(8.318783488e+09),
+	"jvm_max_uptime_in_millis":          float64(1.1580527e+07),
+	"jvm_mem_heap_max_in_bytes":         float64(1.065025536e+09),
+	"jvm_mem_heap_used_in_bytes":        float64(7.0550288e+07),
+	"jvm_threads":                       float64(30),
+	"jvm_versions_0_count":              float64(1),
+	"jvm_versions_0_version":            "1.8.0_101",
+	"jvm_versions_0_vm_name":            "OpenJDK 64-Bit Server VM",
+	"jvm_versions_0_vm_vendor":          "Oracle Corporation",
+	"jvm_versions_0_vm_version":         "25.101-b13",
+	"os_allocated_processors":           float64(1),
+	"os_available_processors":           float64(1),
+	"os_mem_total_in_bytes":             float64(5.93301504e+08),
+	"os_names_0_count":                  float64(1),
+	"os_names_0_name":                   "Linux",
+	"process_cpu_percent":               float64(0),
+	"process_open_file_descriptors_avg": float64(145),
+	"process_open_file_descriptors_max": float64(145),
+	"process_open_file_descriptors_min": float64(145),
+	"versions_0_version":                "2.3.3",
+	"plugins_0_classname":               "org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin",
+	"plugins_0_description":             "The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.",
+	"plugins_0_isolated":                true,
+	"plugins_0_jvm":                     true,
+	"plugins_0_name":                    "cloud-aws",
+	"plugins_0_site":                    false,
+	"plugins_0_version":                 "2.3.3",
+	"plugins_1_description":             "kopf - simple web administration tool for Elasticsearch",
+	"plugins_1_jvm":                     false,
+	"plugins_1_name":                    "kopf",
+	"plugins_1_site":                    true,
+	"plugins_1_url":                     "/_plugin/kopf/",
+	"plugins_1_version":                 "2.0.1",
+	"plugins_2_classname":               "com.trgr.elasticsearch.plugin.metrics.MetricsPlugin",
+	"plugins_2_description":             "Logs cluster and node stats for performance monitoring.",
+	"plugins_2_isolated":                true,
+	"plugins_2_jvm":                     true,
+	"plugins_2_name":                    "tr-metrics",
+	"plugins_2_site":                    false,
+	"plugins_2_version":                 "7bd5b4b",
+}
+
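+// Raw /_cat/master responses used by the master and non-master test paths;
+// the first token is the node id that gatherNodeStats compares against.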
+const IsMasterResult = "SDFsfSDFsdfFSDSDfSFDSDF 10.206.124.66 10.206.124.66 test.host.com "
+
+const IsNotMasterResult = "junk 10.206.124.66 10.206.124.66 test.junk.com "
diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go
index edd5afc5..a8743558 100644
--- a/plugins/parsers/json/parser.go
+++ b/plugins/parsers/json/parser.go
@@ -103,10 +103,22 @@ type JSONFlattener struct {
 	Fields map[string]interface{}
 }
 
-// FlattenJSON flattens nested maps/interfaces into a fields map
+// FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and strings)
 func (f *JSONFlattener) FlattenJSON(
+	fieldname string,
+	v interface{}) error {
+	if f.Fields == nil {
+		f.Fields = make(map[string]interface{})
+	}
+	return f.FullFlattenJSON(fieldname, v, false, false)
+}
+
+// FullFlattenJSON flattens nested maps/interfaces into a fields map, optionally including strings and bools
+func (f *JSONFlattener) FullFlattenJSON(
 	fieldname string,
 	v interface{},
+	convertString bool,
+	convertBool bool,
 ) error {
 	if f.Fields == nil {
 		f.Fields = make(map[string]interface{})
@@ -115,7 +127,7 @@ func (f *JSONFlattener) FlattenJSON(
 	switch t := v.(type) {
 	case map[string]interface{}:
 		for k, v := range t {
-			err := f.FlattenJSON(fieldname+"_"+k+"_", v)
+			err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool)
 			if err != nil {
 				return err
 			}
@@ -123,15 +135,28 @@ func (f *JSONFlattener) FlattenJSON(
 	case []interface{}:
 		for i, v := range t {
 			k := strconv.Itoa(i)
-			err := f.FlattenJSON(fieldname+"_"+k+"_", v)
+			err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool)
 			if err != nil {
 				return nil
 			}
 		}
 	case float64:
 		f.Fields[fieldname] = t
-	case bool, string, nil:
+	case string:
+		if convertString {
+			f.Fields[fieldname] = v.(string)
+		} else {
+			return nil
+		}
+	case bool:
+		if convertBool {
+			f.Fields[fieldname] = v.(bool)
+		} else {
+			return nil
+		}
+	case nil:
 		// ignored types
 		return nil
 	default:
 		return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
-- 
GitLab