resifdatareporter.py 10.1 KB
Newer Older
Jonathan Schaeffer's avatar
pep8    
Jonathan Schaeffer committed
1
#!/bin/env python
2

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
3
import logging
4
import os
5
import sys
6
import io
7
import subprocess
8
import re
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
9
from datetime import datetime, date, timedelta
10
import yaml
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
11
import psycopg2
12
import click
13
import h5py
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
14
from fdsnextender import FdsnExtender
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
15
from resifdatareporter import __version__
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
16
# Root logging configuration: timestamped messages, INFO level by default.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
)

# Module logger used throughout this script; the --verbose flag lowers its
# level to DEBUG.
logger = logging.getLogger(__name__)

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
19

20
def scan_volume(path):
    """
    Scan a volume given by its path.

    Runs ``du --exclude .snapshot -b -d4 <volume>/`` and parses every line it
    prints. Only directories exactly four levels below the volume root are
    kept. They are expected to look like ``YEAR/NET/STA/CHAN.Q`` or
    ``NET/YEAR/STA/CHAN.Q``; the layout is detected by checking whether the
    first component is a four-digit year.

    :param path: path of the volume to scan (symlinks are resolved).
    :returns: a list of dictionaries, for instance
        ``[{'year': '2011', 'network': 'G', 'station': 'STAT',
        'channel': 'BHZ', 'quality': 'D', 'size': '100'}, ...]``.
        Every value is the string found in the ``du`` output; ``size`` is
        the apparent size in bytes (``du -b``).
    """
    data = []
    volume = os.path.realpath(path) + '/'
    logger.debug("Volume %s", volume)
    starttime = datetime.now()
    command = ["du", "--exclude", ".snapshot", "-b", "-d4", volume]
    # Using Popen as a context manager closes the pipe and waits for du, so
    # the child process is reaped instead of being left as a zombie.
    with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
        for line in io.TextIOWrapper(proc.stdout, encoding='utf-8'):
            line = line.rstrip('\n')
            logger.debug("Scanned %s", line)
            # du prints "<size>\t<path>"; split on the first tab only, and skip
            # anything that does not have that shape instead of crashing.
            size, sep, dirpath = line.partition('\t')
            if not sep:
                logger.info("%s is not a du output line. Skip it.", line)
                continue
            # Keep only the part of the path below the volume root.
            parts = dirpath.replace(volume, '', 1).split('/')
            # Only directories exactly four levels deep describe a channel.
            if len(parts) != 4:
                continue
            logger.debug("path: %s, size: %s", parts, size)
            try:
                (channel, quality) = parts[3].split('.')
            except ValueError:
                logger.info("%s is probably not a normal path. Skip it.", parts)
                continue
            # YEAR/NET/... layout when the first component is exactly a
            # four-digit year, NET/YEAR/... otherwise.
            if re.fullmatch(r'[1-9][0-9]{3}', parts[0]):
                year, network = parts[0], parts[1]
            else:
                year, network = parts[1], parts[0]
            data.append({'year': year, 'network': network, 'station': parts[2],
                         'channel': channel, 'quality': quality, 'size': size})
            logger.debug(data[-1])
        returncode = proc.wait()
    if returncode != 0:
        # A non-zero status means du hit errors (for instance unreadable
        # directories). Its partial output is still used, but say so.
        logger.warning("du exited with status %s while scanning %s", returncode, volume)
    logger.debug("Volume scanned in %s", datetime.now() - starttime)
    return data
55

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
56
def scan_ph5_volume(volpath):
    """
    Scan a volume holding node data in PH5 format.

    Such a directory has to be analysed differently from a regular archive:
    - at the very least, run a /du/ on the directory and store the result for
      the network only;
    - alternatively, analyse the PH5 volumes themselves, although it is not
      clear whether that is really needed.

    Each non-hidden entry of the volume is taken as a network directory whose
    name holds the year from its third character on (``network[2:]``).
    The total size of the files whose name ends in ``ph5`` anywhere below
    that directory is split evenly between the stations declared in its
    ``master.ph5`` (``Das_g_<station>`` groups under
    ``Experiment_g/Maps_g``). When that list cannot be read, or is empty,
    one station with an empty code is assumed.

    :param volpath: path of the volume to scan (symlinks are resolved).
    :returns: a list of dictionaries, one per station, with keys ``type``
        (always ``'ph5_validated'``), ``year`` (int), ``network``,
        ``station``, ``channel`` and ``quality`` (both None) and ``size``
        (int, bytes). A volume that cannot be listed yields an empty list.
    """
    data = []
    volume = os.path.realpath(volpath) + '/'
    logger.debug("Volume %s", volume)
    starttime = datetime.now()
    try:
        # Same entries as ``ls`` would print: non-hidden names, sorted.
        networks = sorted(name for name in os.listdir(volume) if not name.startswith('.'))
    except OSError as err:
        logger.error("Unable to list volume %s: %s", volume, err)
        return data
    for network in networks:
        path = os.path.join(volume, network)
        logger.debug("Scanned %s", network)
        try:
            year = int(network[2:])
        except ValueError:
            logger.error("Unable to get year from path %s. Ignoring this one", path)
            continue

        # Reset for every network so that an unreadable master.ph5 never
        # reuses the station list of the previously scanned network.
        stations = [""]
        try:
            with h5py.File(os.path.join(path, "master.ph5"), 'r') as h5data:
                maps = h5data['Experiment_g']['Maps_g']
                logger.debug("Master PH5 stations: %s", maps)
                found = [sta[len('Das_g_'):] for sta in maps if sta.startswith('Das_g_')]
        except Exception as err:  # best effort: any read failure falls back to one station
            logger.error("Unable to read stations from master.ph5 in %s (%s). "
                         "Let's assume there is one station", path, err)
        else:
            if found:
                stations = found
            else:
                logger.warning("No station declared in master.ph5 of %s. "
                               "Let's assume there is one station", path)
        logger.debug("%s stations in %s", len(stations), network)

        total = 0
        for dirpath, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                logger.debug("Scanning %s: file %s", network, filename)
                if filename.endswith('ph5'):
                    # Join with dirpath, not the network directory: os.walk
                    # also descends into subdirectories.
                    total += os.path.getsize(os.path.join(dirpath, filename))
        logger.debug("Total size of %s is %s (%s GB)", network, total, total / (1024**3))

        # Share the network size evenly between its stations
        # (stations is never empty here).
        per_station_size = total // len(stations)
        for sta in stations:
            data.append({'type': 'ph5_validated', 'year': year, 'network': network, 'station': sta,
                         'channel': None, 'quality': None, 'size': per_station_size})
    logger.debug("Volume scanned in %s", datetime.now() - starttime)
    return data


103
def scan_volumes(volumes):
    """
    Scan every configured volume and return the flattened statistics.

    :param volumes: list of dictionaries describing the volumes, e.g.
        ``[{'path': '/bla/bla', 'type': 'plop', 'name': 'vol1'}, ...]``.
        ``path`` is mandatory; ``type`` and ``name`` are optional. A volume
        whose ``type`` is ``"ph5"`` is scanned with ``scan_ph5_volume``; any
        other volume is scanned with ``scan_volume`` and, when ``type`` is
        given, each of its statistics is tagged with it. When ``name`` is
        given, each statistic gets it under the ``volume`` key.
    :returns: a single flat list of statistic dictionaries for all volumes.
    :raises ValueError: if a volume is not a mapping with a ``path`` key.
        Every volume is validated before any scan starts, so a configuration
        error is reported at once rather than after a long scan.
    """
    volumes = list(volumes)
    for volume in volumes:
        if not isinstance(volume, dict) or 'path' not in volume:
            raise ValueError("Volume has no path key : %s" % (volume,))

    statistics = []
    starttime = datetime.now()
    for volume in volumes:
        logger.debug("Preparing scan of volume %s", volume['path'])
        if volume.get('type') == "ph5":
            stats = scan_ph5_volume(volume['path'])
        else:
            stats = scan_volume(volume['path'])
            # If a type of data was specified, add it to every statistic.
            if 'type' in volume:
                for stat in stats:
                    stat['type'] = volume['type']
        if 'name' in volume:
            for stat in stats:
                stat['volume'] = volume['name']
        statistics.extend(stats)
    logger.info("All volumes scanned in %s", datetime.now() - starttime)
    return statistics
Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
133

134

135
# Upsert of one statistic into the dataholdings table: a row that already
# exists for the same (network, year, station, channel, type, date) only gets
# its size updated.
_INSERT_DATAHOLDING_SQL = """
    INSERT INTO dataholdings (network, year, station, channel, quality, type, size, is_permanent, volume, date)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (network,year,station,channel,type,date) DO UPDATE SET size = EXCLUDED.size;
    """


def _pg_connect(pg_cfg):
    """Open a psycopg2 connection from the ``postgres`` configuration section."""
    return psycopg2.connect(dbname=pg_cfg['database'], user=pg_cfg['user'],
                            host=pg_cfg['host'], password=pg_cfg['password'],
                            port=pg_cfg['port'])


def _last_report_date(pg_cfg):
    """Return the most recent ``date`` stored in dataholdings, or None if the table is empty."""
    conn = _pg_connect(pg_cfg)
    try:
        with conn.cursor() as cur:
            cur.execute('select distinct date from dataholdings order by date desc limit 1;')
            row = cur.fetchone()
    finally:
        conn.close()
    return row[0] if row else None


def _write_statistics(pg_cfg, statistics):
    """
    Upsert every statistic into dataholdings and commit once at the end.

    Each insert runs inside a savepoint. A failing row is rolled back to that
    savepoint, logged and skipped. Without this, PostgreSQL would reject every
    later command of the aborted transaction and nothing would be committed.
    """
    conn = _pg_connect(pg_cfg)
    try:
        with conn.cursor() as cur:
            for stat in statistics:
                # 'type' and 'volume' are only set when the volume
                # configuration provides them; send NULL otherwise.
                params = (stat['network'], stat['year'], stat['station'], stat['channel'],
                          stat['quality'], stat.get('type'), stat['size'], stat['is_permanent'],
                          stat.get('volume'), stat['date'])
                cur.execute("SAVEPOINT dataholding_row")
                try:
                    cur.execute(_INSERT_DATAHOLDING_SQL, params)
                except psycopg2.Error as err:
                    cur.execute("ROLLBACK TO SAVEPOINT dataholding_row")
                    logger.error(err)
                    logger.info(cur.mogrify(_INSERT_DATAHOLDING_SQL, params).decode('utf-8', 'replace'))
                else:
                    cur.execute("RELEASE SAVEPOINT dataholding_row")
        conn.commit()
    finally:
        conn.close()


@click.command()
@click.option('--config-file',  'configfile', type=click.File(), help='Configuration file path', envvar='CONFIG_FILE', show_default=True,
              default=f"{os.path.dirname(os.path.realpath(__file__))}/config.yml")
@click.option('--force-scan', flag_value=True, default=False, help='Force scanning of the archive')
@click.option('--dryrun', flag_value=True, default=False, help="Do not send metrics to database")
@click.option("--verbose", flag_value=True, default=False, help="Verbose mode")
@click.version_option(__version__)
def cli(configfile, force_scan, dryrun, verbose, version=False):
    """
    Command line interface. Stands as main.

    Steps:
    - Load the YAML configuration.
    - Unless --force-scan, stop when the last report stored in PostgreSQL
      is not older than ``cache_ttl`` days.
    - Scan the configured volumes and annotate each statistic with
      ``is_permanent``, the extended network code and today's date.
    - Print the statistics (--dryrun) or upsert them into dataholdings.

    ``version`` is unused: ``--version`` is handled by
    ``click.version_option``. A second ``--version`` flag used to be
    declared, but the parser gave the option to version_option, so that flag
    never received a value. The parameter only keeps direct callers of the
    callback working.
    """
    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting")
    config_name = getattr(configfile, 'name', configfile)
    try:
        cfg = yaml.load(configfile, Loader=yaml.SafeLoader)
    except yaml.YAMLError as err:
        logger.error("Could not parse %s", config_name)
        logger.error(err)
        sys.exit(1)
    if not isinstance(cfg, dict):
        logger.error("Configuration file %s is empty or is not a mapping", config_name)
        sys.exit(1)

    today = date.today().strftime("%Y-%m-%d")

    if not force_scan:
        if 'postgres' not in cfg:
            logger.info("No postgres configuration: cannot get the last report date. Scanning anyway.")
        else:
            last_stat_date = _last_report_date(cfg['postgres'])
            logger.info("Last report: %s", last_stat_date)
            if last_stat_date is None:
                logger.info("No previous report found. Let's get the job done.")
            elif date.today() - last_stat_date > timedelta(days=cfg['cache_ttl']):
                logger.info("Last report is old enough. Let's get the job done.")
            else:
                logger.info(
                    "Last data report made at %s. Younger than %s. Don't scan",
                    last_stat_date, cfg['cache_ttl'])
                sys.exit(0)

    statistics = scan_volumes(cfg['volumes'])

    # Add the network type (is the network permanent or not) to each
    # statistic, and the extended network code for temporary networks.
    extender = FdsnExtender()
    for stat in statistics:
        # Networks whose code starts with 1 to 9 or X, Y, Z are temporary.
        stat['is_permanent'] = not re.match('^[1-9XYZ]', stat['network'])
        # A temporary network with a 2-character code gets its extended name.
        if not stat['is_permanent'] and len(stat['network']) < 3:
            try:
                stat['network'] = extender.extend(stat['network'], int(stat['year']))
            except ValueError:
                logger.debug("Network %s exists ?", stat['network'])
        stat['date'] = today
        logger.debug(stat)

    if dryrun:
        logger.info("Dryrun mode, dump stats and exit")
        for stat in statistics:
            print(stat)
        sys.exit(0)

    # Write to postgres database
    if 'postgres' in cfg:
        logger.info('Writing to postgres database')
        _write_statistics(cfg['postgres'], statistics)

Jonathan Schaeffer's avatar
Jonathan Schaeffer committed
230
231
if __name__ == "__main__":
    # Run as a script (not imported): hand control to the click command,
    # which parses sys.argv itself. cli() is a shorthand for cli.main().
    cli.main()