From 82448a9dd13632acf2539b8b4a0ea51786c55016 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Mon, 2 Apr 2018 13:10:43 -0700
Subject: [PATCH] Add metric_version option to mysql input (#3954)

---
 CHANGELOG.md                     |  22 ++--
 plugins/inputs/mysql/README.md   |  93 ++++++++++++++-
 plugins/inputs/mysql/mysql.go    | 158 ++++++++++++++++++++++---
 plugins/inputs/mysql/v1/mysql.go | 195 +++++++++++++++++++++++++++++++
 4 files changed, 435 insertions(+), 33 deletions(-)
 create mode 100644 plugins/inputs/mysql/v1/mysql.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f27a03fe..51d9faaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,15 +2,16 @@
 
 ### Release Notes
 
-- The `mysql` input plugin has been updated to convert values to the
-  correct data type.  This may cause a `field type error` when inserting into
-  InfluxDB due the change of types.  It is recommended to drop the `mysql`,
-  `mysql_variables`, and `mysql_innodb`:
-  ```
-  DROP MEASUREMENT mysql
-  DROP MEASUREMENT mysql_variables
-  DROP MEASUREMENT mysql_innodb
-  ```
+- The `mysql` input plugin has been updated to fix a number of type conversion
+  issues.  This may cause a `field type error` when inserting into InfluxDB due
+  to the change of types.
+
+  To address this we have introduced a new `metric_version` option to control
+  enabling the new format.  For in-depth recommendations on upgrading please
+  reference the [mysql plugin documentation](./plugins/inputs/mysql/README.md#metric-version).
+
+  It is encouraged to migrate to the new model when possible as the old version
+  is deprecated and will be removed in a future version.
 
 - The `postgresql` plugins now defaults to using a persistent connection to the database.
   In environments where TCP connections are terminated the `max_lifetime`
@@ -26,7 +27,8 @@
   is set.  It is encouraged to enable this option when possible as the old
   ordering is deprecated.
 
-- The `httpjson` is now deprecated, please migrate to the new `http` input.
+- The new `http` input configured with `data_format = "json"` can perform the
+  same task as the, now deprecated, `httpjson` input.
 
 
 ### New Inputs
diff --git a/plugins/inputs/mysql/README.md b/plugins/inputs/mysql/README.md
index fb9a18ea..588e7c07 100644
--- a/plugins/inputs/mysql/README.md
+++ b/plugins/inputs/mysql/README.md
@@ -1,4 +1,4 @@
-# MySQL Input plugin
+# MySQL Input Plugin
 
 This plugin gathers the statistic data from MySQL server
 
@@ -18,9 +18,9 @@ This plugin gathers the statistic data from MySQL server
 * File events statistics
 * Table schema statistics
 
-## Configuration
+### Configuration
 
-```
+```toml
 # Read metrics from one or many mysql servers
 [[inputs.mysql]]
   ## specify servers via a url matching:
@@ -81,14 +81,97 @@ This plugin gathers the statistic data from MySQL server
   #
   ## Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)
   interval_slow                             = "30m"
-  
+
   ## Optional SSL Config (will be used if tls=custom parameter specified in server uri)
   ssl_ca = "/etc/telegraf/ca.pem"
   ssl_cert = "/etc/telegraf/cert.pem"
   ssl_key = "/etc/telegraf/key.pem"
 ```
 
-## Measurements & Fields
+#### Metric Version
+
+When `metric_version = 2`, a variety of field type issues are corrected as well
+as naming inconsistencies.  If you have existing data on the original version
+enabling this feature will cause a `field type error` when inserted into
+InfluxDB due to the change of types.  For this reason, you should keep the
+`metric_version` unset until you are ready to migrate to the new format.
+
+If preserving your old data is not required you may wish to drop conflicting
+measurements:
+```
+DROP SERIES from mysql
+DROP SERIES from mysql_variables
+DROP SERIES from mysql_innodb
+```
+
+Otherwise, migration can be performed using the following steps:
+
+1. Duplicate your `mysql` plugin configuration and add a `name_suffix` and
+`metric_version = 2`, this will result in collection using both the old and new
+style concurrently:
+   ```toml
+   [[inputs.mysql]]
+     servers = ["tcp(127.0.0.1:3306)/"]
+
+   [[inputs.mysql]]
+     name_suffix = "_v2"
+     metric_version = 2
+
+     servers = ["tcp(127.0.0.1:3306)/"]
+   ```
+
+2. Upgrade all affected Telegraf clients to version >=1.6.
+
+   New measurements will be created with the `name_suffix`, for example:
+   - `mysql_v2`
+   - `mysql_variables_v2`
+
+3. Update charts, alerts, and other supporting code to the new format.
+4. You can now remove the old `mysql` plugin configuration and remove old
+   measurements.
+
+If you wish to remove the `name_suffix` you may use Kapacitor to copy the
+historical data to the default name.  Do this only after retiring the old
+measurement name.
+
+1. Use the technique described above to write to multiple locations:
+   ```toml
+   [[inputs.mysql]]
+     servers = ["tcp(127.0.0.1:3306)/"]
+     metric_version = 2
+
+   [[inputs.mysql]]
+     name_suffix = "_v2"
+     metric_version = 2
+
+     servers = ["tcp(127.0.0.1:3306)/"]
+   ```
+2. Create a TICKScript to copy the historical data:
+   ```
+   dbrp "telegraf"."autogen"
+
+   batch
+       |query('''
+           SELECT * FROM "telegraf"."autogen"."mysql_v2"
+       ''')
+           .period(5m)
+           .every(5m)
+           |influxDBOut()
+                   .database('telegraf')
+                   .retentionPolicy('autogen')
+                   .measurement('mysql')
+   ```
+3. Define a task for your script:
+   ```sh
+   kapacitor define copy-measurement -tick copy-measurement.task
+   ```
+4. Run the task over the data you would like to migrate:
+   ```sh
+   kapacitor replay-live batch -start 2018-03-30T20:00:00Z -stop 2018-04-01T12:00:00Z -rec-time -task copy-measurement
+   ```
+5. Verify copied data and repeat for other measurements.
+
+### Metrics:
 * Global statuses - all numeric and boolean values of `SHOW GLOBAL STATUSES`
 * Global variables - all numeric and boolean values of `SHOW GLOBAL VARIABLES`
 * Slave status - metrics from `SHOW SLAVE STATUS` the metrics are gathered when
diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go
index d15f4610..a2be5848 100644
--- a/plugins/inputs/mysql/mysql.go
+++ b/plugins/inputs/mysql/mysql.go
@@ -13,6 +13,7 @@ import (
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"
 
 	"github.com/go-sql-driver/mysql"
 )
@@ -40,6 +41,7 @@ type Mysql struct {
 	SSLCA                               string   `toml:"ssl_ca"`
 	SSLCert                             string   `toml:"ssl_cert"`
 	SSLKey                              string   `toml:"ssl_key"`
+	MetricVersion                       int      `toml:"metric_version"`
 }
 
 var sampleConfig = `
@@ -52,6 +54,20 @@ var sampleConfig = `
   #
   ## If no servers are specified, then localhost is used as the host.
   servers = ["tcp(127.0.0.1:3306)/"]
+
+  ## Selects the metric output format.
+  ##
+  ## This option exists to maintain backwards compatibility, if you have
+  ## existing metrics do not set or change this value until you are ready to
+  ## migrate to the new format.
+  ##
+  ## If you do not have existing metrics from this plugin set to the latest
+  ## version.
+  ##
+  ## Telegraf >=1.6: metric_version = 2
+  ##           <1.6: metric_version = 1 (or unset)
+  metric_version = 2
+
   ## the limits for metrics form perf_events_statements
   perf_events_statements_digest_text_limit  = 120
   perf_events_statements_limit              = 250
@@ -541,7 +557,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
 			fields[key] = string(val)
 			tags[key] = string(val)
 		}
-		if value, ok := parseValue(val); ok {
+		if value, ok := m.parseValue(val); ok {
 			fields[key] = value
 		}
 		// Send 20 fields at a time
@@ -593,7 +609,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
 		// range over columns, and try to parse values
 		for i, col := range cols {
 			col = strings.ToLower(col)
-			if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
+			if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok {
 				fields["slave_"+col] = value
 			}
 		}
@@ -662,10 +678,75 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
 			return err
 		}
 
-		key = strings.ToLower(key)
+		if m.MetricVersion < 2 {
+			var found bool
+			for _, mapped := range v1.Mappings {
+				if strings.HasPrefix(key, mapped.OnServer) {
+					// convert numeric values to integer
+					i, _ := strconv.Atoi(string(val))
+					fields[mapped.InExport+key[len(mapped.OnServer):]] = i
+					found = true
+				}
+			}
+			// Send 20 fields at a time
+			if len(fields) >= 20 {
+				acc.AddFields("mysql", fields, tags)
+				fields = make(map[string]interface{})
+			}
+			if found {
+				continue
+			}
 
-		if value, ok := parseValue(val); ok {
-			fields[key] = value
+			// search for specific values
+			switch key {
+			case "Queries":
+				i, err := strconv.ParseInt(string(val), 10, 64)
+				if err != nil {
+					acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
+				} else {
+					fields["queries"] = i
+				}
+			case "Questions":
+				i, err := strconv.ParseInt(string(val), 10, 64)
+				if err != nil {
+					acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
+				} else {
+					fields["questions"] = i
+				}
+			case "Slow_queries":
+				i, err := strconv.ParseInt(string(val), 10, 64)
+				if err != nil {
+					acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
+				} else {
+					fields["slow_queries"] = i
+				}
+			case "Connections":
+				i, err := strconv.ParseInt(string(val), 10, 64)
+				if err != nil {
+					acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
+				} else {
+					fields["connections"] = i
+				}
+			case "Syncs":
+				i, err := strconv.ParseInt(string(val), 10, 64)
+				if err != nil {
+					acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
+				} else {
+					fields["syncs"] = i
+				}
+			case "Uptime":
+				i, err := strconv.ParseInt(string(val), 10, 64)
+				if err != nil {
+					acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
+				} else {
+					fields["uptime"] = i
+				}
+			}
+		} else {
+			key = strings.ToLower(key)
+			if value, ok := m.parseValue(val); ok {
+				fields[key] = value
+			}
 		}
 
 		// Send 20 fields at a time
@@ -820,7 +901,11 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
 	for s, c := range stateCounts {
 		fields[newNamespace("threads", s)] = c
 	}
-	acc.AddFields("mysql_process_list", fields, tags)
+	if m.MetricVersion < 2 {
+		acc.AddFields("mysql_info_schema", fields, tags)
+	} else {
+		acc.AddFields("mysql_process_list", fields, tags)
+	}
 	return nil
 }
 
@@ -1033,7 +1118,11 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
 		fields["auto_increment_column"] = incValue
 		fields["auto_increment_column_max"] = maxInt
 
-		acc.AddFields("mysql_table_schema", fields, tags)
+		if m.MetricVersion < 2 {
+			acc.AddFields("mysql_info_schema", fields, tags)
+		} else {
+			acc.AddFields("mysql_table_schema", fields, tags)
+		}
 	}
 	return nil
 }
@@ -1059,7 +1148,7 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
 			return err
 		}
 		key = strings.ToLower(key)
-		if value, ok := parseValue(val); ok {
+		if value, ok := m.parseValue(val); ok {
 			fields[key] = value
 		}
 		// Send 20 fields at a time
@@ -1430,17 +1519,37 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
 			tags["schema"] = tableSchema
 			tags["table"] = tableName
 
-			acc.AddFields("mysql_table_schema",
-				map[string]interface{}{"rows": tableRows}, tags)
+			if m.MetricVersion < 2 {
+				acc.AddFields(newNamespace("info_schema", "table_rows"),
+					map[string]interface{}{"value": tableRows}, tags)
 
-			acc.AddFields("mysql_table_schema",
-				map[string]interface{}{"data_length": dataLength}, tags)
+				dlTags := copyTags(tags)
+				dlTags["component"] = "data_length"
+				acc.AddFields(newNamespace("info_schema", "table_size", "data_length"),
+					map[string]interface{}{"value": dataLength}, dlTags)
 
-			acc.AddFields("mysql_table_schema",
-				map[string]interface{}{"index_length": indexLength}, tags)
+				ilTags := copyTags(tags)
+				ilTags["component"] = "index_length"
+				acc.AddFields(newNamespace("info_schema", "table_size", "index_length"),
+					map[string]interface{}{"value": indexLength}, ilTags)
 
-			acc.AddFields("mysql_table_schema",
-				map[string]interface{}{"data_free": dataFree}, tags)
+				dfTags := copyTags(tags)
+				dfTags["component"] = "data_free"
+				acc.AddFields(newNamespace("info_schema", "table_size", "data_free"),
+					map[string]interface{}{"value": dataFree}, dfTags)
+			} else {
+				acc.AddFields("mysql_table_schema",
+					map[string]interface{}{"rows": tableRows}, tags)
+
+				acc.AddFields("mysql_table_schema",
+					map[string]interface{}{"data_length": dataLength}, tags)
+
+				acc.AddFields("mysql_table_schema",
+					map[string]interface{}{"index_length": indexLength}, tags)
+
+				acc.AddFields("mysql_table_schema",
+					map[string]interface{}{"data_free": dataFree}, tags)
+			}
 
 			versionTags := copyTags(tags)
 			versionTags["type"] = tableType
@@ -1448,13 +1557,26 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
 			versionTags["row_format"] = rowFormat
 			versionTags["create_options"] = createOptions
 
-			acc.AddFields("mysql_table_schema_version",
-				map[string]interface{}{"table_version": version}, versionTags)
+			if m.MetricVersion < 2 {
+				acc.AddFields(newNamespace("info_schema", "table_version"),
+					map[string]interface{}{"value": version}, versionTags)
+			} else {
+				acc.AddFields("mysql_table_schema_version",
+					map[string]interface{}{"table_version": version}, versionTags)
+			}
 		}
 	}
 	return nil
 }
 
+func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
+	if m.MetricVersion < 2 {
+		return v1.ParseValue(value)
+	} else {
+		return parseValue(value)
+	}
+}
+
 // parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
 func parseValue(value sql.RawBytes) (interface{}, bool) {
 	if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
diff --git a/plugins/inputs/mysql/v1/mysql.go b/plugins/inputs/mysql/v1/mysql.go
new file mode 100644
index 00000000..6f6062d1
--- /dev/null
+++ b/plugins/inputs/mysql/v1/mysql.go
@@ -0,0 +1,195 @@
+package v1
+
+import (
+	"bytes"
+	"database/sql"
+	"strconv"
+)
+
+type Mapping struct {
+	OnServer string
+	InExport string
+}
+
+var Mappings = []*Mapping{
+	{
+		OnServer: "Aborted_",
+		InExport: "aborted_",
+	},
+	{
+		OnServer: "Bytes_",
+		InExport: "bytes_",
+	},
+	{
+		OnServer: "Com_",
+		InExport: "commands_",
+	},
+	{
+		OnServer: "Created_",
+		InExport: "created_",
+	},
+	{
+		OnServer: "Handler_",
+		InExport: "handler_",
+	},
+	{
+		OnServer: "Innodb_",
+		InExport: "innodb_",
+	},
+	{
+		OnServer: "Key_",
+		InExport: "key_",
+	},
+	{
+		OnServer: "Open_",
+		InExport: "open_",
+	},
+	{
+		OnServer: "Opened_",
+		InExport: "opened_",
+	},
+	{
+		OnServer: "Qcache_",
+		InExport: "qcache_",
+	},
+	{
+		OnServer: "Table_",
+		InExport: "table_",
+	},
+	{
+		OnServer: "Tokudb_",
+		InExport: "tokudb_",
+	},
+	{
+		OnServer: "Threads_",
+		InExport: "threads_",
+	},
+	{
+		OnServer: "Access_",
+		InExport: "access_",
+	},
+	{
+		OnServer: "Aria__",
+		InExport: "aria_",
+	},
+	{
+		OnServer: "Binlog__",
+		InExport: "binlog_",
+	},
+	{
+		OnServer: "Busy_",
+		InExport: "busy_",
+	},
+	{
+		OnServer: "Connection_",
+		InExport: "connection_",
+	},
+	{
+		OnServer: "Delayed_",
+		InExport: "delayed_",
+	},
+	{
+		OnServer: "Empty_",
+		InExport: "empty_",
+	},
+	{
+		OnServer: "Executed_",
+		InExport: "executed_",
+	},
+	{
+		OnServer: "Executed_",
+		InExport: "executed_",
+	},
+	{
+		OnServer: "Feature_",
+		InExport: "feature_",
+	},
+	{
+		OnServer: "Flush_",
+		InExport: "flush_",
+	},
+	{
+		OnServer: "Last_",
+		InExport: "last_",
+	},
+	{
+		OnServer: "Master_",
+		InExport: "master_",
+	},
+	{
+		OnServer: "Max_",
+		InExport: "max_",
+	},
+	{
+		OnServer: "Memory_",
+		InExport: "memory_",
+	},
+	{
+		OnServer: "Not_",
+		InExport: "not_",
+	},
+	{
+		OnServer: "Performance_",
+		InExport: "performance_",
+	},
+	{
+		OnServer: "Prepared_",
+		InExport: "prepared_",
+	},
+	{
+		OnServer: "Rows_",
+		InExport: "rows_",
+	},
+	{
+		OnServer: "Rpl_",
+		InExport: "rpl_",
+	},
+	{
+		OnServer: "Select_",
+		InExport: "select_",
+	},
+	{
+		OnServer: "Slave_",
+		InExport: "slave_",
+	},
+	{
+		OnServer: "Slow_",
+		InExport: "slow_",
+	},
+	{
+		OnServer: "Sort_",
+		InExport: "sort_",
+	},
+	{
+		OnServer: "Subquery_",
+		InExport: "subquery_",
+	},
+	{
+		OnServer: "Tc_",
+		InExport: "tc_",
+	},
+	{
+		OnServer: "Threadpool_",
+		InExport: "threadpool_",
+	},
+	{
+		OnServer: "wsrep_",
+		InExport: "wsrep_",
+	},
+	{
+		OnServer: "Uptime_",
+		InExport: "uptime_",
+	},
+}
+
+func ParseValue(value sql.RawBytes) (float64, bool) {
+	if bytes.Compare(value, []byte("Yes")) == 0 || bytes.Compare(value, []byte("ON")) == 0 {
+		return 1, true
+	}
+
+	if bytes.Compare(value, []byte("No")) == 0 || bytes.Compare(value, []byte("OFF")) == 0 {
+		return 0, true
+	}
+	n, err := strconv.ParseFloat(string(value), 64)
+	return n, err == nil
+}
-- 
GitLab