Skip to content
Snippets Groups Projects
Commit c4ea122d authored by Rene Zbinden's avatar Rene Zbinden Committed by Cameron Sparr
Browse files

add sysstat plugin

parent 90185dc6
No related branches found
No related tags found
No related merge requests found
......@@ -52,6 +52,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
_ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/trig"
......
This diff is collapsed.
// build +linux
package sysstat
import (
"bufio"
"encoding/csv"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Package-level hooks and defaults used by the collector.
var (
execCommand = exec.Command // execCommand is used to mock commands in tests.
// dfltActivities is assigned to Sysstat.Activities by collect when the
// configuration does not list any activities.
dfltActivities = []string{"DISK"}
)
// parseInterval is subtracted from Sysstat.Interval in collect to obtain the
// interval that is handed to sadc.
const parseInterval = 1 // parseInterval is the interval (in seconds) where the parsing takes place.
// Sysstat is the configuration and state of the sysstat input plugin. It
// collects data with the sadc utility into a temporary file and extracts
// metrics from that file with the sadf utility.
type Sysstat struct {
// Interval that defines how long data is collected by Sadc cmd.
//
// This value has to be the same as the telegraf collection interval.
Interval int `toml:"collect_interval"`
// Sadc represents the path to the sadc collector utility.
Sadc string `toml:"sadc_path"`
// Sadf represents the path to the sadf cmd.
Sadf string `toml:"sadf_path"`
// Activities is a list of activities that are passed as argument to the
// collector utility (e.g: DISK, SNMP etc...)
// The more activities that are added, the more data is collected.
Activities []string
// Options is a map of options.
//
// The key represents the actual option that the Sadf command is called with and
// the value represents the description for that option.
//
// For example, if you have the following options map:
// map[string]string{"-C": "cpu", "-d": "disk"}
// The Sadf command is run with the options -C and -d to extract cpu and
// disk metrics from the collected binary file.
//
// If Group is false (see below), each metric will be prefixed with the corresponding description
// and represents itself a measurement.
//
// If Group is true, metrics are grouped to a single measurement with the corresponding description as name.
Options map[string]string
// Group determines if metrics are grouped or not.
Group bool
// DeviceTags adds the possibility to add additional tags for devices.
// It is keyed by device name; every key/value pair of every map in the
// list is added as a tag to the metrics of that device.
DeviceTags map[string][]map[string]string `toml:"device_tags"`
// tmpFile is the path of the data file that collect asks sadc to write,
// that sadfOptions passes to sadf, and that Gather removes afterwards.
tmpFile string
}
// Description returns a one-line, human-readable summary of the plugin.
func (*Sysstat) Description() string {
	const summary = "Sysstat metrics collector"
	return summary
}
// sampleConfig is the example TOML configuration returned by SampleConfig.
// Lines starting with "##" are explanatory text, lines starting with "# "
// are optional settings shown with an example value.
var sampleConfig = `
## Collect interval in seconds. This value has to be equal
## to the telegraf collect interval.
collect_interval = 5 # required
#
#
## Path to the sadc command.
sadc_path = "/usr/lib/sa/sadc" # required
#
#
## Path to the sadf command, if it is not in PATH
# sadf_path = "/usr/bin/sadf"
#
#
## Activities is a list of activities, that are passed as argument to the
## sadc collector utility (e.g: DISK, SNMP etc...)
## The more activities that are added, the more data is collected.
# activities = ["DISK"]
#
#
## Group metrics to measurements.
##
## If group is false each metric will be prefixed with a description
## and represents itself a measurement.
##
## If Group is true, corresponding metrics are grouped to a single measurement.
# group = false
#
#
## Options for the sadf command. The values on the left represent the sadf options and
## the values on the right their description (which are used for grouping and prefixing metrics).
[inputs.sysstat.options]
-C = "cpu"
-B = "paging"
-b = "io"
-d = "disk" # requires DISK activity
-H = "hugepages"
"-I ALL" = "interrupts" # requires INT activity
"-n ALL" = "network"
"-P ALL" = "per_cpu"
-q = "queue"
-R = "mem"
"-r ALL" = "mem_util"
-S = "swap_util"
-u = "cpu_util"
-v = "inode"
-W = "swap"
-w = "task"
#
#
## Device tags can be used to add additional tags for devices. For example the configuration below
## adds a tag vg with value rootvg for all metrics with sda devices.
# [[inputs.sysstat.device_tags.sda]]
# vg = "rootvg"
`
// SampleConfig returns the example configuration stored in sampleConfig.
func (*Sysstat) SampleConfig() string {
	example := sampleConfig
	return example
}
// Gather runs one collection cycle and adds the resulting metrics to acc.
//
// It first runs the collector (collect), then runs parse concurrently for
// every key of s.Options, and finally removes the temporary data file.
// All errors reported by the parsers and by the file removal are returned as
// a single error whose message joins them with newlines. If collect fails,
// its error is returned and no parsing takes place.
func (s *Sysstat) Gather(acc telegraf.Accumulator) error {
	// Timestamp applied to every metric of this cycle: the start of the
	// cycle plus the configured interval.
	ts := time.Now().Add(time.Duration(s.Interval) * time.Second)

	// removeTmpFile deletes the data file if it exists. A file that cannot
	// be stat'ed is treated as absent.
	removeTmpFile := func() error {
		if _, err := os.Stat(s.tmpFile); err != nil {
			return nil
		}
		return os.Remove(s.tmpFile)
	}

	if err := s.collect(); err != nil {
		// The collector may have created the data file before failing;
		// do not leave it behind on this early return.
		if rmErr := removeTmpFile(); rmErr != nil {
			return errors.New(err.Error() + "\n" + rmErr.Error())
		}
		return err
	}

	var wg sync.WaitGroup
	// Every goroutine sends at most one error, so sends never block.
	errorChannel := make(chan error, len(s.Options))
	for option := range s.Options {
		wg.Add(1)
		go func(acc telegraf.Accumulator, option string) {
			defer wg.Done()
			if err := s.parse(acc, option, ts); err != nil {
				errorChannel <- err
			}
		}(acc, option)
	}
	wg.Wait()
	close(errorChannel)

	var errorStrings []string
	for err := range errorChannel {
		errorStrings = append(errorStrings, err.Error())
	}
	if err := removeTmpFile(); err != nil {
		errorStrings = append(errorStrings, err.Error())
	}

	if len(errorStrings) == 0 {
		return nil
	}
	return errors.New(strings.Join(errorStrings, "\n"))
}
// collect collects sysstat data with the collector utility sadc. It runs the following command:
//     Sadc -S <Activity1> -S <Activity2> ... <collectInterval> 2 tmpFile
// The above command collects system metrics during <collectInterval> and saves it in binary form to tmpFile.
//
// As side effects, s.Activities is set to the default activities when it is
// empty, and s.tmpFile is set to a path below /tmp derived from the current
// Unix time. The returned error contains the command line, the underlying
// error and the combined output of the command.
func (s *Sysstat) collect() error {
	if len(s.Activities) == 0 {
		s.Activities = dfltActivities
	}

	// Two arguments per activity plus interval, count and file name.
	options := make([]string, 0, 2*len(s.Activities)+3)
	for _, act := range s.Activities {
		options = append(options, "-S", act)
	}

	s.tmpFile = path.Join("/tmp", fmt.Sprintf("sysstat-%d", time.Now().Unix()))
	collectInterval := s.Interval - parseInterval // collectInterval has to be smaller than the telegraf data collection interval
	options = append(options, strconv.Itoa(collectInterval), "2", s.tmpFile)

	cmd := execCommand(s.Sadc, options...)
	out, err := cmd.CombinedOutput()
	if err != nil {
		// Include err itself: when the command cannot even be started the
		// output is empty and err is the only hint about what went wrong.
		return fmt.Errorf("failed to run command %s: %s - %s", strings.Join(cmd.Args, " "), err, string(out))
	}
	return nil
}
// parse runs Sadf on the previously saved tmpFile:
//     Sadf -p -- -p <option> tmpFile
// and parses the output to add it to the telegraf.Accumulator acc.
//
// option is a key of s.Options; ts is the timestamp applied to every metric.
// When s.Sadf is empty the sadf binary is looked up in $PATH. An error is
// returned when sadf cannot be found or started, when its output cannot be
// parsed, or when it exits unsuccessfully.
func (s *Sysstat) parse(acc telegraf.Accumulator, option string, ts time.Time) error {
	// Resolve the binary into a local variable. Gather calls parse from
	// several goroutines at once, so storing the result in s.Sadf would be
	// a data race.
	sadf := s.Sadf
	if len(sadf) == 0 {
		found, err := exec.LookPath("sadf")
		if err != nil {
			return errors.New("sadf not in $PATH, configure path to sadf")
		}
		sadf = found
	}

	cmd := execCommand(sadf, s.sadfOptions(option)...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("running command '%s' failed: %s", strings.Join(cmd.Args, " "), err)
	}

	if err := s.addRecords(acc, option, ts, bufio.NewReader(stdout)); err != nil {
		// The output is of no further use. Stop the child and reap it so
		// that neither the process nor its pipe is leaked. The results are
		// ignored on purpose: the parse error is the one worth reporting,
		// and Kill fails harmlessly if the process has already exited.
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
		return err
	}

	if err := cmd.Wait(); err != nil {
		return fmt.Errorf("command %s failed with %s", strings.Join(cmd.Args, " "), err)
	}
	return nil
}

// addRecords reads tab-separated records with six fields each from r and adds
// them to acc. Field 3 is the device, field 4 the metric name and field 5 the
// value, which must parse as a float.
//
// With s.Group unset, every record becomes its own measurement named
// "<description>_<metric>" with a single field "value". With s.Group set, the
// records are accumulated per device and added once the input is exhausted as
// measurement "<description>" with one field per metric.
func (s *Sysstat) addRecords(acc telegraf.Accumulator, option string, ts time.Time, r io.Reader) error {
	reader := csv.NewReader(r)
	reader.Comma = '\t'
	reader.FieldsPerRecord = 6

	// groupData accumulates the data of one device when Group=true.
	type groupData struct {
		tags   map[string]string
		fields map[string]interface{}
	}
	groups := make(map[string]groupData)
	description := s.Options[option]

	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		device := record[3]
		value, err := strconv.ParseFloat(record[5], 64)
		if err != nil {
			return err
		}

		// A device of "-" means the metric is not tied to a device.
		tags := map[string]string{}
		if device != "-" {
			tags["device"] = device
			for _, extra := range s.DeviceTags[device] {
				for k, v := range extra {
					tags[k] = v
				}
			}
		}

		if !s.Group {
			fields := map[string]interface{}{
				"value": value,
			}
			acc.AddFields(description+"_"+escape(record[4]), fields, tags, ts)
			continue
		}

		g, ok := groups[device]
		if !ok {
			// Tags depend on the device only, so those of the first
			// record are valid for the whole group.
			g = groupData{
				tags:   tags,
				fields: make(map[string]interface{}),
			}
			groups[device] = g
		}
		g.fields[escape(record[4])] = value
	}

	if s.Group {
		for _, g := range groups {
			acc.AddFields(description, g.fields, g.tags, ts)
		}
	}
	return nil
}
// sadfOptions creates the correct options for the sadf utility.
//
// The result is "-p -- -p", followed by the whitespace-separated words of
// activityOption (e.g. "-n ALL" becomes "-n", "ALL"), followed by the path of
// the temporary data file. Repeated, leading or trailing whitespace in
// activityOption never produces empty arguments.
func (s *Sysstat) sadfOptions(activityOption string) []string {
	words := strings.Fields(activityOption)

	options := make([]string, 0, len(words)+4)
	options = append(options, "-p", "--", "-p")
	options = append(options, words...)
	return append(options, s.tmpFile)
}
// fieldEscaper rewrites the characters of sadf metric names that are awkward
// in field names. It is built once because escape is called for every parsed
// record; a strings.Replacer is safe for concurrent use.
var fieldEscaper = strings.NewReplacer(
	`%`, "pct_",
	`/`, "_per_",
)

// escape replaces every "%" in dirty with "pct_" and every "/" with "_per_",
// e.g. "%util" becomes "pct_util" and "bread/s" becomes "bread_per_s".
func escape(dirty string) string {
	return fieldEscaper.Replace(dirty)
}
func init() {
inputs.Add("sysstat", func() telegraf.Input {
return &Sysstat{}
})
}
// build +linux
package sysstat
import (
"fmt"
"os"
"os/exec"
"path"
"testing"
"github.com/influxdata/telegraf/testutil"
)
// s is the plugin configuration used by TestGather and TestGatherGrouped.
// The option keys carry no leading dash ("C", "d") because TestHelperProcess
// selects its mock output by exactly the argument it receives.
var s = Sysstat{
Interval: 10,
Sadc: "/usr/lib/sa/sadc",
Group: false,
Activities: []string{"DISK", "SNMP"},
Options: map[string]string{
"C": "cpu",
"d": "disk",
},
// Metrics of device sda are expected to carry the extra tag vg=rootvg.
DeviceTags: map[string][]map[string]string{
"sda": {
{
"vg": "rootvg",
},
},
},
}
func TestGather(t *testing.T) {
// overwriting exec commands with mock commands
execCommand = fakeExecCommand
defer func() { execCommand = exec.Command }()
var acc testutil.Accumulator
err := s.Gather(&acc)
if err != nil {
t.Fatal(err)
}
cpuTags := map[string]string{"device": "all"}
diskTags := map[string]string{"device": "sda", "vg": "rootvg"}
tests := []struct {
measurement string
fields map[string]interface{}
tags map[string]string
}{
{
"cpu_pct_user",
map[string]interface{}{
"value": 0.65,
},
cpuTags,
},
{
"cpu_pct_nice",
map[string]interface{}{
"value": 0.0,
},
cpuTags,
},
{
"cpu_pct_system",
map[string]interface{}{
"value": 0.10,
},
cpuTags,
},
{
"cpu_pct_iowait",
map[string]interface{}{
"value": 0.15,
},
cpuTags,
},
{
"cpu_pct_steal",
map[string]interface{}{
"value": 0.0,
},
cpuTags,
},
{
"cpu_pct_idle",
map[string]interface{}{
"value": 99.1,
},
cpuTags,
},
{
"disk_tps",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_rd_sec_per_s",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_wr_sec_per_s",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_avgrq-sz",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_avgqu-sz",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_await",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_svctm",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
{
"disk_pct_util",
map[string]interface{}{
"value": 0.00,
},
diskTags,
},
}
for _, test := range tests {
acc.AssertContainsTaggedFields(t, test.measurement, test.fields, test.tags)
}
}
// TestGatherGrouped checks the grouped mode: the metrics reported by the
// mocked sadf command must be combined into one measurement per description
// and device, with one field per metric.
func TestGatherGrouped(t *testing.T) {
	// Enable grouping on a private copy instead of the shared configuration,
	// so the setting does not leak into other tests.
	plugin := s
	plugin.Group = true

	// overwriting exec commands with mock commands
	execCommand = fakeExecCommand
	defer func() { execCommand = exec.Command }()

	var acc testutil.Accumulator
	if err := plugin.Gather(&acc); err != nil {
		t.Fatal(err)
	}

	var tests = []struct {
		measurement string
		fields      map[string]interface{}
		tags        map[string]string
	}{
		{
			"cpu",
			map[string]interface{}{
				"pct_user":   0.65,
				"pct_nice":   0.0,
				"pct_system": 0.10,
				"pct_iowait": 0.15,
				"pct_steal":  0.0,
				"pct_idle":   99.1,
			},
			map[string]string{"device": "all"},
		},
		{
			"disk",
			map[string]interface{}{
				"tps":          0.00,
				"rd_sec_per_s": 0.00,
				"wr_sec_per_s": 0.00,
				"avgrq-sz":     0.00,
				"avgqu-sz":     0.00,
				"await":        0.00,
				"svctm":        0.00,
				"pct_util":     0.00,
			},
			map[string]string{"device": "sda", "vg": "rootvg"},
		},
		{
			"disk",
			map[string]interface{}{
				"tps":          2.01,
				"rd_sec_per_s": 1.0,
				"wr_sec_per_s": 0.00,
				"avgrq-sz":     0.30,
				"avgqu-sz":     0.60,
				"await":        0.70,
				"svctm":        0.20,
				"pct_util":     0.30,
			},
			map[string]string{"device": "sdb"},
		},
	}
	for _, test := range tests {
		acc.AssertContainsTaggedFields(t, test.measurement, test.fields, test.tags)
	}
}
// TestEscape verifies that escape rewrites "%" and "/" in metric names.
func TestEscape(t *testing.T) {
	cases := []struct {
		input, escaped string
	}{
		{"%util", "pct_util"},
		{"bread/s", "bread_per_s"},
		{"%nice", "pct_nice"},
	}
	for _, tc := range cases {
		got := escape(tc.input)
		if got == tc.escaped {
			continue
		}
		t.Errorf("wrong escape, got %s, wanted %s", got, tc.escaped)
	}
}
// fakeExecCommand stands in for exec.Command in tests. Instead of the
// requested command it prepares a re-execution of the running test binary,
// restricted to TestHelperProcess, with the original command and its
// arguments appended after "--". The environment of the returned command
// consists solely of GO_WANT_HELPER_PROCESS=1.
func fakeExecCommand(command string, args ...string) *exec.Cmd {
	helperArgs := append([]string{"-test.run=TestHelperProcess", "--", command}, args...)

	helper := exec.Command(os.Args[0], helperArgs...)
	helper.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
	return helper
}
// TestHelperProcess isn't a real test. It's used to mock exec.Command
// For example, if you run:
// GO_WANT_HELPER_PROCESS=1 go test -test.run=TestHelperProcess -- sadf -p -- -p -C tmpFile
// it returns mockData["C"] output.
//
// Without GO_WANT_HELPER_PROCESS=1 in the environment the function returns
// immediately. Otherwise it never returns: it terminates the process with
// status 0, or with status 1 when it is invoked with too few arguments.
func TestHelperProcess(t *testing.T) {
	if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
		return
	}

	// row renders one line of mocked sadf output. The parser splits records
	// on tabs and expects six fields, and the timestamp itself contains
	// spaces, so the separators are written as explicit tabs.
	row := func(device, metric, value string) string {
		return "dell-xps\t5\t2016-03-25 16:18:10 UTC\t" + device + "\t" + metric + "\t" + value + "\n"
	}
	mockData := map[string]string{
		"C": row("all", "%user", "0.65") +
			row("all", "%nice", "0.00") +
			row("all", "%system", "0.10") +
			row("all", "%iowait", "0.15") +
			row("all", "%steal", "0.00") +
			row("all", "%idle", "99.10"),
		"d": row("sda", "tps", "0.00") +
			row("sda", "rd_sec/s", "0.00") +
			row("sda", "wr_sec/s", "0.00") +
			row("sda", "avgrq-sz", "0.00") +
			row("sda", "avgqu-sz", "0.00") +
			row("sda", "await", "0.00") +
			row("sda", "svctm", "0.00") +
			row("sda", "%util", "0.00") +
			row("sdb", "tps", "2.01") +
			row("sdb", "rd_sec/s", "1.00") +
			row("sdb", "wr_sec/s", "0.00") +
			row("sdb", "avgrq-sz", "0.30") +
			row("sdb", "avgqu-sz", "0.60") +
			row("sdb", "await", "0.70") +
			row("sdb", "svctm", "0.20") +
			row("sdb", "%util", "0.30"),
	}

	args := os.Args
	// Previous arguments are tests stuff, that looks like :
	// /tmp/go-build970079519/…/_test/integration.test -test.run=TestHelperProcess --
	if len(args) < 4 {
		fmt.Fprintln(os.Stderr, "helper process: no command to mock")
		os.Exit(1)
	}
	cmd, args := args[3], args[4:]

	// Only the base name matters, so a command given with its directory
	// (e.g. /usr/bin/sadf) is recognised as well.
	switch path.Base(cmd) {
	case "sadf":
		// Expected arguments: -p -- -p <option> tmpFile
		if len(args) < 4 {
			fmt.Fprintln(os.Stderr, "helper process: sadf called without an option")
			os.Exit(1)
		}
		fmt.Fprint(os.Stdout, mockData[args[3]])
	default:
		// Any other command (e.g. sadc) succeeds without output.
	}

	// some code here to check arguments perhaps?
	os.Exit(0)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment