resifdatareporter.py 6.84 KB
Newer Older
Jonathan Schaeffer's avatar
pep8    
Jonathan Schaeffer committed
1
#!/bin/env python
2

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
3
import logging
4
import os
5
import sys
6
import io
7
import subprocess
8
import re
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
9
from time import gmtime, strftime
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
10
from datetime import datetime, date, timedelta
11
import yaml
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
12
import psycopg2
13
import click
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
14
from fdsnextender import FdsnExtender
15

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
16
# Root logging configuration: timestamped "LEVEL: message" records at INFO
# level by default (the CLI's --verbose flag lowers this module's logger to DEBUG).
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
19

20
def scan_volume(path):
    """
    Scan the volume located at ``path`` and return its per-channel disk usage.

    Runs ``du --exclude .snapshot -b -d4 <path>/`` and parses every output
    line. Only entries exactly four directory levels below the volume root
    are kept. The leaf directory name is split on its dot into channel and
    quality (e.g. ``BHZ.D``). The year may be the first or the second path
    component: a first component matching ``[2-9][0-9]{3}`` is taken as the
    year (``YEAR/NET/STA/CHAN.Q``), otherwise the layout is assumed to be
    ``NET/YEAR/STA/CHAN.Q``.

    :param path: path of the volume to scan (symlinks are resolved).
    :return: list of dictionaries whose values are all strings, e.g.
        ``[{'year': '2011', 'network': 'G', 'station': 'STAT',
        'channel': 'BHZ', 'quality': 'D', 'size': '100'}, ...]``
        where ``size`` is the byte count printed by ``du -b``.
    """
    data = []
    volume = os.path.realpath(path) + '/'
    logger.debug("Volume %s", volume)
    # TODO: make the scan depth (currently 4) an option
    year_pattern = re.compile('[2-9][0-9]{3}')
    starttime = datetime.now()
    # Using Popen as a context manager closes the pipe and waits for ``du``,
    # so no zombie process is left behind and the exit status is available.
    with subprocess.Popen(["du", "--exclude", ".snapshot", "-b", "-d4", volume],
                          stdout=subprocess.PIPE) as proc:
        for line in io.TextIOWrapper(proc.stdout, encoding='utf-8'):
            line = line.strip()
            logger.debug("Scanned %s", line)
            # du prints "<size>\t<path>"; split only once so that a tab inside
            # a directory name cannot break the unpacking.
            (size, entry) = line.split('\t', 1)
            # Keep only the part of the path below the volume root.
            parts = entry.replace(volume, '').split('/')
            # Only the leaf level (4 components) carries per-channel statistics.
            if len(parts) != 4:
                continue
            logger.debug("path: %s, size: %s", parts, size)
            try:
                (channel, quality) = parts[3].split('.')
            except ValueError:
                # Unexpected leaf name (no dot, or several dots): skip it
                # instead of aborting the scan of the whole volume.
                logger.warning("Skipping unexpected channel directory %s", entry)
                continue
            if year_pattern.match(parts[0]):
                year, network = parts[0], parts[1]
            else:
                network, year = parts[0], parts[1]
            data.append({'year': year, 'network': network, 'station': parts[2],
                         'channel': channel, 'quality': quality, 'size': size})
            logger.debug(data[-1])
    if proc.returncode != 0:
        # Not fatal: whatever du printed has already been parsed above.
        logger.warning("du exited with status %s while scanning %s; statistics may be incomplete",
                       proc.returncode, volume)
    logger.debug("Volume scanned in %s", datetime.now() - starttime)
    return data
52
53
54


def scan_volumes(volumes):
    """
    Scan several volumes and return all their statistics as one flat list.

    :param volumes: list of dictionaries describing the volumes, e.g.
        ``[{'path': '/bla/bla', 'type': 'plop', 'name': 'vol1'}, ...]``.
        ``path`` is mandatory. When present, ``type`` is copied into every
        statistic of that volume under the key ``type``, and ``name`` under
        the key ``volume``.
    :return: flat list of the statistic dictionaries produced by
        ``scan_volume`` for each volume, in volume order.
    :raises ValueError: if a volume has no ``path`` key.
    """
    all_stats = []
    starttime = datetime.now()
    for volume in volumes:
        # Validate before touching volume['path'], so that a missing key
        # raises the documented ValueError rather than a KeyError.
        if 'path' not in volume:
            raise ValueError("Volume has no path key : %s" % (volume))
        logger.debug("Preparing scan of volume %s", volume['path'])
        stats = scan_volume(volume['path'])
        # If a type of data and/or a volume name was specified, tag every
        # statistic of this volume with it.
        if 'type' in volume:
            for s in stats:
                s['type'] = volume['type']
        if 'name' in volume:
            for s in stats:
                s['volume'] = volume['name']
        all_stats.extend(stats)
    logger.info("All volumes scanned in %s", datetime.now() - starttime)
    return all_stats
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
81

82

83
def _pg_connect(pg_cfg):
    """Open a psycopg2 connection from the ``postgres`` section of the configuration."""
    return psycopg2.connect(dbname=pg_cfg['database'], user=pg_cfg['user'],
                            host=pg_cfg['host'], password=pg_cfg['password'],
                            port=pg_cfg['port'])


def _last_stat_date(pg_cfg):
    """Return the most recent ``date`` stored in ``dataholdings``, or None if the table is empty."""
    conn = _pg_connect(pg_cfg)
    try:
        with conn.cursor() as cur:
            cur.execute('select distinct date from dataholdings order by date desc limit 1;')
            row = cur.fetchone()
    finally:
        conn.close()
    return row[0] if row is not None else None


def _annotate_statistics(statistics, today):
    """
    Add ``is_permanent`` and ``date`` to every statistic, in place.

    Statistics of temporary networks also get their ``network`` value replaced
    by what ``FdsnExtender.extend(network, year)`` returns.
    """
    extender = FdsnExtender()
    for stat in statistics:
        # FDSN convention: network codes starting with a digit or with X, Y
        # or Z are temporary networks, i.e. NOT permanent.
        if re.match('^[1-9XYZ]', stat['network']):
            stat['is_permanent'] = False
            try:
                stat['network'] = extender.extend(stat['network'], int(stat['year']))
            except ValueError:
                # Presumably the extender could not resolve this code; keep
                # the short network code unchanged.
                logger.debug("Network %s exists ?", stat['network'])
        else:
            stat['is_permanent'] = True
        stat['date'] = today
        logger.debug(stat)


def _write_cache(configfile, today, volumes):
    """
    Dump the report date and the volume configuration to ``data.yaml``,
    in the directory of the configuration file. Failures are logged, not fatal.

    NOTE(review): only the date and the volume configuration are written,
    not the scanned statistics -- confirm this is intended.
    """
    # os.path.join avoids producing "/data.yaml" (filesystem root) when the
    # configuration file name has no directory part.
    cache_path = os.path.join(os.path.dirname(configfile.name), "data.yaml")
    try:
        with open(cache_path, 'w') as outfile:
            yaml.dump({'date': today,
                       'volumes': volumes,
                       },
                      outfile, default_flow_style=False)
    except (OSError, yaml.YAMLError):
        logger.exception("Error writing data to cache")


def _write_to_db(pg_cfg, statistics):
    """Upsert every statistic into the ``dataholdings`` table within a single transaction."""
    conn = _pg_connect(pg_cfg)
    try:
        # ``with conn`` commits on success and rolls back on error;
        # it does not close the connection, hence the ``finally``.
        with conn, conn.cursor() as cur:
            for stat in statistics:
                cur.execute(
                    """
                    INSERT INTO dataholdings (network, year, station, channel, quality, type, size, is_permanent, volume, date)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (network,year,station,channel,type,date) DO UPDATE SET size = EXCLUDED.size;
                    """,
                    (stat['network'], stat['year'], stat['station'], stat['channel'], stat['quality'],
                     stat['type'], stat['size'], stat['is_permanent'], stat['volume'], stat['date']))
    finally:
        conn.close()


@click.command()
@click.option('--config-file',  'configfile', type=click.File(), help='Configuration file path', envvar='CONFIG_FILE', show_default=True,
              default=f"{os.path.dirname(os.path.realpath(__file__))}/config.yml")
@click.option('--force-scan', flag_value=True, default=False, help='Force scanning of the archive')
@click.option('--dryrun', flag_value=True, default=False, help="Do not send metrics to database")
@click.option("--verbose", flag_value=True, default=False, help="Verbose mode")
def cli(configfile, force_scan, dryrun, verbose):
    """
    Command line interface. Stands as main.

    Scan the configured data volumes and store their disk usage in the
    PostgreSQL dataholdings table.

    \f
    When --force-scan is not given and a ``postgres`` section is configured,
    the scan is skipped if the latest report in the database is not older
    than ``cache_ttl`` days. With --dryrun nothing is written to the database
    (the data.yaml cache file is still written).
    """
    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting")
    try:
        cfg = yaml.load(configfile, Loader=yaml.SafeLoader)
    except (OSError, ValueError, yaml.YAMLError) as err:
        # Nothing below can work without a configuration: stop here.
        logger.error("Error reading file %s: %s", configfile.name, err)
        sys.exit(1)
    if not isinstance(cfg, dict):
        logger.error("Configuration file %s is empty or is not a mapping", configfile.name)
        sys.exit(1)

    today = date.today().strftime("%Y-%m-%d")

    # The freshness check needs the database, so it only runs when a
    # postgres section is configured (consistent with the final write).
    if not force_scan and 'postgres' in cfg:
        last_stat_date = _last_stat_date(cfg['postgres'])
        if last_stat_date is None:
            logger.info("No previous data report found, let's scan volumes")
        elif date.today() - last_stat_date > timedelta(days=cfg['cache_ttl']):
            logger.info("Cache is old, let's scan volumes")
        else:
            logger.info(
                "Last data report made at %s. Younger than %s. Don't scan",
                last_stat_date, cfg['cache_ttl'])
            sys.exit(0)

    statistics = scan_volumes(cfg['volumes'])
    # Add the network type (permanent or not), the extended network code
    # and the report date to every statistic.
    _annotate_statistics(statistics, today)
    _write_cache(configfile, today, cfg['volumes'])

    if dryrun:
        logger.info("Dryrun mode, exit")
        sys.exit(0)

    if 'postgres' in cfg:
        logger.info('Writing to postgres database')
        _write_to_db(cfg['postgres'], statistics)

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
169
170
if __name__ == '__main__':
    # Executed as a script (not imported): let click parse sys.argv and run.
    cli()