output.py 11.4 KB
Newer Older
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
1
import json
Jerome Touvier's avatar
Jerome Touvier committed
2
3
4
5
6
import logging
import re
import time
from datetime import datetime

Jerome Touvier's avatar
ad map    
Jerome Touvier committed
7
8
9
10
11
12
from bokeh.embed import file_html
from bokeh.models import ColorBar, LogColorMapper, LogTicker, NumeralTickFormatter
from bokeh.palettes import Plasma256, Viridis256
from bokeh.plotting import figure
from bokeh.resources import CDN

Jerome Touvier's avatar
Jerome Touvier committed
13
from flask import current_app, make_response
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
14
import psycopg2
Jerome Touvier's avatar
Jerome Touvier committed
15
16

from apps.globals import Error
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
17
from apps.isoalpha2 import codes
Jerome Touvier's avatar
Jerome Touvier committed
18
19
20
21
from apps.utils import error_request
from apps.utils import tictac


Jerome Touvier's avatar
update    
Jerome Touvier committed
22
23
def human_readable_size(size_in_bytes):
    index = 0
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
24
    size_units = ["B", "KB", "MB", "GB", "TB", "PB", "PB"]
Jerome Touvier's avatar
update    
Jerome Touvier committed
25
26
27
    while size_in_bytes >= 1024 and index < 6:
        size_in_bytes /= 1024
        index += 1
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
28
    return f"{round(size_in_bytes, 2)} {size_units[index]}"
Jerome Touvier's avatar
update    
Jerome Touvier committed
29
30


Jerome Touvier's avatar
Jerome Touvier committed
31
32
33
34
35
36
37
38
39
40
def is_like_or_equal(params, key):
    """ Builds the condition for the specified key in the "where" clause taking into account lists or wildcards. """

    subquery = list()
    for param in params[key].split(","):
        op = "LIKE" if re.search(r"[*?]", param) else "="
        subquery.append(f"{key} {op} '{param}'")
    return " OR ".join(subquery)


Jerome Touvier's avatar
Jerome Touvier committed
41
def sql_common_string(params, begin=""):
Jerome Touvier's avatar
Jerome Touvier committed
42
43
44
45
46
47
48
49
50
51
52
53
54
    s = begin
    # network, station, location, channel parameters
    s = f"""{s} WHERE ({is_like_or_equal(params, "network")})"""
    if params["station"] != "*":
        s = f"""{s} AND ({is_like_or_equal(params, "station")})"""
    if params["location"] != "*" and params["request"] != "storage":
        s = f"""{s} AND ({is_like_or_equal(params, "location")})"""
    if params["channel"] != "*":
        s = f"""{s} AND ({is_like_or_equal(params, "channel")})"""

    # starttime, endtime parameters
    if params["start"]:
        start = datetime.strftime(params["start"], "%Y-%m-%d")
Jerome Touvier's avatar
Jerome Touvier committed
55
        s = f"""{s} AND date >= '{start}'"""
Jerome Touvier's avatar
Jerome Touvier committed
56
57
    if params["end"]:
        end = datetime.strftime(params["end"], "%Y-%m-%d")
Jerome Touvier's avatar
Jerome Touvier committed
58
        s = f"""{s} AND date <= '{end}'"""
Jerome Touvier's avatar
Jerome Touvier committed
59
60
61
62
63
64
65
66
67
68
    return s


def sql_request(params):
    """ Builds the PostgreSQL request """

    if params["request"] == "storage":
        columns = "year, network, station, channel, quality, type"
        s = f"SELECT DISTINCT ON ({columns}) size, {columns}, date FROM dataholdings"
        s = sql_common_string(params, s)
69
        s = f"{s} AND channel is not NULL"
Jerome Touvier's avatar
Jerome Touvier committed
70
71
72
73
74
75
76

        if params["type"] != "all":
            s = f"""{s} AND (type = '{params["type"]}')"""
        if params["year"]:
            s = f"""{s} AND ({is_like_or_equal(params, "year")})"""
        s = f"""{s} ORDER BY {columns}, date DESC"""
        return s.replace("?", "_").replace("*", "%")
Jerome Touvier's avatar
Jerome Touvier committed
77

Jerome Touvier's avatar
Jerome Touvier committed
78
    else:
Jerome Touvier's avatar
Jerome Touvier committed
79
        table = "sent_data_summary_weekly"
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
80
81
82
83

        if params["request"] == "map":
            s = f"SELECT extract(year from date)::INTEGER as year, country, sum(requests)::BIGINT FROM {table}"
        elif params["request"] == "country":
Jerome Touvier's avatar
Jerome Touvier committed
84
            if "all" in params["country"]:
Jerome Touvier's avatar
Jerome Touvier committed
85
                s = f"SELECT sum(requests)::BIGINT, sum(hll_cardinality(clients))::INTEGER FROM {table}"
Jerome Touvier's avatar
Jerome Touvier committed
86
            else:
Jerome Touvier's avatar
Jerome Touvier committed
87
                s = f"SELECT country, sum(requests)::BIGINT, sum(hll_cardinality(clients))::INTEGER FROM {table}"
Jerome Touvier's avatar
Jerome Touvier committed
88
        elif params["request"] == "send":
Jerome Touvier's avatar
Jerome Touvier committed
89
            s = f"SELECT sum(bytes)::BIGINT FROM {table}"
Jerome Touvier's avatar
Jerome Touvier committed
90
        elif params["request"] == "timeseries":
Jerome Touvier's avatar
Jerome Touvier committed
91
            s = f"SELECT date, country, sum(bytes)::BIGINT, sum(hll_cardinality(clients))::INTEGER FROM {table}"
Jerome Touvier's avatar
Jerome Touvier committed
92
93

        s = sql_common_string(params, s)
Jerome Touvier's avatar
update    
Jerome Touvier committed
94
95
        if "all" not in params["country"]:
            s = f"""{s} AND ({is_like_or_equal(params, "country")})"""
Jerome Touvier's avatar
Jerome Touvier committed
96
97
98
99
100
101
102
103

        if "seedlink" in params["media"] and "dataselect" in params["media"]:
            s = f"{s} AND (protocol = 'seedlink' OR protocol = 'dataselect')"
        elif "seedlink" in params["media"]:
            s = f"{s} AND protocol = 'seedlink'"
        elif "dataselect" in params["media"]:
            s = f"{s} AND protocol = 'dataselect'"

Jerome Touvier's avatar
Jerome Touvier committed
104
        if params["request"] == "country" and "all" not in params["country"]:
Jerome Touvier's avatar
Jerome Touvier committed
105
            s = f"""{s} GROUP BY country"""
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
106
107
        elif params["request"] == "map":
            s = f"""{s} GROUP BY country, year"""
Jerome Touvier's avatar
Jerome Touvier committed
108
109
110
111
112
113
        elif params["request"] == "timeseries":
            if "seedlink" in params["media"] and "dataselect" in params["media"]:
                s = f"""{s} GROUP BY date, country ORDER BY date"""
            else:
                s = f"""{s} GROUP BY date, country, protocol ORDER BY date"""
        return s.replace("?", "_").replace("*", "%")
Jerome Touvier's avatar
Jerome Touvier committed
114
115
116


def collect_data(params):
Jerome Touvier's avatar
Jerome Touvier committed
117
    """ Get the result of the SQL query. """
Jerome Touvier's avatar
Jerome Touvier committed
118
119

    logging.debug("Start collecting data...")
Jerome Touvier's avatar
Jerome Touvier committed
120
    with psycopg2.connect(current_app.config["DATABASE_URI"]) as conn:
Jerome Touvier's avatar
Jerome Touvier committed
121
122
123
        logging.debug(conn.get_dsn_parameters())
        logging.debug(f"Postgres version : {conn.server_version}")
        with conn.cursor() as curs:
Jerome Touvier's avatar
Jerome Touvier committed
124
125
126
            select = sql_request(params)
            logging.debug(select)
            curs.execute(select)
Jerome Touvier's avatar
Jerome Touvier committed
127
            logging.debug(curs.statusmessage)
Jerome Touvier's avatar
Jerome Touvier committed
128
            return curs.fetchall()
Jerome Touvier's avatar
Jerome Touvier committed
129
130


Jerome Touvier's avatar
Jerome Touvier committed
131
def format_results(params, data):
Jerome Touvier's avatar
update    
Jerome Touvier committed
132

Jerome Touvier's avatar
Jerome Touvier committed
133
    data = list(map(list, data))
Jerome Touvier's avatar
Jerome Touvier committed
134
    if params["request"] == "country":
Jerome Touvier's avatar
update    
Jerome Touvier committed
135
        if "all" in params["country"]:
Jerome Touvier's avatar
Jerome Touvier committed
136
            data = [["all", data[0][0], data[0][1]]]
137
138
        if params["format"] != "csv":
            for row in data:
Jerome Touvier's avatar
Jerome Touvier committed
139
140
                if row[1] and row[2]:
                    row[:] = [row[0], "{:_}".format(row[1]), "{:_}".format(row[2])]
Jerome Touvier's avatar
update    
Jerome Touvier committed
141

Jerome Touvier's avatar
Jerome Touvier committed
142
    elif params["request"] == "send":
143
144
145
        if params["format"] != "csv":
            result = data[0][0]
            data = [["{:_}".format(result) + f" bytes ({human_readable_size(result)})"]]
Jerome Touvier's avatar
update    
Jerome Touvier committed
146

Jerome Touvier's avatar
Jerome Touvier committed
147
    data = [[str(val) for val in row] for row in data]
Jerome Touvier's avatar
Jerome Touvier committed
148
    return data
Jerome Touvier's avatar
Jerome Touvier committed
149
150
151
152
153
154


def get_header(params):
    if params["request"] == "storage":
        header = ["size", "year", "network", "station", "channel", "quality", "type"]
        header.append("lastupdated")
Jerome Touvier's avatar
Jerome Touvier committed
155
    elif params["request"] == "country":
Jerome Touvier's avatar
Jerome Touvier committed
156
        header = ["country", "requests", "clients"]
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
157
158
    elif params["request"] == "map":
        header = ["country", "requests"]
Jerome Touvier's avatar
Jerome Touvier committed
159
160
161
    elif params["request"] == "send":
        if len(params["media"]) == 1 and "all" not in params["media"]:
            if "seedlink" in params["media"]:
Jerome Touvier's avatar
update    
Jerome Touvier committed
162
                header = ["SEEDLINK"]
Jerome Touvier's avatar
Jerome Touvier committed
163
            elif "dataselect" in params["media"]:
Jerome Touvier's avatar
update    
Jerome Touvier committed
164
                header = ["DATASELECT"]
Jerome Touvier's avatar
Jerome Touvier committed
165
        else:
Jerome Touvier's avatar
update    
Jerome Touvier committed
166
            header = ["SEEDLINK and DATASELECT"]
Jerome Touvier's avatar
Jerome Touvier committed
167
168
    elif params["request"] == "timeseries":
        header = ["time", "country", "bytes", "clients"]
Jerome Touvier's avatar
Jerome Touvier committed
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
    return header


def get_column_widths(data, header=None):
    """ Find the maximum width of each column"""
    ncols = range(len(data[0]))
    colwidths = [max([len(r[i]) for r in data]) for i in ncols]
    if header:
        colwidths = [max(len(h), cw) for h, cw in zip(header, colwidths)]
    return colwidths


def records_to_text(params, data, sep=" "):
    text = ""
    header = get_header(params)
    if params["format"] == "text":
        sizes = get_column_widths(data, header)
        # pad header and rows according to the maximum column width
        header = [val.ljust(sz) for val, sz in zip(header, sizes)]
        for row in data:
            row[:] = [val.ljust(sz) for val, sz in zip(row, sizes)]

    if params["format"] != "request":
        text = sep.join(header) + "\n"

    data = [f"{sep.join(row)}\n" for row in data]
    text += "".join(data)
    return text


def get_response(params, data):
    tic = time.time()
    fname = "resifws-statistics"
    headers = {"Content-type": "text/plain"}
    if params["format"] == "text":
        response = make_response(records_to_text(params, data), headers)
    elif params["format"] == "request":
        response = make_response(records_to_text(params, data), headers)
    elif params["format"] == "sync":
        response = make_response(records_to_text(params, data, "|"), headers)
    elif params["format"] == "csv":
        headers = {"Content-Disposition": f"attachment; filename={fname}.csv"}
        response = make_response(records_to_text(params, data, ","), headers)
        response.headers["Content-type"] = "text/csv"
    logging.debug(f"Response built in {tictac(tic)} seconds.")
    return response


Jerome Touvier's avatar
ad map    
Jerome Touvier committed
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def map_requests(params, data):

    with open("countries.geo.json") as json_file:
        geojson = json.load(json_file)

    geodata = dict(x=[], y=[], name=[], requests=[])
    for country in geojson["features"]:
        if country["geometry"]["type"] not in ["Polygon", "MultiPolygon"]:
            break

        if country["geometry"]["type"] == "Polygon":
            country["geometry"]["coordinates"] = [country["geometry"]["coordinates"]]

        for polygon in country["geometry"]["coordinates"]:
            geodata["x"].append([x[0] for x in polygon[0]])
            geodata["y"].append([x[1] for x in polygon[0]])
            geodata["name"].append(country["properties"]["name"].lower())

    geodata["requests"] = [0 for i in range(0, len(geodata["name"]))]

    if params["year"]:
        years = sorted(params["year"].split(","))
    else:
        years = [str(datetime.now().year)]

    for year, code, requests in data:
        if str(year) in years:
            country = codes.get(code)
            if country:
                for i, name in enumerate(geodata["name"]):
                    if country.lower() in name:
                        geodata["requests"][i] += requests

Jerome Touvier's avatar
fix doc    
Jerome Touvier committed
250
    my_palette = ("#f2f2f2",) + Viridis256
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
251
    # my_palette = ("#f2f2f2",) + tuple(reversed(Plasma256))
Jerome Touvier's avatar
fix doc    
Jerome Touvier committed
252
    # my_palette = ("#fee5d9", "#fcbba1", "#fc9272", "#fb6a4a", "#de2d26")
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
    color_mapper = LogColorMapper(palette=("#f2f2f2",) + my_palette, low=0.1)
    if sum(geodata["requests"]) == 0:
        color_mapper = None

    TOOLS = "save, pan, wheel_zoom, box_zoom, reset"
    plot = figure(
        title=f"""Number of requests to the RESIF data center in {" and ".join(years)}""",
        tools=TOOLS,
        plot_width=1200,
        plot_height=800,
        x_axis_location=None,
        y_axis_location=None,
        tooltips=[
            ("Name", "@name"),
            ("Requests", "@requests"),
            ("(Long, Lat)", "($x, $y)"),
        ],
        active_drag="box_zoom",
        active_scroll="wheel_zoom",
Jerome Touvier's avatar
Jerome Touvier committed
272
        # sizing_mode='stretch_both'
Jerome Touvier's avatar
ad map    
Jerome Touvier committed
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
    )

    plot.toolbar.logo = None
    plot.grid.grid_line_color = None
    plot.hover.point_policy = "follow_mouse"
    plot.patches(
        "x",
        "y",
        source=geodata,
        fill_color={"field": "requests", "transform": color_mapper},
        line_width=0.2,
        fill_alpha=0.8,
        # line_color="white",
    )

    color_bar = ColorBar(
        color_mapper=color_mapper,
        ticker=LogTicker(),
        formatter=NumeralTickFormatter(),
        label_standoff=5,
        width=500,
        height=10,
        location=(0, 0),
        orientation="horizontal",
    )

    plot.add_layout(color_bar, "below")

    return make_response(file_html(plot, CDN, "resifws-statistics"))


Jerome Touvier's avatar
Jerome Touvier committed
304
def get_output(params):
Jerome Touvier's avatar
Jerome Touvier committed
305
    """Statistics output (csv, request, sync, text)
Jerome Touvier's avatar
Jerome Touvier committed
306
307
308
309
310
311
312
313
    :params: parameters
    :returns: text or csv with data statistics"""

    try:
        tic = time.time()
        data = collect_data(params)
        if data is None:
            return data
Jerome Touvier's avatar
update    
Jerome Touvier committed
314

Jerome Touvier's avatar
Jerome Touvier committed
315
316
317
        if not data:
            code = params["nodata"]
            return error_request(msg=f"HTTP._{code}_", details=Error.NODATA, code=code)
Jerome Touvier's avatar
Jerome Touvier committed
318
        logging.info(f"Number of collected rows: {len(data)}")
Jerome Touvier's avatar
Jerome Touvier committed
319

Jerome Touvier's avatar
ad map    
Jerome Touvier committed
320
321
        if params["request"] == "map":
            return map_requests(params, data)
Jerome Touvier's avatar
Jerome Touvier committed
322
        data = format_results(params, data)
Jerome Touvier's avatar
Jerome Touvier committed
323
324
325
326
327
        return get_response(params, data)
    except Exception as ex:
        logging.exception(str(ex))
    finally:
        logging.info(f"Data processed in {tictac(tic)} seconds.")