output.py 7.96 KB
Newer Older
Jerome Touvier's avatar
Jerome Touvier committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import logging
import os
import re
import time
from datetime import datetime

import psycopg2
from flask import make_response

from apps.globals import Error
from apps.utils import error_request
from apps.utils import tictac


def is_like_or_equal(params, key):
    """Build the condition on column *key* for the "where" clause, taking lists and wildcards into account.

    ``params[key]`` is a comma-separated list of values. Each value becomes an
    equality test, or a LIKE test when it contains a ``*`` or ``?`` wildcard,
    and the tests are joined with OR. The wildcards themselves are left as-is:
    callers translate ``*``/``?`` to SQL ``%``/``_`` on the final query string.

    Single quotes inside a value are doubled so that a value taken from the
    request cannot terminate the SQL string literal.

    :param params: request parameters; ``params[key]`` must be a str
    :param key: column name, interpolated verbatim - it must come from code,
        never from user input
    :returns: e.g. ``"network = 'FR' OR network LIKE 'G*'"``
    """
    conditions = []
    for value in params[key].split(","):
        op = "LIKE" if re.search(r"[*?]", value) else "="
        escaped = value.replace("'", "''")
        conditions.append(f"{key} {op} '{escaped}'")
    return " OR ".join(conditions)


def get_table(media):
    """Return the PostgreSQL table that holds the statistics for *media*.

    "seedlink" maps to ``ringserver_events``; every other value falls back
    to ``dataselectvol``.
    """
    return "ringserver_events" if media == "seedlink" else "dataselectvol"


def sql_common_string(params, begin="", date_s="date"):
    """Append to *begin* the WHERE clause shared by every statistics query.

    The network filter is always present. Station, location and channel
    filters are added unless their value is the "*" wildcard; the location
    filter is also skipped for "storage" requests. ``params["start"]`` and
    ``params["end"]``, when set, bound column *date_s* inclusively at day
    precision.

    :param params: request parameters
    :param begin: SQL text the clause is appended to (e.g. "SELECT ... FROM t")
    :param date_s: name of the date/time column used for the time bounds
    :returns: *begin* followed by the WHERE clause
    """
    # network, station, location, channel parameters
    clauses = [f"""({is_like_or_equal(params, "network")})"""]
    for key in ("station", "location", "channel"):
        if params[key] == "*":
            continue
        if key == "location" and params["request"] == "storage":
            continue
        clauses.append(f"({is_like_or_equal(params, key)})")

    # starttime, endtime parameters
    for bound, op in (("start", ">="), ("end", "<=")):
        if params[bound]:
            day = datetime.strftime(params[bound], "%Y-%m-%d")
            clauses.append(f"{date_s} {op} '{day}'")

    return f"{begin} WHERE " + " AND ".join(clauses)


def sql_request(params):
    """Build the PostgreSQL request.

    * "storage": one row per (year, network, station, channel, quality, type)
      from ``dataholdings``, keeping the most recent ``date`` (DISTINCT ON +
      ORDER BY ... date DESC).
    * "country" / "send": one SELECT per entry of ``params["media"]`` on the
      table given by ``get_table``, combined with ``UNION ALL``.

    The user wildcards ``*`` and ``?`` are translated to SQL ``%`` and ``_``
    on the final string.

    :param params: request parameters
    :returns: the SQL query string
    """
    if params["request"] == "storage":
        columns = "year, network, station, channel, quality, type"
        s = f"SELECT DISTINCT ON ({columns}) size, {columns}, date FROM dataholdings"
        s = sql_common_string(params, s)
        s = f"{s} AND channel is not NULL"
        if params["type"] != "all":
            # Double single quotes so the value cannot close the SQL literal.
            qtype = params["type"].replace("'", "''")
            s = f"{s} AND (type = '{qtype}')"
        if params["year"]:
            s = f"""{s} AND ({is_like_or_equal(params, "year")})"""
        s = f"{s} ORDER BY {columns}, date DESC"
        return s.replace("?", "_").replace("*", "%")

    selects = []
    for media in params["media"]:
        table = get_table(media)
        # The seedlink table is filtered on its "time" column, the others on "date".
        date_s = "time" if media == "seedlink" else "date"
        if params["request"] == "country":
            s = f"SELECT country, count({date_s}) FROM {table}"
        elif params["request"] == "send":
            s = f"SELECT sum(bytes) FROM {table}"
            if params["timeseries"]:
                # NOTE(review): selects "date" even for the seedlink table,
                # which is filtered on "time" - confirm that column exists there.
                s = f"SELECT date, network, station, location, channel, bytes, country FROM {table}"
        s = sql_common_string(params, s, date_s)

        if params["request"] == "country":
            s = f"""{s} AND ({is_like_or_equal(params, "country")}) GROUP BY country"""
        elif params["request"] == "send":
            s = f"""{s} AND ({is_like_or_equal(params, "country")})"""
        selects.append(s)

    # UNION ALL, not UNION: plain UNION removes duplicate rows, so two media
    # with equal sums/counts (or identical timeseries rows) would collapse
    # into one row and the totals computed from them would be too small.
    select = " UNION ALL ".join(selects)
    return select.replace("?", "_").replace("*", "%")


def collect_data(params):
    """Connect to the PostgreSQL RESIF database and run the statistics query.

    The connection URI is read from the ``PG_DBURI`` environment variable and
    the query is built by ``sql_request``.

    :param params: request parameters
    :returns: all result rows, as returned by the cursor's ``fetchall()``
    """
    tic = time.time()
    logging.debug("Start collecting data...")
    conn = psycopg2.connect(os.getenv("PG_DBURI"))
    try:
        # psycopg2's "with conn" only commits/rolls back the transaction; it
        # does NOT close the connection, hence the explicit close() below.
        with conn:
            logging.debug(conn.get_dsn_parameters())
            logging.debug(f"Postgres version : {conn.server_version}")
            with conn.cursor() as curs:
                sql_select = sql_request(params)
                # Log before executing so a failing query still shows up.
                logging.debug(sql_select)
                curs.execute(sql_select)
                logging.debug(curs.statusmessage)
                rows = curs.fetchall()
    finally:
        conn.close()
    logging.debug(f"Get data in {tictac(tic)} seconds.")
    return rows


def sum_results(params, data):
    """Add up the results coming from the different media tables.

    * "country": rows are (country, count); counts are summed per country,
      in first-seen order, and rows with an empty/zero count are ignored.
    * "send": rows are (bytes,); all non-empty values are summed into a
      single row.
    * any other request yields an empty list.

    :returns: list of rows whose numeric values are rendered with ``str``
    """
    request = params["request"]
    if request == "country":
        totals = {}
        for row in data:
            country, count = row[0], row[1]
            if not count:
                continue
            totals[country] = totals.get(country, 0) + int(count)
        return [[country, str(total)] for country, total in totals.items()]
    if request == "send":
        total = sum(int(row[0]) for row in data if row[0])
        return [[str(total)]]
    return []


def get_header(params):
    """Return the column names matching the rows produced for *params*.

    :param params: request parameters ("request", and for "send" requests
        also "media" and "timeseries")
    :returns: list of header labels
    :raises ValueError: for an unsupported request type or single media
    """
    request = params["request"]
    if request == "storage":
        return ["size", "year", "network", "station", "channel", "quality", "type", "lastupdated"]
    if request == "country":
        return ["country", "requests"]
    if request == "send":
        # Timeseries rows always have these 7 columns, whatever the media, so
        # this must be checked before the single-media summary headers.
        if params.get("timeseries"):
            return [
                "date",
                "network",
                "station",
                "location",
                "channel",
                "bytes",
                "country",
            ]
        media = params["media"]
        if len(media) == 1 and "all" not in media:
            if "seedlink" in media:
                return ["SEEDLINK (in bytes)"]
            if "dataselect" in media:
                return ["DATASELECT (in bytes)"]
            raise ValueError(f"Unsupported media: {media!r}")
        return ["SEEDLINK and DATASELECT (in bytes)"]
    raise ValueError(f"Unsupported request type: {request!r}")


def get_column_widths(data, header=None):
    """Find the maximum width of each column.

    :param data: rows of strings; the column count is taken from the first row
    :param header: optional header labels, also taken into account
    :returns: list of widths; when *data* is empty, the header widths
        (or an empty list without a header)
    """
    if not data:
        # e.g. every row was dropped by sum_results: size from the header only
        return [len(h) for h in header] if header else []
    widths = [max(len(row[i]) for row in data) for i in range(len(data[0]))]
    if header:
        widths = [max(len(h), w) for h, w in zip(header, widths)]
    return widths


def records_to_text(params, data, sep=" "):
    """Render the rows as text lines, preceded by a header line.

    :param params: request parameters; ``params["format"]`` selects the
        layout: "text" left-justifies every column to its widest value,
        "request" omits the header line, other formats are left unpadded
    :param data: rows as lists of strings; they are not modified
    :param sep: column separator
    :returns: the rendered text, each line terminated by a newline
    """
    header = get_header(params)
    rows = data
    if params["format"] == "text":
        sizes = get_column_widths(data, header)
        # pad header and rows according to the maximum column width,
        # building new lists instead of mutating the caller's rows
        header = [val.ljust(sz) for val, sz in zip(header, sizes)]
        rows = [[val.ljust(sz) for val, sz in zip(row, sizes)] for row in data]

    lines = [] if params["format"] == "request" else [sep.join(header)]
    lines.extend(sep.join(row) for row in rows)
    return "".join(f"{line}\n" for line in lines)


def get_response(params, data):
    """Build the Flask response holding the statistics rows.

    :param params: request parameters; ``params["format"]`` must be "text",
        "request", "sync" (``|``-separated) or "csv" (comma-separated, sent
        as a ``resifws-statistics.csv`` attachment)
    :param data: result rows; every value is converted with ``str``
    :returns: the Flask response
    :raises ValueError: for an unsupported format
    """
    tic = time.time()
    data = [[str(val) for val in row] for row in data]
    fname = "resifws-statistics"
    fmt = params["format"]
    headers = {"Content-type": "text/plain"}
    if fmt in ("text", "request"):
        response = make_response(records_to_text(params, data), headers)
    elif fmt == "sync":
        response = make_response(records_to_text(params, data, "|"), headers)
    elif fmt == "csv":
        headers = {"Content-Disposition": f"attachment; filename={fname}.csv"}
        response = make_response(records_to_text(params, data, ","), headers)
        response.headers["Content-type"] = "text/csv"
    else:
        # Previously fell through and failed on an unbound "response".
        raise ValueError(f"Unsupported output format: {fmt!r}")
    logging.debug(f"Response built in {tictac(tic)} seconds.")
    return response


def get_statistics(params):
    """Statistics output (csv, request, sync, text)

    Collect the rows from the database, merge the per-media aggregates when
    needed, and build the HTTP response. Any exception is logged and the
    function then returns None.

    :params: parameters
    :returns: text or csv with data statistics, or the "nodata" error
        response when the query returned no rows"""

    tic = time.time()
    try:
        data = collect_data(params)
        if data is None:
            return None
        if not data:
            code = params["nodata"]
            return error_request(msg=f"HTTP._{code}_", details=Error.NODATA, code=code)

        logging.debug(f"Number of collected rows: {len(data)}")

        # Non-timeseries "country"/"send" results come from one SELECT per
        # media and have to be added up.
        needs_merge = params["request"] != "storage" and not params["timeseries"]
        if needs_merge:
            data = sum_results(params, data)

        return get_response(params, data)
    except Exception as ex:
        logging.exception(str(ex))
    finally:
        logging.info(f"Data processed in {tictac(tic)} seconds.")