diff --git a/plugins/inputs/mysql/README.md b/plugins/inputs/mysql/README.md
index 34bb07bef5024c3c29f6377af746e9deb106b337..a55ddb8ff041f6e546817ceb5758b42743302b46 100644
--- a/plugins/inputs/mysql/README.md
+++ b/plugins/inputs/mysql/README.md
@@ -7,6 +7,7 @@ This plugin gathers the statistic data from MySQL server
 * Slave statuses
 * Binlog size
 * Process list
+* User Statistics
 * Info schema auto increment columns
 * Table I/O waits
 * Index I/O waits
@@ -44,6 +45,9 @@ This plugin gathers the statistic data from MySQL server
   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
   gather_process_list                       = true
   #
+  ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
+  gather_user_statistics                    = true
+  #
   ## gather auto_increment columns and max values from information schema
   gather_info_schema_auto_inc               = true
   #
@@ -89,6 +93,30 @@ Requires to be turned on in configuration.
     * binary_files_count(int, number)
 * Process list - connection metrics from processlist for each user. It has the following tags
     * connections(int, number)
+* User Statistics - connection metrics from user statistics for each user. It has the following fields
+    * access_denied
+    * binlog_bytes_written
+    * busy_time
+    * bytes_received
+    * bytes_sent
+    * commit_transactions
+    * concurrent_connections
+    * connected_time
+    * cpu_time
+    * denied_connections
+    * empty_queries
+    * hostlost_connections
+    * other_commands
+    * rollback_transactions
+    * rows_fetched
+    * rows_updated
+    * select_commands
+    * server
+    * table_rows_read
+    * total_connections
+    * total_ssl_connections
+    * update_commands
+    * user
 * Perf Table IO waits - total count and time of I/O waits event for each table
 and process. It has following fields:
     * table_io_waits_total_fetch(float, number)
@@ -158,6 +186,8 @@ The unit of fields varies by the tags.
     * server (the host name from which the metrics are gathered)
 * Process list measurement has following tags
     * user (username for whom the metrics are gathered)
+* User Statistics measurement has following tags
+    * user (username for whom the metrics are gathered)
 * Perf table IO waits measurement has following tags
     * schema
     * name (object name for event or process)
diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go
index 54f296586bc06f6d75a0267d5f10132bee92056d..c0a31aeed75555b3232a14121a529cb576fee433 100644
--- a/plugins/inputs/mysql/mysql.go
+++ b/plugins/inputs/mysql/mysql.go
@@ -23,6 +23,7 @@ type Mysql struct {
 	PerfEventsStatementsTimeLimit       int64    `toml:"perf_events_statemetns_time_limit"`
 	TableSchemaDatabases                []string `toml:"table_schema_databases"`
 	GatherProcessList                   bool     `toml:"gather_process_list"`
+	GatherUserStatistics                bool     `toml:"gather_user_statistics"`
 	GatherInfoSchemaAutoInc             bool     `toml:"gather_info_schema_auto_inc"`
 	GatherSlaveStatus                   bool     `toml:"gather_slave_status"`
 	GatherBinaryLogs                    bool     `toml:"gather_binary_logs"`
@@ -60,6 +61,9 @@ var sampleConfig = `
   ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
   gather_process_list                       = true
   #
+  ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
+  gather_user_statistics                    = true
+  #
   ## gather auto_increment columns and max values from information schema
   gather_info_schema_auto_inc               = true
   #
@@ -415,6 +419,10 @@ const (
         WHERE ID != connection_id()
         GROUP BY command,state
         ORDER BY null`
+	infoSchemaUserStatisticsQuery = `
+        SELECT *,count(*)
+        FROM information_schema.user_statistics
+	GROUP BY user`
 	infoSchemaAutoIncQuery = `
         SELECT table_schema, table_name, column_name, auto_increment,
           CAST(pow(2, case data_type
@@ -530,7 +538,6 @@ const (
 			table_name
 			FROM information_schema.tables
 		WHERE table_schema = 'performance_schema' AND table_name = ?
-
 	`
 )
 
@@ -582,6 +589,13 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
 		}
 	}
 
+	if m.GatherUserStatistics {
+		err = m.GatherUserStatisticsStatuses(db, serv, acc)
+		if err != nil {
+			return err
+		}
+	}
+
 	if m.GatherSlaveStatus {
 		err = m.gatherSlaveStatuses(db, serv, acc)
 		if err != nil {
@@ -669,6 +683,11 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
 			return err
 		}
 		key = strings.ToLower(key)
+                // parse mysql version and put into field and tag
+		if strings.Contains(key, "version") {
+			fields[key] = string(val)
+			tags[key] = string(val)
+		}
 		// parse value, if it is numeric then save, otherwise ignore
 		if floatVal, ok := parseValue(val); ok {
 			fields[key] = floatVal
@@ -854,6 +873,12 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
 				return err
 			}
 			fields["syncs"] = i
+		case "Uptime":
+			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
+			if err != nil {
+				return err
+			}
+			fields["uptime"] = i
 		}
 	}
 	// Send any remaining fields
@@ -884,6 +909,74 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
 		}
 	}
 
+	// gather connection metrics from user_statistics for each user
+	if m.GatherUserStatistics {
+		conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
+
+		for conn_rows.Next() {
+			var user string
+			var total_connections int64
+			var concurrent_connections int64
+			var connected_time int64
+			var busy_time int64
+			var cpu_time int64
+			var bytes_received int64
+			var bytes_sent int64
+			var binlog_bytes_written int64
+			var rows_fetched int64
+			var rows_updated int64
+			var table_rows_read int64
+			var select_commands int64
+			var update_commands int64
+			var other_commands int64
+			var commit_transactions int64
+			var rollback_transactions int64
+			var denied_connections int64
+			var lost_connections int64
+			var access_denied int64
+			var empty_queries int64
+			var total_ssl_connections int64
+
+			err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
+				&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
+				&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
+				&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
+				&empty_queries, &total_ssl_connections,
+			)
+
+			if err != nil {
+				return err
+			}
+
+			tags := map[string]string{"server": servtag, "user": user}
+			fields := map[string]interface{}{
+			  "total_connections": total_connections,
+			  "concurrent_connections": concurrent_connections,
+			  "connected_time": connected_time,
+			  "busy_time": busy_time,
+			  "cpu_time": cpu_time,
+			  "bytes_received": bytes_received,
+			  "bytes_sent": bytes_sent,
+			  "binlog_bytes_written": binlog_bytes_written,
+			  "rows_fetched": rows_fetched,
+			  "rows_updated": rows_updated,
+			  "table_rows_read": table_rows_read,
+			  "select_commands": select_commands,
+			  "update_commands": update_commands,
+			  "other_commands": other_commands,
+			  "commit_transactions": commit_transactions,
+			  "rollback_transactions": rollback_transactions,
+			  "denied_connections": denied_connections,
+			  "lost_connections": lost_connections,
+			  "access_denied": access_denied,
+			  "empty_queries": empty_queries,
+			  "total_ssl_connections": total_ssl_connections,
+                        }
+
+			acc.AddFields("mysql_user_stats", fields, tags)
+		}
+	}
+
 	return nil
 }
 
@@ -932,6 +1025,88 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
 	return nil
 }
 
+// GatherUserStatistics can be used to collect metrics on each running command
+// and its state with its running count
+func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
+	// run query
+	rows, err := db.Query(infoSchemaUserStatisticsQuery)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+	var (
+		user                   string
+		total_connections      int64
+		concurrent_connections int64
+		connected_time         int64
+		busy_time              int64
+		cpu_time               int64
+		bytes_received         int64
+		bytes_sent             int64
+		binlog_bytes_written   int64
+		rows_fetched           int64
+		rows_updated           int64
+		table_rows_read        int64
+		select_commands        int64
+		update_commands        int64
+		other_commands         int64
+		commit_transactions    int64
+		rollback_transactions  int64
+		denied_connections     int64
+		lost_connections       int64
+		access_denied          int64
+		empty_queries          int64
+		total_ssl_connections  int64
+		count                  uint32
+	)
+
+	var servtag string
+	servtag, err = parseDSN(serv)
+	if err != nil {
+		servtag = "localhost"
+	}
+
+	for rows.Next() {
+		err = rows.Scan(&user, &total_connections, &concurrent_connections,
+			&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
+			&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
+			&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
+			&empty_queries, &total_ssl_connections, &count,
+		)
+		if err != nil {
+			return err
+		}
+
+		tags := map[string]string{"server": servtag, "user": user}
+		fields := map[string]interface{}{
+
+			"total_connections":      total_connections,
+			"concurrent_connections": concurrent_connections,
+			"connected_time":         connected_time,
+			"busy_time":              busy_time,
+			"cpu_time":               cpu_time,
+			"bytes_received":         bytes_received,
+			"bytes_sent":             bytes_sent,
+			"binlog_bytes_written":   binlog_bytes_written,
+			"rows_fetched":           rows_fetched,
+			"rows_updated":           rows_updated,
+			"table_rows_read":        table_rows_read,
+			"select_commands":        select_commands,
+			"update_commands":        update_commands,
+			"other_commands":         other_commands,
+			"commit_transactions":    commit_transactions,
+			"rollback_transactions":  rollback_transactions,
+			"denied_connections":     denied_connections,
+			"lost_connections":       lost_connections,
+			"access_denied":          access_denied,
+			"empty_queries":          empty_queries,
+			"total_ssl_connections":  total_ssl_connections,
+		}
+		acc.AddFields("mysql_user_stats", fields, tags)
+	}
+	return nil
+}
+
 // gatherPerfTableIOWaits can be used to get total count and time
 // of I/O wait event for each table and process
 func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {