From df9c7590b3865aa8facada3c5243d5e5e9c5a53d Mon Sep 17 00:00:00 2001
From: Mike Danko <mike@l4m3.com>
Date: Tue, 12 Dec 2017 16:22:11 -0500
Subject: [PATCH] Fix various mysql data type conversions (#3554)

---
 plugins/inputs/mysql/mysql.go      | 321 ++++-------------------------
 plugins/inputs/mysql/mysql_test.go |  21 +-
 2 files changed, 54 insertions(+), 288 deletions(-)

diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go
index c3dc3842..1e75bf6b 100644
--- a/plugins/inputs/mysql/mysql.go
+++ b/plugins/inputs/mysql/mysql.go
@@ -169,182 +169,6 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error {
 	return nil
 }
 
-type mapping struct {
-	onServer string
-	inExport string
-}
-
-var mappings = []*mapping{
-	{
-		onServer: "Aborted_",
-		inExport: "aborted_",
-	},
-	{
-		onServer: "Bytes_",
-		inExport: "bytes_",
-	},
-	{
-		onServer: "Com_",
-		inExport: "commands_",
-	},
-	{
-		onServer: "Created_",
-		inExport: "created_",
-	},
-	{
-		onServer: "Handler_",
-		inExport: "handler_",
-	},
-	{
-		onServer: "Innodb_",
-		inExport: "innodb_",
-	},
-	{
-		onServer: "Key_",
-		inExport: "key_",
-	},
-	{
-		onServer: "Open_",
-		inExport: "open_",
-	},
-	{
-		onServer: "Opened_",
-		inExport: "opened_",
-	},
-	{
-		onServer: "Qcache_",
-		inExport: "qcache_",
-	},
-	{
-		onServer: "Table_",
-		inExport: "table_",
-	},
-	{
-		onServer: "Tokudb_",
-		inExport: "tokudb_",
-	},
-	{
-		onServer: "Threads_",
-		inExport: "threads_",
-	},
-	{
-		onServer: "Access_",
-		inExport: "access_",
-	},
-	{
-		onServer: "Aria__",
-		inExport: "aria_",
-	},
-	{
-		onServer: "Binlog__",
-		inExport: "binlog_",
-	},
-	{
-		onServer: "Busy_",
-		inExport: "busy_",
-	},
-	{
-		onServer: "Connection_",
-		inExport: "connection_",
-	},
-	{
-		onServer: "Delayed_",
-		inExport: "delayed_",
-	},
-	{
-		onServer: "Empty_",
-		inExport: "empty_",
-	},
-	{
-		onServer: "Executed_",
-		inExport: "executed_",
-	},
-	{
-		onServer: "Executed_",
-		inExport: "executed_",
-	},
-	{
-		onServer: "Feature_",
-		inExport: "feature_",
-	},
-	{
-		onServer: "Flush_",
-		inExport: "flush_",
-	},
-	{
-		onServer: "Last_",
-		inExport: "last_",
-	},
-	{
-		onServer: "Master_",
-		inExport: "master_",
-	},
-	{
-		onServer: "Max_",
-		inExport: "max_",
-	},
-	{
-		onServer: "Memory_",
-		inExport: "memory_",
-	},
-	{
-		onServer: "Not_",
-		inExport: "not_",
-	},
-	{
-		onServer: "Performance_",
-		inExport: "performance_",
-	},
-	{
-		onServer: "Prepared_",
-		inExport: "prepared_",
-	},
-	{
-		onServer: "Rows_",
-		inExport: "rows_",
-	},
-	{
-		onServer: "Rpl_",
-		inExport: "rpl_",
-	},
-	{
-		onServer: "Select_",
-		inExport: "select_",
-	},
-	{
-		onServer: "Slave_",
-		inExport: "slave_",
-	},
-	{
-		onServer: "Slow_",
-		inExport: "slow_",
-	},
-	{
-		onServer: "Sort_",
-		inExport: "sort_",
-	},
-	{
-		onServer: "Subquery_",
-		inExport: "subquery_",
-	},
-	{
-		onServer: "Tc_",
-		inExport: "tc_",
-	},
-	{
-		onServer: "Threadpool_",
-		inExport: "threadpool_",
-	},
-	{
-		onServer: "wsrep_",
-		inExport: "wsrep_",
-	},
-	{
-		onServer: "Uptime_",
-		inExport: "uptime_",
-	},
-}
-
 var (
 	// status counter
 	generalThreadStates = map[string]uint32{
@@ -717,9 +541,8 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
 			fields[key] = string(val)
 			tags[key] = string(val)
 		}
-		// parse value, if it is numeric then save, otherwise ignore
-		if floatVal, ok := parseValue(val); ok {
-			fields[key] = floatVal
+		if value, ok := parseValue(val); ok {
+			fields[key] = value
 		}
 		// Send 20 fields at a time
 		if len(fields) >= 20 {
@@ -769,7 +592,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
 		}
 		// range over columns, and try to parse values
 		for i, col := range cols {
-			// skip unparsable values
+			col = strings.ToLower(col)
 			if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
 				fields["slave_"+col] = value
 			}
@@ -820,98 +643,36 @@ func (m *Mysql) gatherBinaryLogs(db *sql.DB, serv string, acc telegraf.Accumulat
 // the mappings of actual names and names of each status to be exported
 // to output is provided on mappings variable
 func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
-	// If user forgot the '/', add it
-	if strings.HasSuffix(serv, ")") {
-		serv = serv + "/"
-	} else if serv == "localhost" {
-		serv = ""
-	}
-
 	// run query
 	rows, err := db.Query(globalStatusQuery)
 	if err != nil {
 		return err
 	}
+	defer rows.Close()
 
 	// parse the DSN and save host name as a tag
 	servtag := getDSNTag(serv)
 	tags := map[string]string{"server": servtag}
 	fields := make(map[string]interface{})
 	for rows.Next() {
-		var name string
-		var val interface{}
+		var key string
+		var val sql.RawBytes
 
-		err = rows.Scan(&name, &val)
-		if err != nil {
+		if err = rows.Scan(&key, &val); err != nil {
 			return err
 		}
 
-		var found bool
+		key = strings.ToLower(key)
 
-		// iterate over mappings and gather metrics that is provided on mapping
-		for _, mapped := range mappings {
-			if strings.HasPrefix(name, mapped.onServer) {
-				// convert numeric values to integer
-				i, _ := strconv.Atoi(string(val.([]byte)))
-				fields[mapped.inExport+name[len(mapped.onServer):]] = i
-				found = true
-			}
+		if value, ok := parseValue(val); ok {
+			fields[key] = value
 		}
+
 		// Send 20 fields at a time
 		if len(fields) >= 20 {
 			acc.AddFields("mysql", fields, tags)
 			fields = make(map[string]interface{})
 		}
-
-		if found {
-			continue
-		}
-
-		// search for specific values
-		switch name {
-		case "Queries":
-			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
-			if err != nil {
-				acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
-			} else {
-				fields["queries"] = i
-			}
-		case "Questions":
-			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
-			if err != nil {
-				acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
-			} else {
-				fields["questions"] = i
-			}
-		case "Slow_queries":
-			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
-			if err != nil {
-				acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
-			} else {
-				fields["slow_queries"] = i
-			}
-		case "Connections":
-			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
-			if err != nil {
-				acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
-			} else {
-				fields["connections"] = i
-			}
-		case "Syncs":
-			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
-			if err != nil {
-				acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
-			} else {
-				fields["syncs"] = i
-			}
-		case "Uptime":
-			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
-			if err != nil {
-				acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", name, err))
-			} else {
-				fields["uptime"] = i
-			}
-		}
 	}
 	// Send any remaining fields
 	if len(fields) > 0 {
@@ -1059,7 +820,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
 	for s, c := range stateCounts {
 		fields[newNamespace("threads", s)] = c
 	}
-	acc.AddFields("mysql_info_schema", fields, tags)
+	acc.AddFields("mysql_process_list", fields, tags)
 	return nil
 }
 
@@ -1272,7 +1033,7 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
 		fields["auto_increment_column"] = incValue
 		fields["auto_increment_column_max"] = maxInt
 
-		acc.AddFields("mysql_info_schema", fields, tags)
+		acc.AddFields("mysql_table_schema", fields, tags)
 	}
 	return nil
 }
@@ -1287,21 +1048,19 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
 	}
 	defer rows.Close()
 
-	var key string
-	var val sql.RawBytes
-
 	// parse DSN and save server tag
 	servtag := getDSNTag(serv)
 	tags := map[string]string{"server": servtag}
 	fields := make(map[string]interface{})
 	for rows.Next() {
+		var key string
+		var val sql.RawBytes
 		if err := rows.Scan(&key, &val); err != nil {
 			return err
 		}
 		key = strings.ToLower(key)
-		// parse value, if it is numeric then save, otherwise ignore
-		if floatVal, ok := parseValue(val); ok {
-			fields[key] = floatVal
+		if value, ok := parseValue(val); ok {
+			fields[key] = value
 		}
 		// Send 20 fields at a time
 		if len(fields) >= 20 {
@@ -1671,23 +1430,17 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
 			tags["schema"] = tableSchema
 			tags["table"] = tableName
 
-			acc.AddFields(newNamespace("info_schema", "table_rows"),
-				map[string]interface{}{"value": tableRows}, tags)
+			acc.AddFields("mysql_table_schema",
+				map[string]interface{}{"rows": tableRows}, tags)
 
-			dlTags := copyTags(tags)
-			dlTags["component"] = "data_length"
-			acc.AddFields(newNamespace("info_schema", "table_size", "data_length"),
-				map[string]interface{}{"value": dataLength}, dlTags)
+			acc.AddFields("mysql_table_schema",
+				map[string]interface{}{"data_length": dataLength}, tags)
 
-			ilTags := copyTags(tags)
-			ilTags["component"] = "index_length"
-			acc.AddFields(newNamespace("info_schema", "table_size", "index_length"),
-				map[string]interface{}{"value": indexLength}, ilTags)
+			acc.AddFields("mysql_table_schema",
+				map[string]interface{}{"index_length": indexLength}, tags)
 
-			dfTags := copyTags(tags)
-			dfTags["component"] = "data_free"
-			acc.AddFields(newNamespace("info_schema", "table_size", "data_free"),
-				map[string]interface{}{"value": dataFree}, dfTags)
+			acc.AddFields("mysql_table_schema",
+				map[string]interface{}{"data_free": dataFree}, tags)
 
 			versionTags := copyTags(tags)
 			versionTags["type"] = tableType
@@ -1695,24 +1448,34 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
 			versionTags["row_format"] = rowFormat
 			versionTags["create_options"] = createOptions
 
-			acc.AddFields(newNamespace("info_schema", "table_version"),
-				map[string]interface{}{"value": version}, versionTags)
+			acc.AddFields("mysql_table_schema_version",
+				map[string]interface{}{"table_version": version}, versionTags)
 		}
 	}
 	return nil
 }
 
 // parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
-func parseValue(value sql.RawBytes) (float64, bool) {
-	if bytes.Compare(value, []byte("Yes")) == 0 || bytes.Compare(value, []byte("ON")) == 0 {
+func parseValue(value sql.RawBytes) (interface{}, bool) {
+	if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
 		return 1, true
 	}
 
-	if bytes.Compare(value, []byte("No")) == 0 || bytes.Compare(value, []byte("OFF")) == 0 {
+	if bytes.EqualFold(value, []byte("NO")) || bytes.Compare(value, []byte("OFF")) == 0 {
 		return 0, true
 	}
-	n, err := strconv.ParseFloat(string(value), 64)
-	return n, err == nil
+
+	if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
+		return val, true
+	}
+	if val, err := strconv.ParseFloat(string(value), 64); err == nil {
+		return val, true
+	}
+
+	if len(string(value)) > 0 {
+		return string(value), true
+	}
+	return nil, false
 }
 
 // findThreadState can be used to find thread state by command and plain state
diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go
index 5356e7bd..1820c934 100644
--- a/plugins/inputs/mysql/mysql_test.go
+++ b/plugins/inputs/mysql/mysql_test.go
@@ -127,26 +127,29 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
 		}
 	}
 }
-
 func TestParseValue(t *testing.T) {
 	testCases := []struct {
 		rawByte   sql.RawBytes
-		value     float64
+		output    interface{}
 		boolValue bool
 	}{
-		{sql.RawBytes("Yes"), 1, true},
-		{sql.RawBytes("No"), 0, false},
+		{sql.RawBytes("123"), int64(123), true},
+		{sql.RawBytes("abc"), "abc", true},
+		{sql.RawBytes("10.1"), 10.1, true},
 		{sql.RawBytes("ON"), 1, true},
-		{sql.RawBytes("OFF"), 0, false},
-		{sql.RawBytes("ABC"), 0, false},
+		{sql.RawBytes("OFF"), 0, true},
+		{sql.RawBytes("NO"), 0, true},
+		{sql.RawBytes("YES"), 1, true},
+		{sql.RawBytes("No"), 0, true},
+		{sql.RawBytes("Yes"), 1, true},
+		{sql.RawBytes(""), nil, false},
 	}
 	for _, cases := range testCases {
-		if value, ok := parseValue(cases.rawByte); value != cases.value && ok != cases.boolValue {
-			t.Errorf("want %d with %t, got %d with %t", int(cases.value), cases.boolValue, int(value), ok)
+		if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue {
+			t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got)
 		}
 	}
 }
-
 func TestNewNamespace(t *testing.T) {
 	testCases := []struct {
 		words     []string
-- 
GitLab