From 194288c00e1c6424264b38d14f2b45769bdc6fb4 Mon Sep 17 00:00:00 2001
From: Victor Garcia <vgarcia@thousandeyes.com>
Date: Wed, 20 Apr 2016 01:16:22 +0200
Subject: [PATCH] Adding replication lag metric

closes #1066
---
 CHANGELOG.md                                |  1 +
 plugins/inputs/mongodb/README.md            |  1 +
 plugins/inputs/mongodb/mongodb_data.go      |  1 +
 plugins/inputs/mongodb/mongodb_data_test.go |  1 +
 plugins/inputs/mongodb/mongodb_server.go    | 18 +++++-
 plugins/inputs/mongodb/mongostat.go         | 66 +++++++++++++++++++--
 6 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b09616a..659b318d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@ based on _prefix_ in addition to globs. This means that a filter like
 - [#1035](https://github.com/influxdata/telegraf/issues/1035): Add `user`, `exe`, `pidfile` tags to procstat plugin.
 - [#1041](https://github.com/influxdata/telegraf/issues/1041): Add `n_cpus` field to the system plugin.
 - [#1072](https://github.com/influxdata/telegraf/pull/1072): New Input Plugin: filestat.
+- [#1066](https://github.com/influxdata/telegraf/pull/1066): Replication lag metrics for MongoDB input plugin
 
 ### Bugfixes
 
diff --git a/plugins/inputs/mongodb/README.md b/plugins/inputs/mongodb/README.md
index 868b5152..0d140b39 100644
--- a/plugins/inputs/mongodb/README.md
+++ b/plugins/inputs/mongodb/README.md
@@ -50,3 +50,4 @@ and create a single measurement containing values e.g.
  * vsize_megabytes
  * ttl_deletes_per_sec
  * ttl_passes_per_sec
+ * repl_lag
diff --git a/plugins/inputs/mongodb/mongodb_data.go b/plugins/inputs/mongodb/mongodb_data.go
index 2bbeabd9..f9bbc1d3 100644
--- a/plugins/inputs/mongodb/mongodb_data.go
+++ b/plugins/inputs/mongodb/mongodb_data.go
@@ -54,6 +54,7 @@ var DefaultReplStats = map[string]string{
 	"repl_getmores_per_sec": "GetMoreR",
 	"repl_commands_per_sec": "CommandR",
 	"member_status":         "NodeType",
+	"repl_lag":              "ReplLag",
 }
 
 var MmapStats = map[string]string{
diff --git a/plugins/inputs/mongodb/mongodb_data_test.go b/plugins/inputs/mongodb/mongodb_data_test.go
index e63d2025..5619641f 100644
--- a/plugins/inputs/mongodb/mongodb_data_test.go
+++ b/plugins/inputs/mongodb/mongodb_data_test.go
@@ -127,6 +127,7 @@ func TestStateTag(t *testing.T) {
 		"repl_inserts_per_sec":  int64(0),
 		"repl_queries_per_sec":  int64(0),
 		"repl_updates_per_sec":  int64(0),
+		"repl_lag":              int64(0),
 		"resident_megabytes":    int64(0),
 		"updates_per_sec":       int64(0),
 		"vsize_megabytes":       int64(0),
diff --git a/plugins/inputs/mongodb/mongodb_server.go b/plugins/inputs/mongodb/mongodb_server.go
index 26aac2b6..86699a4d 100644
--- a/plugins/inputs/mongodb/mongodb_server.go
+++ b/plugins/inputs/mongodb/mongodb_server.go
@@ -1,6 +1,7 @@
 package mongodb
 
 import (
+	"log"
 	"net/url"
 	"time"
 
@@ -12,7 +13,7 @@ import (
 type Server struct {
 	Url        *url.URL
 	Session    *mgo.Session
-	lastResult *ServerStatus
+	lastResult *MongoStatus
 }
 
 func (s *Server) getDefaultTags() map[string]string {
@@ -24,11 +25,22 @@ func (s *Server) getDefaultTags() map[string]string {
 func (s *Server) gatherData(acc telegraf.Accumulator) error {
 	s.Session.SetMode(mgo.Eventual, true)
 	s.Session.SetSocketTimeout(0)
-	result := &ServerStatus{}
-	err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result)
+	result_server := &ServerStatus{}
+	err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result_server)
 	if err != nil {
 		return err
 	}
+	result_repl := &ReplSetStatus{}
+	err = s.Session.DB("admin").Run(bson.D{{"replSetGetStatus", 1}}, result_repl)
+	if err != nil {
+		log.Println("Not gathering replica set status, member not in replica set")
+	}
+
+	result := &MongoStatus{
+		ServerStatus:  result_server,
+		ReplSetStatus: result_repl,
+	}
+
 	defer func() {
 		s.lastResult = result
 	}()
diff --git a/plugins/inputs/mongodb/mongostat.go b/plugins/inputs/mongodb/mongostat.go
index 33cd8254..b131bf9a 100644
--- a/plugins/inputs/mongodb/mongostat.go
+++ b/plugins/inputs/mongodb/mongostat.go
@@ -11,6 +11,8 @@ import (
 	"sort"
 	"strings"
 	"time"
+
+	"gopkg.in/mgo.v2/bson"
 )
 
 const (
@@ -28,8 +30,13 @@ const (
 	WTOnly               // only active if node has wiredtiger-specific fields
 )
 
+type MongoStatus struct {
+	SampleTime    time.Time
+	ServerStatus  *ServerStatus
+	ReplSetStatus *ReplSetStatus
+}
+
 type ServerStatus struct {
-	SampleTime         time.Time              `bson:""`
 	Host               string                 `bson:"host"`
 	Version            string                 `bson:"version"`
 	Process            string                 `bson:"process"`
@@ -57,6 +64,19 @@ type ServerStatus struct {
 	Metrics            *MetricsStats          `bson:"metrics"`
 }
 
+// ReplSetStatus stores information from replSetGetStatus
+type ReplSetStatus struct {
+	Members []ReplSetMember `bson:"members"`
+	MyState int64           `bson:"myState"`
+}
+
+// ReplSetMember stores information related to a replica set member
+type ReplSetMember struct {
+	Name   string               `bson:"name"`
+	State  int64                `bson:"state"`
+	Optime *bson.MongoTimestamp `bson:"optime"`
+}
+
 // WiredTiger stores information related to the WiredTiger storage engine.
 type WiredTiger struct {
 	Transaction TransactionStats       `bson:"transaction"`
@@ -356,6 +376,7 @@ type StatLine struct {
 
 	// Replicated Opcounter fields
 	InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
+	ReplLag                                               int64
 	Flushes                                               int64
 	Mapped, Virtual, Resident, NonMapped                  int64
 	Faults                                                int64
@@ -410,8 +431,11 @@ func diff(newVal, oldVal, sampleTime int64) int64 {
 	return d / sampleTime
 }
 
-// NewStatLine constructs a StatLine object from two ServerStatus objects.
-func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs int64) *StatLine {
+// NewStatLine constructs a StatLine object from two MongoStatus objects.
+func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSecs int64) *StatLine {
+	oldStat := *oldMongo.ServerStatus
+	newStat := *newMongo.ServerStatus
+
 	returnVal := &StatLine{
 		Key:       key,
 		Host:      newStat.Host,
@@ -462,7 +486,7 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
 		returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes
 	}
 
-	returnVal.Time = newStat.SampleTime
+	returnVal.Time = newMongo.SampleTime
 	returnVal.IsMongos =
 		(newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess))
 
@@ -607,5 +631,39 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
 		returnVal.NumConnections = newStat.Connections.Current
 	}
 
+	newReplStat := *newMongo.ReplSetStatus
+
+	if newReplStat.Members != nil {
+		myName := newStat.Repl.Me
+		// Find the master and myself
+		master := ReplSetMember{}
+		me := ReplSetMember{}
+		for _, member := range newReplStat.Members {
+			if member.Name == myName {
+				if member.State == 1 {
+					// I'm the master
+					returnVal.ReplLag = 0
+					break
+				} else {
+					// I'm secondary
+					me = member
+				}
+			} else if member.State == 1 {
+				// Master found
+				master = member
+			}
+		}
+
+		if me.Optime != nil && master.Optime != nil && me.State == 2 {
+			// MongoTimestamp type is int64 where the first 32bits are the unix timestamp
+			lag := int64(*master.Optime>>32 - *me.Optime>>32)
+			if lag < 0 {
+				returnVal.ReplLag = 0
+			} else {
+				returnVal.ReplLag = lag
+			}
+		}
+	}
+
 	return returnVal
 }
-- 
GitLab