Skip to content
Snippets Groups Projects
Commit 48092ed5 authored by Ivan Lopez's avatar Ivan Lopez Committed by Daniel Nelson
Browse files

Add RabbitMQ cluster and running nodes count and running node status (#3703)

parent efb9d5b4
No related branches found
No related tags found
No related merge requests found
...@@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd ...@@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- messages_ready (int, messages) - messages_ready (int, messages)
- messages_unacked (int, messages) - messages_unacked (int, messages)
- queues (int, queues) - queues (int, queues)
- clustering_listeners (int, cluster nodes)
- amqp_listeners (int, amqp nodes up)
- rabbitmq_node - rabbitmq_node
- disk_free (int, bytes) - disk_free (int, bytes)
...@@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd ...@@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- run_queue (int, erlang processes) - run_queue (int, erlang processes)
- sockets_total (int, sockets) - sockets_total (int, sockets)
- sockets_used (int, sockets) - sockets_used (int, sockets)
- running (int, node up)
- rabbitmq_queue - rabbitmq_queue
- consumer_utilisation (float, percent) - consumer_utilisation (float, percent)
...@@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m) ...@@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)
``` ```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000 rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000 rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
``` ```
...@@ -60,6 +60,12 @@ type OverviewResponse struct { ...@@ -60,6 +60,12 @@ type OverviewResponse struct {
MessageStats *MessageStats `json:"message_stats"` MessageStats *MessageStats `json:"message_stats"`
ObjectTotals *ObjectTotals `json:"object_totals"` ObjectTotals *ObjectTotals `json:"object_totals"`
QueueTotals *QueueTotals `json:"queue_totals"` QueueTotals *QueueTotals `json:"queue_totals"`
Listeners []Listeners `json:"listeners"`
}
// Listeners ...
type Listeners struct {
Protocol string `json:"protocol"`
} }
// Details ... // Details ...
...@@ -134,6 +140,7 @@ type Node struct { ...@@ -134,6 +140,7 @@ type Node struct {
RunQueue int64 `json:"run_queue"` RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"` SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"` SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
} }
type Exchange struct { type Exchange struct {
...@@ -186,7 +193,7 @@ var sampleConfig = ` ...@@ -186,7 +193,7 @@ var sampleConfig = `
# queues = ["telegraf"] # queues = ["telegraf"]
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered. ## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"] # exchanges = ["telegraf"]
` `
...@@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { ...@@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return return
} }
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil { if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil {
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue")) acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
return return
} }
var clustering_listeners, amqp_listeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
}
}
tags := map[string]string{"url": r.URL} tags := map[string]string{"url": r.URL}
if r.Name != "" { if r.Name != "" {
tags["name"] = r.Name tags["name"] = r.Name
...@@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { ...@@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver, "messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet, "messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish, "messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
} }
acc.AddFields("rabbitmq_overview", fields, tags) acc.AddFields("rabbitmq_overview", fields, tags)
} }
...@@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { ...@@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
tags := map[string]string{"url": r.URL} tags := map[string]string{"url": r.URL}
tags["node"] = node.Name tags["node"] = node.Name
var running int64 = 0
if node.Running {
running = 1
}
fields := map[string]interface{}{ fields := map[string]interface{}{
"disk_free": node.DiskFree, "disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit, "disk_free_limit": node.DiskFreeLimit,
...@@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { ...@@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
"run_queue": node.RunQueue, "run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal, "sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed, "sockets_used": node.SocketsUsed,
"running": running,
} }
acc.AddFields("rabbitmq_node", fields, tags, now) acc.AddFields("rabbitmq_node", fields, tags, now)
} }
......
...@@ -51,7 +51,25 @@ const sampleOverviewResponse = ` ...@@ -51,7 +51,25 @@ const sampleOverviewResponse = `
"messages_unacknowledged_details": { "messages_unacknowledged_details": {
"rate": 0.0 "rate": 0.0
} }
} },
"listeners": [
{
"name": "rabbit@node-a",
"protocol": "amqp"
},
{
"name": "rabbit@node-b",
"protocol": "amqp"
},
{
"name": "rabbit@node-a",
"protocol": "clustering"
},
{
"name": "rabbit@node-b",
"protocol": "clustering"
}
]
} }
` `
...@@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { ...@@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"consumers", "consumers",
"exchanges", "exchanges",
"queues", "queues",
"clustering_listeners",
"amqp_listeners",
} }
for _, metric := range intMetrics { for _, metric := range intMetrics {
...@@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { ...@@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"run_queue", "run_queue",
"sockets_total", "sockets_total",
"sockets_used", "sockets_used",
"running",
} }
for _, metric := range nodeIntMetrics { for _, metric := range nodeIntMetrics {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment