From b9ae3d6a57dbcf27aeaaeeea994e818a27f6a9be Mon Sep 17 00:00:00 2001
From: James <james-lawrence@users.noreply.github.com>
Date: Tue, 24 Jan 2017 15:36:36 -0500
Subject: [PATCH] fix postgresql 'name', and 'oid' data types by switching to a
 driver (#1750)

that handles them properly
---
 CHANGELOG.md                                  |  3 +
 Godeps                                        |  2 +-
 plugins/inputs/postgresql/README.md           |  4 +-
 plugins/inputs/postgresql/connect.go          | 99 +++++++++++++++++++
 plugins/inputs/postgresql/postgresql.go       | 14 +--
 plugins/inputs/postgresql/postgresql_test.go  | 31 +++++-
 .../postgresql_extensible.go                  | 23 +++--
 .../postgresql_extensible_test.go             | 25 +++++
 testutil/accumulator.go                       | 38 ++++++-
 9 files changed, 211 insertions(+), 28 deletions(-)
 create mode 100644 plugins/inputs/postgresql/connect.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d321e89b..68d43f2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -235,8 +235,11 @@ which can be installed via
 evaluated at every flush interval, rather than once at startup. This makes it
 consistent with the behavior of `collection_jitter`.
 
+- postgresql plugins now handle oid and name typed columns seamlessly, previously they were ignored/skipped.
+
 ### Features
 
+- [#1617](https://github.com/influxdata/telegraf/pull/1617): postgresql_extensible now handles name and oid types correctly.
 - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
 - [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio.
 - [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
diff --git a/Godeps b/Godeps
index 885213c9..99606414 100644
--- a/Godeps
+++ b/Godeps
@@ -33,7 +33,6 @@ github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc
 github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a
 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142
 github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720
-github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36
 github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453
 github.com/miekg/dns cce6c130cdb92c752850880fd285bea1d64439dd
 github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
@@ -63,3 +62,4 @@ gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
 gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
 gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4
+github.com/jackc/pgx bb73d8427902891bbad7b949b9c60b32949d935f
diff --git a/plugins/inputs/postgresql/README.md b/plugins/inputs/postgresql/README.md
index e5e9a896..e309aa80 100644
--- a/plugins/inputs/postgresql/README.md
+++ b/plugins/inputs/postgresql/README.md
@@ -4,8 +4,8 @@ This postgresql plugin provides metrics for your postgres database. It currently
 ```
 pg version      9.2+   9.1   8.3-9.0   8.1-8.2   7.4-8.0(unsupported)
 ---             ---    ---   -------   -------   -------
-datid*           x      x       x         x
-datname*         x      x       x         x
+datid            x      x       x         x
+datname          x      x       x         x
 numbackends      x      x       x         x         x
 xact_commit      x      x       x         x         x
 xact_rollback    x      x       x         x         x
diff --git a/plugins/inputs/postgresql/connect.go b/plugins/inputs/postgresql/connect.go
new file mode 100644
index 00000000..77858cda
--- /dev/null
+++ b/plugins/inputs/postgresql/connect.go
@@ -0,0 +1,99 @@
+package postgresql
+
+import (
+	"database/sql"
+	"fmt"
+	"net"
+	"net/url"
+	"sort"
+	"strings"
+
+	"github.com/jackc/pgx"
+	"github.com/jackc/pgx/stdlib"
+)
+
+// pulled from lib/pq
+// ParseURL no longer needs to be used by clients of this library since supplying a URL as a
+// connection string to sql.Open() is now supported:
+//
+//	sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full")
+//
+// It remains exported here for backwards-compatibility.
+//
+// ParseURL converts a url to a connection string for driver.Open.
+// Example:
+//
+//	"postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full"
+//
+// converts to:
+//
+//	"user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full"
+//
+// A minimal example:
+//
+//	"postgres://"
+//
+// This will be blank, causing driver.Open to use all of the defaults
+func ParseURL(uri string) (string, error) {
+	u, err := url.Parse(uri)
+	if err != nil {
+		return "", err
+	}
+
+	if u.Scheme != "postgres" && u.Scheme != "postgresql" {
+		return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme)
+	}
+
+	var kvs []string
+	escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`)
+	accrue := func(k, v string) {
+		if v != "" {
+			kvs = append(kvs, k+"="+escaper.Replace(v))
+		}
+	}
+
+	if u.User != nil {
+		v := u.User.Username()
+		accrue("user", v)
+
+		v, _ = u.User.Password()
+		accrue("password", v)
+	}
+
+	if host, port, err := net.SplitHostPort(u.Host); err != nil {
+		accrue("host", u.Host)
+	} else {
+		accrue("host", host)
+		accrue("port", port)
+	}
+
+	if u.Path != "" {
+		accrue("dbname", u.Path[1:])
+	}
+
+	q := u.Query()
+	for k := range q {
+		accrue(k, q.Get(k))
+	}
+
+	sort.Strings(kvs) // Makes testing easier (not a performance concern)
+	return strings.Join(kvs, " "), nil
+}
+
+func Connect(address string) (*sql.DB, error) {
+	if strings.HasPrefix(address, "postgres://") || strings.HasPrefix(address, "postgresql://") {
+		return sql.Open("pgx", address)
+	}
+
+	config, err := pgx.ParseDSN(address)
+	if err != nil {
+		return nil, err
+	}
+
+	pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: config})
+	if err != nil {
+		return nil, err
+	}
+
+	return stdlib.OpenFromConnPool(pool)
+}
diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go
index 0e7cdb50..7019762e 100644
--- a/plugins/inputs/postgresql/postgresql.go
+++ b/plugins/inputs/postgresql/postgresql.go
@@ -2,7 +2,6 @@ package postgresql
 
 import (
 	"bytes"
-	"database/sql"
 	"fmt"
 	"regexp"
 	"sort"
@@ -10,8 +9,6 @@ import (
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/plugins/inputs"
-
-	"github.com/lib/pq"
 )
 
 type Postgresql struct {
@@ -23,7 +20,7 @@ type Postgresql struct {
 	sanitizedAddress string
 }
 
-var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
+var ignoredColumns = map[string]bool{"stats_reset": true}
 
 var sampleConfig = `
   ## specify address via a url matching:
@@ -71,7 +68,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
 		p.Address = localhost
 	}
 
-	db, err := sql.Open("postgres", p.Address)
+	db, err := Connect(p.Address)
 	if err != nil {
 		return err
 	}
@@ -149,7 +146,7 @@ var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?")
 func (p *Postgresql) SanitizedAddress() (_ string, err error) {
 	var canonicalizedAddress string
 	if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
-		canonicalizedAddress, err = pq.ParseURL(p.Address)
+		canonicalizedAddress, err = ParseURL(p.Address)
 		if err != nil {
 			return p.sanitizedAddress, err
 		}
@@ -185,10 +182,7 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
 	}
 	if columnMap["datname"] != nil {
 		// extract the database name from the column map
-		dbnameChars := (*columnMap["datname"]).([]uint8)
-		for i := 0; i < len(dbnameChars); i++ {
-			dbname.WriteString(string(dbnameChars[i]))
-		}
+		dbname.WriteString((*columnMap["datname"]).(string))
 	} else {
 		dbname.WriteString("postgres")
 	}
diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go
index 64926f61..a0690961 100644
--- a/plugins/inputs/postgresql/postgresql_test.go
+++ b/plugins/inputs/postgresql/postgresql_test.go
@@ -28,6 +28,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 	for _, col := range p.AllColumns {
 		availableColumns[col] = true
 	}
+
 	intMetrics := []string{
 		"xact_commit",
 		"xact_rollback",
@@ -42,7 +43,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		"temp_files",
 		"temp_bytes",
 		"deadlocks",
-		"numbackends",
 		"buffers_alloc",
 		"buffers_backend",
 		"buffers_backend_fsync",
@@ -53,9 +53,20 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		"maxwritten_clean",
 	}
 
+	int32Metrics := []string{
+		"numbackends",
+	}
+
 	floatMetrics := []string{
 		"blk_read_time",
 		"blk_write_time",
+		"checkpoint_write_time",
+		"checkpoint_sync_time",
+	}
+
+	stringMetrics := []string{
+		"datname",
+		"datid",
 	}
 
 	metricsCounted := 0
@@ -68,6 +79,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		}
 	}
 
+	for _, metric := range int32Metrics {
+		_, ok := availableColumns[metric]
+		if ok {
+			assert.True(t, acc.HasInt32Field("postgresql", metric))
+			metricsCounted++
+		}
+	}
+
 	for _, metric := range floatMetrics {
 		_, ok := availableColumns[metric]
 		if ok {
@@ -76,8 +95,16 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		}
 	}
 
+	for _, metric := range stringMetrics {
+		_, ok := availableColumns[metric]
+		if ok {
+			assert.True(t, acc.HasStringField("postgresql", metric))
+			metricsCounted++
+		}
+	}
+
 	assert.True(t, metricsCounted > 0)
-	//assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
+	assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
 }
 
 func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go
index beb010fc..00729bf7 100644
--- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go
+++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go
@@ -2,7 +2,6 @@ package postgresql_extensible
 
 import (
 	"bytes"
-	"database/sql"
 	"fmt"
 	"log"
 	"regexp"
@@ -10,8 +9,7 @@ import (
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/plugins/inputs"
-
-	"github.com/lib/pq"
+	"github.com/influxdata/telegraf/plugins/inputs/postgresql"
 )
 
 type Postgresql struct {
@@ -40,7 +38,7 @@ type query []struct {
 	Measurement string
 }
 
-var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
+var ignoredColumns = map[string]bool{"stats_reset": true}
 
 var sampleConfig = `
   ## specify address via a url matching:
@@ -126,7 +124,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
 		p.Address = localhost
 	}
 
-	db, err := sql.Open("postgres", p.Address)
+	db, err := postgresql.Connect(p.Address)
 	if err != nil {
 		return err
 	}
@@ -212,7 +210,7 @@ func (p *Postgresql) SanitizedAddress() (_ string, err error) {
 	}
 	var canonicalizedAddress string
 	if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
-		canonicalizedAddress, err = pq.ParseURL(p.Address)
+		canonicalizedAddress, err = postgresql.ParseURL(p.Address)
 		if err != nil {
 			return p.sanitizedAddress, err
 		}
@@ -248,10 +246,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
 	}
 	if columnMap["datname"] != nil {
 		// extract the database name from the column map
-		dbnameChars := (*columnMap["datname"]).([]uint8)
-		for i := 0; i < len(dbnameChars); i++ {
-			dbname.WriteString(string(dbnameChars[i]))
-		}
+		dbname.WriteString((*columnMap["datname"]).(string))
 	} else {
 		dbname.WriteString("postgres")
 	}
@@ -275,19 +270,23 @@ COLUMN:
 		if ignore || *val == nil {
 			continue
 		}
+
 		for _, tag := range p.AdditionalTags {
 			if col != tag {
 				continue
 			}
 			switch v := (*val).(type) {
+			case string:
+				tags[col] = v
 			case []byte:
 				tags[col] = string(v)
-			case int64:
+			case int64, int32, int:
 				tags[col] = fmt.Sprintf("%d", v)
+			default:
+				log.Println("failed to add additional tag", col)
 			}
 			continue COLUMN
 		}
-
 		if v, ok := (*val).([]byte); ok {
 			fields[col] = string(v)
 		} else {
diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go
index 7fd90710..f92284ee 100644
--- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go
+++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go
@@ -33,6 +33,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 	for _, col := range p.AllColumns {
 		availableColumns[col] = true
 	}
+
 	intMetrics := []string{
 		"xact_commit",
 		"xact_rollback",
@@ -47,6 +48,9 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		"temp_files",
 		"temp_bytes",
 		"deadlocks",
+	}
+
+	int32Metrics := []string{
 		"numbackends",
 	}
 
@@ -55,6 +59,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		"blk_write_time",
 	}
 
+	stringMetrics := []string{
+		"datname",
+		"datid",
+	}
+
 	metricsCounted := 0
 
 	for _, metric := range intMetrics {
@@ -65,6 +74,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		}
 	}
 
+	for _, metric := range int32Metrics {
+		_, ok := availableColumns[metric]
+		if ok {
+			assert.True(t, acc.HasInt32Field("postgresql", metric))
+			metricsCounted++
+		}
+	}
+
 	for _, metric := range floatMetrics {
 		_, ok := availableColumns[metric]
 		if ok {
@@ -73,6 +90,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		}
 	}
 
+	for _, metric := range stringMetrics {
+		_, ok := availableColumns[metric]
+		if ok {
+			assert.True(t, acc.HasStringField("postgresql", metric))
+			metricsCounted++
+		}
+	}
+
 	assert.True(t, metricsCounted > 0)
 	assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
 }
diff --git a/testutil/accumulator.go b/testutil/accumulator.go
index 2efee557..4f131ec8 100644
--- a/testutil/accumulator.go
+++ b/testutil/accumulator.go
@@ -221,7 +221,7 @@ func (a *Accumulator) AssertDoesNotContainMeasurement(t *testing.T, measurement
 	}
 }
 
-// HasIntValue returns true if the measurement has an Int value
+// HasIntField returns true if the measurement has an Int value
 func (a *Accumulator) HasIntField(measurement string, field string) bool {
 	a.Lock()
 	defer a.Unlock()
@@ -239,6 +239,42 @@ func (a *Accumulator) HasIntField(measurement string, field string) bool {
 	return false
 }
 
+// HasInt32Field returns true if the measurement has an Int value
+func (a *Accumulator) HasInt32Field(measurement string, field string) bool {
+	a.Lock()
+	defer a.Unlock()
+	for _, p := range a.Metrics {
+		if p.Measurement == measurement {
+			for fieldname, value := range p.Fields {
+				if fieldname == field {
+					_, ok := value.(int32)
+					return ok
+				}
+			}
+		}
+	}
+
+	return false
+}
+
+// HasStringField returns true if the measurement has an String value
+func (a *Accumulator) HasStringField(measurement string, field string) bool {
+	a.Lock()
+	defer a.Unlock()
+	for _, p := range a.Metrics {
+		if p.Measurement == measurement {
+			for fieldname, value := range p.Fields {
+				if fieldname == field {
+					_, ok := value.(string)
+					return ok
+				}
+			}
+		}
+	}
+
+	return false
+}
+
 // HasUIntValue returns true if the measurement has a UInt value
 func (a *Accumulator) HasUIntField(measurement string, field string) bool {
 	a.Lock()
-- 
GitLab