Skip to content
Snippets Groups Projects
Commit b1b4adec authored by subhachandrachandra's avatar subhachandrachandra Committed by Cameron Sparr
Browse files

Added cassandra plugin to access metrics using jolokia and push them to influxdb.

parent 1934cc2e
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ no longer insert field names when the field is simply named `value`. This is
because the `value` field is redundant in the graphite/librato context.
### Features
- [#952](https://github.com/influxdata/telegraf/pull/952): Cassandra input plugin. Thanks @subhachandrachandra!
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
- [#979](https://github.com/influxdata/telegraf/pull/979): Reduce allocations in the TCP listener.
- [#992](https://github.com/influxdata/telegraf/pull/992): Refactor allocations in TCP/UDP listeners.
......
......@@ -160,6 +160,7 @@ Currently implemented sources:
* [aerospike](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/aerospike)
* [apache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/apache)
* [bcache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/bcache)
* [cassandra](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/cassandra)
* [couchbase](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchbase)
* [couchdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchdb)
* [disque](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/disque)
......
......@@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
......
# Telegraf plugin: Cassandra
#### Plugin arguments:
- **context** string: Context root used for jolokia url
- **servers** []string: List of servers with the format "<user:passwd@><host>:port"
- **metrics** []string: List of Jmx paths that identify mbeans attributes
#### Description
The Cassandra plugin collects Cassandra/JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured.
See: https://jolokia.org/ and [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html)
# Measurements:
Cassandra plugin produces one or more measurements for each metric configured, adding Server's name as `host` tag. More than one measurement is generated when querying table metrics with a wildcard for the keyspace or table name.
Given a configuration like:
```toml
[[inputs.cassandra]]
context = "/jolokia/read"
servers = [":8778"]
metrics = ["/java.lang:type=Memory/HeapMemoryUsage"]
```
The collected metrics will be:
```
javaMemory,host=myHost,mname=HeapMemoryUsage HeapMemoryUsage_committed=1040187392,HeapMemoryUsage_init=1050673152,HeapMemoryUsage_max=1040187392,HeapMemoryUsage_used=368155000 1459551767230567084
```
# Useful Metrics:
Here is a list of metrics that might be useful to monitor your cassandra cluster. This was put together from multiple sources on the web.
- [How to monitor Cassandra performance metrics](https://www.datadoghq.com/blog/how-to-monitor-cassandra-performance-metrics)
- [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html)
####measurement = javaGarbageCollector
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount
- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime
- /java.lang:type=GarbageCollector,name=ParNew/CollectionCount
####measurement = javaMemory
- /java.lang:type=Memory/HeapMemoryUsage
- /java.lang:type=Memory/NonHeapMemoryUsage
####measurement = cassandraCache
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hit
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hit
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity
####measurement = cassandraClient
- /org.apache.cassandra.metrics:type=Client,name=connectedNativeClients
####measurement = cassandraClientRequest
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures
####measurement = cassandraCommitLog
- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks
- /org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize
####measurement = cassandraCompaction
- /org.apache.cassandra.metrics:type=Compaction,name=CompletedTask
- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks
- /org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted
- /org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted
####measurement = cassandraStorage
- /org.apache.cassandra.metrics:type=Storage,name=Load
- /org.apache.cassandra.metrics:type=Storage,name=Exceptions
####measurement = cassandraTable
Using wildcards for "keyspace" and "scope" can create a lot of series as metrics will be reported for every table and keyspace including internal system tables. Specify a keyspace name and/or a table name to limit them.
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=LiveDiskSpaceUsed
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=TotalDiskSpaceUsed
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency
####measurement = cassandraThreadPools
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks
package cassandra
import (
"encoding/json"
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io/ioutil"
"net/http"
"net/url"
//"reflect"
"strings"
)
/*type Server struct {
Host string
Username string
Password string
Port string
}*/
type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type JolokiaClientImpl struct {
client *http.Client
}
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
type Cassandra struct {
jClient JolokiaClient
Context string
Servers []string
Metrics []string
}
type javaMetric struct {
host string
metric string
acc telegraf.Accumulator
}
type cassandraMetric struct {
host string
metric string
acc telegraf.Accumulator
}
type jmxMetric interface {
addTagsFields(out map[string]interface{})
}
func addServerTags(host string, tags map[string]string) {
if host != "" && host != "localhost" && host != "127.0.0.1" {
tags["host"] = host
}
}
func newJavaMetric(host string, metric string,
acc telegraf.Accumulator) *javaMetric {
return &javaMetric{host: host, metric: metric, acc: acc}
}
func newCassandraMetric(host string, metric string,
acc telegraf.Accumulator) *cassandraMetric {
return &cassandraMetric{host: host, metric: metric, acc: acc}
}
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
mname string) {
for k, v := range values {
if v != nil {
fields[mname+"_"+k] = v
}
}
}
func parseJmxMetricRequest(mbean string) map[string]string {
tokens := make(map[string]string)
classAndPairs := strings.Split(mbean, ":")
if classAndPairs[0] == "org.apache.cassandra.metrics" {
tokens["class"] = "cassandra"
} else if classAndPairs[0] == "java.lang" {
tokens["class"] = "java"
} else {
return tokens
}
pairs := strings.Split(classAndPairs[1], ",")
for _, pair := range pairs {
p := strings.Split(pair, "=")
tokens[p[0]] = p[1]
}
return tokens
}
func addTokensToTags(tokens map[string]string, tags map[string]string) {
for k, v := range tokens {
if k == "name" {
tags["mname"] = v // name seems to a reserved word in influxdb
} else if k == "class" || k == "type" {
continue // class and type are used in the metric name
} else {
tags[k] = v
}
}
}
func (j javaMetric) addTagsFields(out map[string]interface{}) {
tags := make(map[string]string)
fields := make(map[string]interface{})
a := out["request"].(map[string]interface{})
attribute := a["attribute"].(string)
mbean := a["mbean"].(string)
tokens := parseJmxMetricRequest(mbean)
addTokensToTags(tokens, tags)
addServerTags(j.host, tags)
if _, ok := tags["mname"]; !ok {
//Queries for a single value will not return a "name" tag in the response.
tags["mname"] = attribute
}
if values, ok := out["value"]; ok {
switch t := values.(type) {
case map[string]interface{}:
addValuesAsFields(values.(map[string]interface{}), fields, attribute)
case interface{}:
fields[attribute] = t
}
j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
j.metric, out)
}
}
func addCassandraMetric(mbean string, c cassandraMetric,
values map[string]interface{}) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tokens := parseJmxMetricRequest(mbean)
addTokensToTags(tokens, tags)
addServerTags(c.host, tags)
addValuesAsFields(values, fields, tags["mname"])
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
}
func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
r := out["request"]
tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string))
// Requests with wildcards for keyspace or table names will return nested
// maps in the json response
if tokens["type"] == "Table" && (tokens["keyspace"] == "*" ||
tokens["scope"] == "*") {
if valuesMap, ok := out["value"]; ok {
for k, v := range valuesMap.(map[string]interface{}) {
addCassandraMetric(k, c, v.(map[string]interface{}))
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out)
return
}
} else {
if values, ok := out["value"]; ok {
addCassandraMetric(r.(map[string]interface{})["mbean"].(string),
c, values.(map[string]interface{}))
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out)
return
}
}
}
func (j *Cassandra) SampleConfig() string {
return `
# This is the context root used to compose the jolokia url
context = "/jolokia/read"
## List of cassandra servers exposing jolokia read service
servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
## List of metrics collected on above servers
## Each metric consists of a jmx path.
## This will collect all heap memory usage metrics from the jvm and
## ReadLatency metrics for all keyspaces and tables.
## "type=Table" in the query works with Cassandra3.0. Older versions might
## need to use "type=ColumnFamily"
metrics = [
"/java.lang:type=Memory/HeapMemoryUsage",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
]
`
}
func (j *Cassandra) Description() string {
return "Read Cassandra metrics through Jolokia"
}
func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
func parseServerTokens(server string) map[string]string {
serverTokens := make(map[string]string)
hostAndUser := strings.Split(server, "@")
hostPort := ""
userPasswd := ""
if len(hostAndUser) == 2 {
hostPort = hostAndUser[1]
userPasswd = hostAndUser[0]
} else {
hostPort = hostAndUser[0]
}
hostTokens := strings.Split(hostPort, ":")
serverTokens["host"] = hostTokens[0]
serverTokens["port"] = hostTokens[1]
if userPasswd != "" {
userTokens := strings.Split(userPasswd, ":")
serverTokens["user"] = userTokens[0]
serverTokens["passwd"] = userTokens[1]
}
return serverTokens
}
func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
context := c.Context
servers := c.Servers
metrics := c.Metrics
for _, server := range servers {
for _, metric := range metrics {
var m jmxMetric
serverTokens := parseServerTokens(server)
if strings.HasPrefix(metric, "/java.lang:") {
m = newJavaMetric(serverTokens["host"], metric, acc)
} else if strings.HasPrefix(metric,
"/org.apache.cassandra.metrics:") {
m = newCassandraMetric(serverTokens["host"], metric, acc)
}
// Prepare URL
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
serverTokens["port"] + context + metric)
if err != nil {
return err
}
if serverTokens["user"] != "" && serverTokens["passwd"] != "" {
requestUrl.User = url.UserPassword(serverTokens["user"],
serverTokens["passwd"])
}
fmt.Printf("host %s url %s\n", serverTokens["host"], requestUrl)
out, err := c.getAttr(requestUrl)
if out["status"] != 200.0 {
fmt.Printf("URL returned with status %v\n", out["status"])
continue
}
m.addTagsFields(out)
}
}
return nil
}
func init() {
inputs.Add("cassandra", func() telegraf.Input {
return &Cassandra{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}
package cassandra
import (
_ "fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
_ "github.com/stretchr/testify/require"
)
const validJavaMultiValueJSON = `
{
"request":{
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":{
"init":67108864,
"committed":456130560,
"max":477626368,
"used":203288528
},
"timestamp":1446129191,
"status":200
}`
const validCassandraMultiValueJSON = `
{
"request": {
"mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table,type=Table",
"type": "read"},
"status": 200,
"timestamp": 1458089229,
"value": {
"999thPercentile": 20.0,
"99thPercentile": 10.0,
"Count": 400,
"DurationUnit": "microseconds",
"Max": 30.0,
"Mean": null,
"MeanRate": 3.0,
"Min": 1.0,
"RateUnit": "events/second",
"StdDev": null
}
}`
const validCassandraNestedMultiValueJSON = `
{
"request": {
"mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=*,type=Table",
"type": "read"},
"status": 200,
"timestamp": 1458089184,
"value": {
"org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table1,type=Table":
{ "999thPercentile": 1.0,
"Count": 100,
"DurationUnit": "microseconds",
"OneMinuteRate": 1.0,
"RateUnit": "events/second",
"StdDev": null
},
"org.apache.cassandra.metrics:keyspace=test_keyspace2,name=ReadLatency,scope=test_table2,type=Table":
{ "999thPercentile": 2.0,
"Count": 200,
"DurationUnit": "microseconds",
"OneMinuteRate": 2.0,
"RateUnit": "events/second",
"StdDev": null
}
}
}`
const validSingleValueJSON = `
{
"request":{
"path":"used",
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":209274376,
"timestamp":1446129256,
"status":200
}`
const validJavaMultiTypeJSON = `
{
"request":{
"mbean":"java.lang:name=ConcurrentMarkSweep,type=GarbageCollector",
"attribute":"CollectionCount",
"type":"read"
},
"value":1,
"timestamp":1459316570,
"status":200
}`
const invalidJSON = "I don't think this is JSON"
const empty = ""
var Servers = []string{"10.10.10.10:8778"}
var AuthServers = []string{"user:passwd@10.10.10.10:8778"}
var MultipleServers = []string{"10.10.10.10:8778", "10.10.10.11:8778"}
var HeapMetric = "/java.lang:type=Memory/HeapMemoryUsage"
var ReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency"
var NestedReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency"
var GarbageCollectorMetric1 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount"
var GarbageCollectorMetric2 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime"
var Context = "/jolokia/read"
type jolokiaClientStub struct {
responseBody string
statusCode int
}
func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) {
resp := http.Response{}
resp.StatusCode = c.statusCode
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
return &resp, nil
}
// Generates a pointer to an HttpJson object that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
func genJolokiaClientStub(response string, statusCode int, servers []string, metrics []string) *Cassandra {
return &Cassandra{
jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode},
Context: Context,
Servers: servers,
Metrics: metrics,
}
}
// Test that the proper values are ignored or collected for class=Java
func TestHttpJsonJavaMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200,
MultipleServers, []string{HeapMetric})
var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 2, len(acc.Metrics))
fields := map[string]interface{}{
"HeapMemoryUsage_init": 67108864.0,
"HeapMemoryUsage_committed": 456130560.0,
"HeapMemoryUsage_max": 477626368.0,
"HeapMemoryUsage_used": 203288528.0,
}
tags1 := map[string]string{
"host": "10.10.10.10",
"mname": "HeapMemoryUsage",
}
tags2 := map[string]string{
"host": "10.10.10.11",
"mname": "HeapMemoryUsage",
}
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags1)
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags2)
}
func TestHttpJsonJavaMultiType(t *testing.T) {
cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, AuthServers, []string{GarbageCollectorMetric1, GarbageCollectorMetric2})
var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 2, len(acc.Metrics))
fields := map[string]interface{}{
"CollectionCount": 1.0,
}
tags := map[string]string{
"host": "10.10.10.10",
"mname": "ConcurrentMarkSweep",
}
acc.AssertContainsTaggedFields(t, "javaGarbageCollector", fields, tags)
}
// Test that the proper values are ignored or collected
func TestHttpJsonOn404(t *testing.T) {
jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers,
[]string{HeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 0, len(acc.Metrics))
}
// Test that the proper values are ignored or collected for class=Cassandra
func TestHttpJsonCassandraMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric})
var acc testutil.Accumulator
err := cassandra.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Metrics))
fields := map[string]interface{}{
"ReadLatency_999thPercentile": 20.0,
"ReadLatency_99thPercentile": 10.0,
"ReadLatency_Count": 400.0,
"ReadLatency_DurationUnit": "microseconds",
"ReadLatency_Max": 30.0,
"ReadLatency_MeanRate": 3.0,
"ReadLatency_Min": 1.0,
"ReadLatency_RateUnit": "events/second",
}
tags := map[string]string{
"host": "10.10.10.10",
"mname": "ReadLatency",
"keyspace": "test_keyspace1",
"scope": "test_table",
}
acc.AssertContainsTaggedFields(t, "cassandraTable", fields, tags)
}
// Test that the proper values are ignored or collected for class=Cassandra with
// nested values
func TestHttpJsonCassandraNestedMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []string{NestedReadLatencyMetric})
var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 2, len(acc.Metrics))
fields1 := map[string]interface{}{
"ReadLatency_999thPercentile": 1.0,
"ReadLatency_Count": 100.0,
"ReadLatency_DurationUnit": "microseconds",
"ReadLatency_OneMinuteRate": 1.0,
"ReadLatency_RateUnit": "events/second",
}
fields2 := map[string]interface{}{
"ReadLatency_999thPercentile": 2.0,
"ReadLatency_Count": 200.0,
"ReadLatency_DurationUnit": "microseconds",
"ReadLatency_OneMinuteRate": 2.0,
"ReadLatency_RateUnit": "events/second",
}
tags1 := map[string]string{
"host": "10.10.10.10",
"mname": "ReadLatency",
"keyspace": "test_keyspace1",
"scope": "test_table1",
}
tags2 := map[string]string{
"host": "10.10.10.10",
"mname": "ReadLatency",
"keyspace": "test_keyspace2",
"scope": "test_table2",
}
acc.AssertContainsTaggedFields(t, "cassandraTable", fields1, tags1)
acc.AssertContainsTaggedFields(t, "cassandraTable", fields2, tags2)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment