Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OSUG
RESIF
resif_data_reporter
Commits
0226e426
Commit
0226e426
authored
May 06, 2020
by
Jonathan Schaeffer
Browse files
Do not dump nor load statistics
parent
4df5ba36
Changes
1
Hide whitespace changes
Inline
Side-by-side
resifdatareporter/resifdatareporter.py
View file @
0226e426
...
...
@@ -2,13 +2,13 @@
import
logging.config
import
os
import
sys
import
subprocess
from
time
import
gmtime
,
strftime
import
yaml
from
influxdb
import
InfluxDBClient
import
datetime
import
psycopg2
from
psycopg2.extras
import
execute_values
import
click
from
fdsnextender
import
FdsnExtender
...
...
@@ -108,8 +108,6 @@ def cli(configfile):
logger
=
logging
.
getLogger
(
"resif_data_reporter"
)
logger
.
info
(
"Starting"
)
statistics
=
[]
use_cache
=
False
# Refresh or use cache ?
# Try to open data.yaml
cache_file
=
cfg
[
'cache_file'
]
...
...
@@ -126,16 +124,17 @@ def cli(configfile):
# Compute cache age
if
datetime
.
date
.
today
()
-
previous_run_date
>
datetime
.
timedelta
(
days
=
(
cfg
[
'cache_ttl'
])):
logger
.
info
(
"Cache is old, let's scan volumes"
)
statistics
=
scan_volumes
(
cfg
[
'volumes'
])
else
:
logger
.
info
(
"Cache is available, let's be lazy for this time and use it"
)
use_cache
=
True
statistics
=
cache
[
'statistics'
]
"Last data report made at %s. Younger than %s. We don not scan"
,
previous_run_date
,
cfg
[
'cache_ttl'
])
sys
.
exit
(
0
)
except
FileNotFoundError
:
logger
.
debug
(
"Cache file %s not found, let's scan volumes."
%
cfg
[
'cache_file'
])
statistics
=
scan_volumes
(
cfg
[
'volumes'
])
statistics
=
scan_volumes
(
cfg
[
'volumes'
])
logger
.
debug
(
statistics
)
today
=
datetime
.
date
.
today
().
strftime
(
"%Y-%m-%d"
)
...
...
@@ -151,82 +150,81 @@ def cli(configfile):
stat
[
'network'
]
=
extender
.
extend
(
stat
[
'network'
],
int
(
stat
[
'year'
]))
except
ValueError
:
logger
.
warnin
g
(
"Network %s exists ?"
%
stat
[
'network'
])
logger
.
debu
g
(
"Network %s exists ?"
%
stat
[
'network'
])
stat
[
'date'
]
=
today
logger
.
debug
(
stat
)
# Open dump file and write the stats.
if
use_cache
==
False
:
try
:
with
open
(
os
.
path
.
split
(
configfile
.
name
)[
0
]
+
"/data.yaml"
,
'w'
)
as
outfile
:
yaml
.
dump
({
'date'
:
today
,
'volumes'
:
cfg
[
'volumes'
],
'statistics'
:
statistics
},
outfile
,
default_flow_style
=
False
)
except
:
logger
.
error
(
"Error writing data to cache"
)
try
:
with
open
(
os
.
path
.
split
(
configfile
.
name
)[
0
]
+
"/data.yaml"
,
'w'
)
as
outfile
:
yaml
.
dump
({
'date'
:
today
,
'volumes'
:
cfg
[
'volumes'
],
},
outfile
,
default_flow_style
=
False
)
except
:
logger
.
error
(
"Error writing data to cache"
)
# Write to postgres database
if
'postgres'
in
cfg
:
logger
.
info
(
'Writing to postgres database'
)
conn
=
psycopg2
.
connect
(
dbname
=
cfg
[
'postgres'
][
'database'
],
user
=
cfg
[
'postgres'
][
'user'
],
host
=
cfg
[
'postgres'
][
'host'
],
password
=
cfg
[
'postgres'
][
'password'
],
port
=
cfg
[
'postgres'
][
'port'
])
cur
=
conn
.
cursor
()
execute_values
(
cur
,
"""
INSERT INTO dataholdings (network, year, station, channel, quality, type, size, is_permanent, date) VALUES %s
ON CONFLICT DO UPDATE SET size = EXCLUDED.size;
"""
,
statistics
,
"(%(network)s, %(year)s, %(station)s, %(channel)s, %(quality)s, %(type)s, %(size)s, %(is_permanent)s, %(date)s)"
)
conn
.
commit
()
if
'influxdb'
in
cfg
:
logger
.
info
(
'Writing in influxdb'
)
influxdb_json_data
=
[]
# Compose json data
record_time
=
strftime
(
"%Y-%m-%dT%H:%M:%SZ"
,
gmtime
())
for
stat
in
statistics
:
influxdb_json_data
.
append
(
{
"measurement"
:
cfg
[
'influxdb'
][
'measurement'
],
"tags"
:
{
"year"
:
int
(
stat
[
'year'
]),
"network"
:
stat
[
'network'
],
"station"
:
stat
[
'station'
],
"channel"
:
stat
[
'channel'
],
"quality"
:
stat
[
'quality'
],
"permanent"
:
bool
(
stat
[
'is_permanent'
]),
"type"
:
stat
[
'type'
],
"date"
:
stat
[
'date'
]
},
"time"
:
record_time
,
"fields"
:
{
"size"
:
int
(
stat
[
'size'
])
}
}
)
logger
.
info
(
influxdb_json_data
)
# Now, send this data to influxdb
try
:
logger
.
info
(
"Sending data to influxdb"
)
logger
.
debug
(
"host = %s"
,
cfg
[
'influxdb'
][
'server'
])
logger
.
debug
(
"port = %s"
,
str
(
cfg
[
'influxdb'
][
'port'
]))
logger
.
debug
(
"database = %s"
,
cfg
[
'influxdb'
][
'database'
])
logger
.
debug
(
"username = %s"
,
cfg
[
'influxdb'
][
'user'
])
client
=
InfluxDBClient
(
host
=
cfg
[
'influxdb'
][
'server'
],
port
=
cfg
[
'influxdb'
][
'port'
],
database
=
cfg
[
'influxdb'
][
'database'
],
username
=
cfg
[
'influxdb'
][
'user'
],
password
=
cfg
[
'influxdb'
][
'password'
],
ssl
=
cfg
[
'influxdb'
][
'ssl'
],
verify_ssl
=
cfg
[
'influxdb'
][
'verify_ssl'
]
)
client
.
write_points
(
influxdb_json_data
)
except
Exception
as
e
:
logger
.
error
(
"Unexpected error writing data to influxdb"
)
logger
.
error
(
e
)
if
'postgres'
in
cfg
:
logger
.
info
(
'Writing to postgres database'
)
conn
=
psycopg2
.
connect
(
dbname
=
cfg
[
'postgres'
][
'database'
],
user
=
cfg
[
'postgres'
][
'user'
],
host
=
cfg
[
'postgres'
][
'host'
],
password
=
cfg
[
'postgres'
][
'password'
],
port
=
cfg
[
'postgres'
][
'port'
])
cur
=
conn
.
cursor
()
for
stat
in
statistics
:
cur
.
execute
(
"""
INSERT INTO dataholdings (network, year, station, channel, quality, type, size, is_permanent, date)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (network,year,station,channel,type,date) DO UPDATE SET size = EXCLUDED.size;
"""
,
(
stat
[
'network'
],
stat
[
'year'
],
stat
[
'station'
],
stat
[
'channel'
],
stat
[
'quality'
],
stat
[
'type'
],
stat
[
'size'
],
stat
[
'is_permanent'
],
stat
[
'date'
]))
conn
.
commit
()
if
'influxdb'
in
cfg
:
logger
.
info
(
'Writing in influxdb'
)
influxdb_json_data
=
[]
# Compose json data
record_time
=
strftime
(
"%Y-%m-%dT%H:%M:%SZ"
,
gmtime
())
for
stat
in
statistics
:
influxdb_json_data
.
append
(
{
"measurement"
:
cfg
[
'influxdb'
][
'measurement'
],
"tags"
:
{
"year"
:
int
(
stat
[
'year'
]),
"network"
:
stat
[
'network'
],
"station"
:
stat
[
'station'
],
"channel"
:
stat
[
'channel'
],
"quality"
:
stat
[
'quality'
],
"permanent"
:
bool
(
stat
[
'is_permanent'
]),
"type"
:
stat
[
'type'
],
"date"
:
stat
[
'date'
]
},
"time"
:
record_time
,
"fields"
:
{
"size"
:
int
(
stat
[
'size'
])
}
}
)
logger
.
info
(
influxdb_json_data
)
# Now, send this data to influxdb
try
:
logger
.
info
(
"Sending data to influxdb"
)
logger
.
debug
(
"host = %s"
,
cfg
[
'influxdb'
][
'server'
])
logger
.
debug
(
"port = %s"
,
str
(
cfg
[
'influxdb'
][
'port'
]))
logger
.
debug
(
"database = %s"
,
cfg
[
'influxdb'
][
'database'
])
logger
.
debug
(
"username = %s"
,
cfg
[
'influxdb'
][
'user'
])
client
=
InfluxDBClient
(
host
=
cfg
[
'influxdb'
][
'server'
],
port
=
cfg
[
'influxdb'
][
'port'
],
database
=
cfg
[
'influxdb'
][
'database'
],
username
=
cfg
[
'influxdb'
][
'user'
],
password
=
cfg
[
'influxdb'
][
'password'
],
ssl
=
cfg
[
'influxdb'
][
'ssl'
],
verify_ssl
=
cfg
[
'influxdb'
][
'verify_ssl'
]
)
client
.
write_points
(
influxdb_json_data
)
except
Exception
as
e
:
logger
.
error
(
"Unexpected error writing data to influxdb"
)
logger
.
error
(
e
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment