Commit 037e997b authored by Jonathan Schaeffer

Write to postgres OK

parent 055bd4a0
@@ -8,4 +8,4 @@ PyYAML==3.13
requests==2.20.1
six==1.11.0
urllib3==1.24.1
psycopg2-binary==2.7.6.1
@@ -10,7 +10,8 @@ import yaml
from influxdb import InfluxDBClient
from pprint import pprint, pformat
import datetime
import psycopg2
from psycopg2.extras import execute_values
logger = logging.getLogger(__name__)
@@ -57,17 +58,22 @@ def scan_volumes(volumes):
# volumes is a complex data type:
# List of dictionaries of 2 elements (path and type)
# [{path: /bla/bla, type: plop}, {path: /bli/bli, type: plip}]
volume_stats = {}
# Output: a list of dictionaries:
# [ {stat}, {stat}, ]
volume_stats = []
for volume in volumes:
if 'path' in volume:
volume_stats = scan_volume(volume['path'])
# If a type of data was specified, then we add this tag to the stats
stats = scan_volume(volume['path'])
# Add the type as an element of each statistic
if 'type' in volume:
for vs in volume_stats:
vs['type'] = volume['type']
for s in stats:
s['type'] = volume['type']
volume_stats.append(stats)
else:
raise ValueError("Volume has no path key: %s" % volume)
return volume_stats
# Flatten the list of lists:
return [ x for vol in volume_stats for x in vol ]
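# Illustration with hypothetical values: a volume_stats of
# [[{'network': 'FR', 'year': 2011, 'size': 23}], [{'network': 'G', 'year': 2012, 'size': 12}]]
# is flattened by the comprehension above into
# [{'network': 'FR', 'year': 2011, 'size': 23}, {'network': 'G', 'year': 2012, 'size': 12}]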
def main():
@@ -92,7 +98,8 @@ def main():
logging.config.fileConfig(cfg["logger_file"])
logger = logging.getLogger("resif_data_reporter")
logger.info("Starting")
data_by_type = {}
statistics = []
use_cache = False
# Refresh or use cache ?
# Try to open data.yaml
@@ -101,88 +108,64 @@
if not cache_file.startswith('/'):
cache_file = os.path.dirname(os.path.realpath(args.config.name))+'/'+cache_file
with open(cache_file, 'r') as ymlfile:
data_by_type = yaml.load(ymlfile)
cache = yaml.load(ymlfile)
# Compare volumes in cfg and in cache
if set(cfg['volumes']) == set(data_by_type['volumes']):
if cfg['volumes'] == cache['volumes']:
# Get previous run data
previous_run_date = datetime.datetime.strptime(data_by_type['date'], "%Y-%m-%d").date()
previous_run_date = datetime.datetime.strptime(cache['date'], "%Y-%m-%d").date()
# Compute cache age
if datetime.date.today() - previous_run_date > datetime.timedelta(days=(cfg['cache_ttl'])):
logger.info("Cache is old, let's scan volumes")
data_by_type = scan_volumes(cfg['volumes'])
statistics = scan_volumes(cfg['volumes'])
else:
logger.info("Cache is available, let's be lazy for this time and use it")
del data_by_type["date"]
del data_by_type["volumes"]
use_cache = True
statistics = cache['statistics']
except FileNotFoundError:
logger.debug("Cache file %s not found, let's scan volumes."%cfg['cache_file'])
data_by_type = scan_volumes(cfg['volumes'])
statistics = scan_volumes(cfg['volumes'])
logger.debug(statistics)
today = datetime.date.today().strftime("%Y-%m-%d")
# Open dump file and write the stats.
if not use_cache:
try:
with open(os.path.dirname(os.path.realpath(__file__))+"/data.yaml", 'w') as outfile:
yaml.dump({'date': today,
'volumes': cfg['volumes'],
'statistics': statistics
},
outfile, default_flow_style=False)
except Exception as e:
logger.error("Error writing data to cache: %s", e)
# add the network_type (is the network permanent or not) to the statistic
for stat in statistics:
is_permanent = False
if stat['network'] in cfg['metadata']['permanent_networks']:
is_permanent = True
stat['is_permanent'] = is_permanent
# TODO: rework this; the following line is shameful
stat['date'] = today
logger.debug(stat)
# Write to postgres database
conn = psycopg2.connect(dbname=cfg['postgres']['database'], user=cfg['postgres']['user'], host=cfg['postgres']['host'], password=cfg['postgres']['password'], port=cfg['postgres']['port'])
cur = conn.cursor()
execute_values(cur,
"""INSERT INTO datastats (network, year, type, size, is_permanent, date) VALUES %s""",
statistics,
"(%(network)s, %(year)s, %(type)s, %(size)s, %(is_permanent)s, %(date)s)")
logger.debug("Coucou, terminé")
conn.commit()
conn.close()
conn = psycopg2.connect(dbname=cfg['postgres']['database'], user=cfg['postgres']['user'], host=cfg['postgres']['host'], password=cfg['postgres']['password'], port=cfg['postgres']['port'])
cur = conn.cursor()
cur.execute("select * from datastats")
logger.debug(cur.fetchall())
influxdb_json_data = []
logger.info(pformat(data_by_type))
# Open dump file
try:
with open(os.path.dirname(os.path.realpath(__file__))+"/data.yaml", 'w') as outfile:
yaml.dump({'date': datetime.datetime.now().strftime("%Y-%m-%d"),
'volumes': cfg['volumes'],
**data_by_type},
outfile, default_flow_style=False)
except Exception as e:
logger.error("Error writing data to cache: %s", e)
influxdb_json_data = []
# Compose json data
# data_by_type = {'bud': { 2011: {G: 23, FR: 100, ...}, 2012: {G: 12, FR: 120, ...}, ...},
# 'validated': {2011: {G: 23, FR: 100, ...}, 2012: {G: 12, FR: 120, ...}, ...}
# }
for datatype in data_by_type:
for year in data_by_type[datatype]:
for n, value in data_by_type[datatype][year].items():
network_type = 'temporary'
if n in cfg['metadata']['permanent_networks']:
network_type = 'permanent'
influxdb_json_data.append(
{"measurement": cfg['influxdb']['measurement'],
"tags": {
"year": year,
"network": n,
"network_type": network_type,
"date": datetime.date.today().strftime("%Y-%m-%d"),
"type": datatype
},
"time": strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()),
"fields": {
"size": value
}
}
)
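# One composed point would look like this (hypothetical values; the
# measurement name comes from the configuration):
# {"measurement": "datastats", "tags": {"year": 2011, "network": "FR",
#  "network_type": "permanent", "date": "2018-11-20", "type": "bud"},
#  "time": "2018-11-20T10:00:00Z", "fields": {"size": 23}}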
logger.info(pformat(influxdb_json_data))
# Now, send this data to influxdb
try:
logger.info("Sending data to influxdb")
logger.debug("host = "+cfg['influxdb']['server'])
logger.debug("port = "+str(cfg['influxdb']['port']))
logger.debug("database = "+cfg['influxdb']['database'])
logger.debug("username = "+cfg['influxdb']['user'])
client = InfluxDBClient(host = cfg['influxdb']['server'],
port = cfg['influxdb']['port'],
database = cfg['influxdb']['database'],
username = cfg['influxdb']['user'],
password = cfg['influxdb']['password'],
ssl = cfg['influxdb']['ssl'],
verify_ssl = cfg['influxdb']['verify_ssl']
)
client.write_points(influxdb_json_data)
except Exception as e:
logger.error("Unexpected error writing data to influxdb")
logger.error(e)
if __name__ == "__main__":
main()