From a7b0861436ff5fc29f5a23b145482cce47e9e96f Mon Sep 17 00:00:00 2001
From: Jesse Hanley <hanleyja@ornl.gov>
Date: Wed, 27 Apr 2016 15:14:25 -0400
Subject: [PATCH] Adding Jobstats support to Lustre2 input plugin

Lustre Jobstats allows for RPCs to be tagged with a value, such
as a job's ID.  This allows for per job statistics. This plugin
collects statistics and tags the data with the jobid.

closes #1107
---
 CHANGELOG.md                           |   1 +
 plugins/inputs/lustre2/lustre2.go      | 266 ++++++++++++++++++++++++-
 plugins/inputs/lustre2/lustre2_test.go | 118 +++++++++++
 3 files changed, 378 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 306083da..39383be7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -57,6 +57,7 @@ based on _prefix_ in addition to globs. This means that a filter like
 - [#1086](https://github.com/influxdata/telegraf/pull/1086): Ability to specify AWS keys in config file. Thanks @johnrengleman!
 - [#1096](https://github.com/influxdata/telegraf/pull/1096): Performance refactor of running output buffers.
 - [#967](https://github.com/influxdata/telegraf/issues/967): Buffer logging improvements.
+- [#1107](https://github.com/influxdata/telegraf/issues/1107): Support lustre2 job stats. Thanks @hanleyja!
 
 ### Bugfixes
 
diff --git a/plugins/inputs/lustre2/lustre2.go b/plugins/inputs/lustre2/lustre2.go
index 6ac41d39..8ef9223b 100644
--- a/plugins/inputs/lustre2/lustre2.go
+++ b/plugins/inputs/lustre2/lustre2.go
@@ -34,9 +34,13 @@ var sampleConfig = `
   ##
   # ost_procfiles = [
   #   "/proc/fs/lustre/obdfilter/*/stats",
-  #   "/proc/fs/lustre/osd-ldiskfs/*/stats"
+  #   "/proc/fs/lustre/osd-ldiskfs/*/stats",
+  #   "/proc/fs/lustre/obdfilter/*/job_stats",
+  # ]
+  # mds_procfiles = [
+  #   "/proc/fs/lustre/mdt/*/md_stats",
+  #   "/proc/fs/lustre/mdt/*/job_stats",
   # ]
-  # mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
 `
 
 /* The wanted fields would be a []string if not for the
@@ -82,6 +86,139 @@ var wanted_ost_fields = []*mapping{
 	},
 }
 
+var wanted_ost_jobstats_fields = []*mapping{
+	{ // The read line has several fields, so we need to differentiate what they are
+		inProc:   "read",
+		field:    3,
+		reportAs: "jobstats_read_calls",
+	},
+	{
+		inProc:   "read",
+		field:    7,
+		reportAs: "jobstats_read_min_size",
+	},
+	{
+		inProc:   "read",
+		field:    9,
+		reportAs: "jobstats_read_max_size",
+	},
+	{
+		inProc:   "read",
+		field:    11,
+		reportAs: "jobstats_read_bytes",
+	},
+	{ // Different inProc for newer versions
+		inProc:   "read_bytes",
+		field:    3,
+		reportAs: "jobstats_read_calls",
+	},
+	{
+		inProc:   "read_bytes",
+		field:    7,
+		reportAs: "jobstats_read_min_size",
+	},
+	{
+		inProc:   "read_bytes",
+		field:    9,
+		reportAs: "jobstats_read_max_size",
+	},
+	{
+		inProc:   "read_bytes",
+		field:    11,
+		reportAs: "jobstats_read_bytes",
+	},
+	{ // We need to do the same for the write fields
+		inProc:   "write",
+		field:    3,
+		reportAs: "jobstats_write_calls",
+	},
+	{
+		inProc:   "write",
+		field:    7,
+		reportAs: "jobstats_write_min_size",
+	},
+	{
+		inProc:   "write",
+		field:    9,
+		reportAs: "jobstats_write_max_size",
+	},
+	{
+		inProc:   "write",
+		field:    11,
+		reportAs: "jobstats_write_bytes",
+	},
+	{ // Different inProc for newer versions
+		inProc:   "write_bytes",
+		field:    3,
+		reportAs: "jobstats_write_calls",
+	},
+	{
+		inProc:   "write_bytes",
+		field:    7,
+		reportAs: "jobstats_write_min_size",
+	},
+	{
+		inProc:   "write_bytes",
+		field:    9,
+		reportAs: "jobstats_write_max_size",
+	},
+	{
+		inProc:   "write_bytes",
+		field:    11,
+		reportAs: "jobstats_write_bytes",
+	},
+	{
+		inProc:   "getattr",
+		field:    3,
+		reportAs: "jobstats_ost_getattr",
+	},
+	{
+		inProc:   "setattr",
+		field:    3,
+		reportAs: "jobstats_ost_setattr",
+	},
+	{
+		inProc:   "punch",
+		field:    3,
+		reportAs: "jobstats_punch",
+	},
+	{
+		inProc:   "sync",
+		field:    3,
+		reportAs: "jobstats_ost_sync",
+	},
+	{
+		inProc:   "destroy",
+		field:    3,
+		reportAs: "jobstats_destroy",
+	},
+	{
+		inProc:   "create",
+		field:    3,
+		reportAs: "jobstats_create",
+	},
+	{
+		inProc:   "statfs",
+		field:    3,
+		reportAs: "jobstats_ost_statfs",
+	},
+	{
+		inProc:   "get_info",
+		field:    3,
+		reportAs: "jobstats_get_info",
+	},
+	{
+		inProc:   "set_info",
+		field:    3,
+		reportAs: "jobstats_set_info",
+	},
+	{
+		inProc:   "quotactl",
+		field:    3,
+		reportAs: "jobstats_quotactl",
+	},
+}
+
 var wanted_mds_fields = []*mapping{
 	{
 		inProc: "open",
@@ -133,6 +270,89 @@ var wanted_mds_fields = []*mapping{
 	},
 }
 
+var wanted_mdt_jobstats_fields = []*mapping{
+	{
+		inProc:   "open",
+		field:    3,
+		reportAs: "jobstats_open",
+	},
+	{
+		inProc:   "close",
+		field:    3,
+		reportAs: "jobstats_close",
+	},
+	{
+		inProc:   "mknod",
+		field:    3,
+		reportAs: "jobstats_mknod",
+	},
+	{
+		inProc:   "link",
+		field:    3,
+		reportAs: "jobstats_link",
+	},
+	{
+		inProc:   "unlink",
+		field:    3,
+		reportAs: "jobstats_unlink",
+	},
+	{
+		inProc:   "mkdir",
+		field:    3,
+		reportAs: "jobstats_mkdir",
+	},
+	{
+		inProc:   "rmdir",
+		field:    3,
+		reportAs: "jobstats_rmdir",
+	},
+	{
+		inProc:   "rename",
+		field:    3,
+		reportAs: "jobstats_rename",
+	},
+	{
+		inProc:   "getattr",
+		field:    3,
+		reportAs: "jobstats_getattr",
+	},
+	{
+		inProc:   "setattr",
+		field:    3,
+		reportAs: "jobstats_setattr",
+	},
+	{
+		inProc:   "getxattr",
+		field:    3,
+		reportAs: "jobstats_getxattr",
+	},
+	{
+		inProc:   "setxattr",
+		field:    3,
+		reportAs: "jobstats_setxattr",
+	},
+	{
+		inProc:   "statfs",
+		field:    3,
+		reportAs: "jobstats_statfs",
+	},
+	{
+		inProc:   "sync",
+		field:    3,
+		reportAs: "jobstats_sync",
+	},
+	{
+		inProc:   "samedir_rename",
+		field:    3,
+		reportAs: "jobstats_samedir_rename",
+	},
+	{
+		inProc:   "crossdir_rename",
+		field:    3,
+		reportAs: "jobstats_crossdir_rename",
+	},
+}
+
 func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error {
 	files, err := filepath.Glob(fileglob)
 	if err != nil {
@@ -143,7 +363,7 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
 		/* Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar
 		 * into just the object store target name
 		 * Assumpion: the target name is always second to last,
-		 * which is true in Lustre 2.1->2.5
+		 * which is true in Lustre 2.1->2.8
 		 */
 		path := strings.Split(file, "/")
 		name := path[len(path)-2]
@@ -161,16 +381,21 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
 
 		for _, line := range lines {
 			parts := strings.Fields(line)
+			if strings.HasPrefix(line, "- job_id:") {
+				// Set the job_id explicitly if present
+				fields["jobid"] = parts[2]
+			}
+
 			for _, wanted := range wanted_fields {
 				var data uint64
-				if parts[0] == wanted.inProc {
+				if strings.TrimSuffix(parts[0], ":") == wanted.inProc {
 					wanted_field := wanted.field
 					// if not set, assume field[1]. Shouldn't be field[0], as
 					// that's a string
 					if wanted_field == 0 {
 						wanted_field = 1
 					}
-					data, err = strconv.ParseUint((parts[wanted_field]), 10, 64)
+					data, err = strconv.ParseUint(strings.TrimSuffix((parts[wanted_field]), ","), 10, 64)
 					if err != nil {
 						return err
 					}
@@ -213,6 +438,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
 		if err != nil {
 			return err
 		}
+		// per job statistics are in obdfilter/<ost_name>/job_stats
+		err = l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/job_stats",
+			wanted_ost_jobstats_fields, acc)
+		if err != nil {
+			return err
+		}
 	}
 
 	if len(l.Mds_procfiles) == 0 {
@@ -222,16 +453,31 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
 		if err != nil {
 			return err
 		}
+
+		// Metadata target job stats
+		err = l.GetLustreProcStats("/proc/fs/lustre/mdt/*/job_stats",
+			wanted_mdt_jobstats_fields, acc)
+		if err != nil {
+			return err
+		}
 	}
 
 	for _, procfile := range l.Ost_procfiles {
-		err := l.GetLustreProcStats(procfile, wanted_ost_fields, acc)
+		ost_fields := wanted_ost_fields
+		if strings.HasSuffix(procfile, "job_stats") {
+			ost_fields = wanted_ost_jobstats_fields
+		}
+		err := l.GetLustreProcStats(procfile, ost_fields, acc)
 		if err != nil {
 			return err
 		}
 	}
 	for _, procfile := range l.Mds_procfiles {
-		err := l.GetLustreProcStats(procfile, wanted_mds_fields, acc)
+		mdt_fields := wanted_mds_fields
+		if strings.HasSuffix(procfile, "job_stats") {
+			mdt_fields = wanted_mdt_jobstats_fields
+		}
+		err := l.GetLustreProcStats(procfile, mdt_fields, acc)
 		if err != nil {
 			return err
 		}
@@ -241,6 +487,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
 		tags := map[string]string{
 			"name": name,
 		}
+		if _, ok := fields["jobid"]; ok {
+			if jobid, ok := fields["jobid"].(string); ok {
+				tags["jobid"] = jobid
+			}
+			delete(fields, "jobid")
+		}
 		acc.AddFields("lustre2", fields, tags)
 	}
 
diff --git a/plugins/inputs/lustre2/lustre2_test.go b/plugins/inputs/lustre2/lustre2_test.go
index 9e560df2..5cc9c0e4 100644
--- a/plugins/inputs/lustre2/lustre2_test.go
+++ b/plugins/inputs/lustre2/lustre2_test.go
@@ -38,6 +38,23 @@ cache_hit                 7393729777 samples [pages] 1 1 7393729777
 cache_miss                11653333250 samples [pages] 1 1 11653333250
 `
 
+const obdfilterJobStatsContents = `job_stats:
+- job_id:          testjob1
+  snapshot_time:   1461772761
+  read_bytes:      { samples:           1, unit: bytes, min:    4096, max:    4096, sum:            4096 }
+  write_bytes:     { samples:          25, unit: bytes, min: 1048576, max: 1048576, sum:        26214400 }
+  getattr:         { samples:           0, unit:  reqs }
+  setattr:         { samples:           0, unit:  reqs }
+  punch:           { samples:           1, unit:  reqs }
+  sync:            { samples:           0, unit:  reqs }
+  destroy:         { samples:           0, unit:  reqs }
+  create:          { samples:           0, unit:  reqs }
+  statfs:          { samples:           0, unit:  reqs }
+  get_info:        { samples:           0, unit:  reqs }
+  set_info:        { samples:           0, unit:  reqs }
+  quotactl:        { samples:           0, unit:  reqs }
+`
+
 const mdtProcContents = `snapshot_time             1438693238.20113 secs.usecs
 open                      1024577037 samples [reqs]
 close                     873243496 samples [reqs]
@@ -57,6 +74,27 @@ samedir_rename            259625 samples [reqs]
 crossdir_rename           369571 samples [reqs]
 `
 
+const mdtJobStatsContents = `job_stats:
+- job_id:          testjob1
+  snapshot_time:   1461772761
+  open:            { samples:           5, unit:  reqs }
+  close:           { samples:           4, unit:  reqs }
+  mknod:           { samples:           6, unit:  reqs }
+  link:            { samples:           8, unit:  reqs }
+  unlink:          { samples:          90, unit:  reqs }
+  mkdir:           { samples:         521, unit:  reqs }
+  rmdir:           { samples:         520, unit:  reqs }
+  rename:          { samples:           9, unit:  reqs }
+  getattr:         { samples:          11, unit:  reqs }
+  setattr:         { samples:           1, unit:  reqs }
+  getxattr:        { samples:           3, unit:  reqs }
+  setxattr:        { samples:           4, unit:  reqs }
+  statfs:          { samples:        1205, unit:  reqs }
+  sync:            { samples:           2, unit:  reqs }
+  samedir_rename:  { samples:         705, unit:  reqs }
+  crossdir_rename: { samples:         200, unit:  reqs }
+`
+
 func TestLustre2GeneratesMetrics(t *testing.T) {
 
 	tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
@@ -83,6 +121,7 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
 	err = ioutil.WriteFile(obddir+"/"+ost_name+"/stats", []byte(obdfilterProcContents), 0644)
 	require.NoError(t, err)
 
+	// Begin by testing standard Lustre stats
 	m := &Lustre2{
 		Ost_procfiles: []string{obddir + "/*/stats", osddir + "/*/stats"},
 		Mds_procfiles: []string{mdtdir + "/*/md_stats"},
@@ -128,3 +167,82 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
 	err = os.RemoveAll(os.TempDir() + "/telegraf")
 	require.NoError(t, err)
 }
+
+func TestLustre2GeneratesJobstatsMetrics(t *testing.T) {
+
+	tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
+	ost_name := "OST0001"
+	job_name := "testjob1"
+
+	mdtdir := tempdir + "/mdt/"
+	err := os.MkdirAll(mdtdir+"/"+ost_name, 0755)
+	require.NoError(t, err)
+
+	obddir := tempdir + "/obdfilter/"
+	err = os.MkdirAll(obddir+"/"+ost_name, 0755)
+	require.NoError(t, err)
+
+	err = ioutil.WriteFile(mdtdir+"/"+ost_name+"/job_stats", []byte(mdtJobStatsContents), 0644)
+	require.NoError(t, err)
+
+	err = ioutil.WriteFile(obddir+"/"+ost_name+"/job_stats", []byte(obdfilterJobStatsContents), 0644)
+	require.NoError(t, err)
+
+	// Test Lustre Jobstats
+	m := &Lustre2{
+		Ost_procfiles: []string{obddir + "/*/job_stats"},
+		Mds_procfiles: []string{mdtdir + "/*/job_stats"},
+	}
+
+	var acc testutil.Accumulator
+
+	err = m.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"name":  ost_name,
+		"jobid": job_name,
+	}
+
+	fields := map[string]interface{}{
+		"jobstats_read_calls":      uint64(1),
+		"jobstats_read_min_size":   uint64(4096),
+		"jobstats_read_max_size":   uint64(4096),
+		"jobstats_read_bytes":      uint64(4096),
+		"jobstats_write_calls":     uint64(25),
+		"jobstats_write_min_size":  uint64(1048576),
+		"jobstats_write_max_size":  uint64(1048576),
+		"jobstats_write_bytes":     uint64(26214400),
+		"jobstats_ost_getattr":     uint64(0),
+		"jobstats_ost_setattr":     uint64(0),
+		"jobstats_punch":           uint64(1),
+		"jobstats_ost_sync":        uint64(0),
+		"jobstats_destroy":         uint64(0),
+		"jobstats_create":          uint64(0),
+		"jobstats_ost_statfs":      uint64(0),
+		"jobstats_get_info":        uint64(0),
+		"jobstats_set_info":        uint64(0),
+		"jobstats_quotactl":        uint64(0),
+		"jobstats_open":            uint64(5),
+		"jobstats_close":           uint64(4),
+		"jobstats_mknod":           uint64(6),
+		"jobstats_link":            uint64(8),
+		"jobstats_unlink":          uint64(90),
+		"jobstats_mkdir":           uint64(521),
+		"jobstats_rmdir":           uint64(520),
+		"jobstats_rename":          uint64(9),
+		"jobstats_getattr":         uint64(11),
+		"jobstats_setattr":         uint64(1),
+		"jobstats_getxattr":        uint64(3),
+		"jobstats_setxattr":        uint64(4),
+		"jobstats_statfs":          uint64(1205),
+		"jobstats_sync":            uint64(2),
+		"jobstats_samedir_rename":  uint64(705),
+		"jobstats_crossdir_rename": uint64(200),
+	}
+
+	acc.AssertContainsTaggedFields(t, "lustre2", fields, tags)
+
+	err = os.RemoveAll(os.TempDir() + "/telegraf")
+	require.NoError(t, err)
+}
-- 
GitLab