output.py 11.5 KB
Newer Older
Jerome Touvier's avatar
ad map  
Jerome Touvier committed
1
import json
Jerome Touvier's avatar
Jerome Touvier committed
2 3 4 5 6
import logging
import re
import time
from datetime import datetime

Jerome Touvier's avatar
ad map  
Jerome Touvier committed
7 8 9 10 11 12
from bokeh.embed import file_html
from bokeh.models import ColorBar, LogColorMapper, LogTicker, NumeralTickFormatter
from bokeh.palettes import Plasma256, Viridis256
from bokeh.plotting import figure
from bokeh.resources import CDN

Jerome Touvier's avatar
Jerome Touvier committed
13
from flask import current_app, make_response
Jerome Touvier's avatar
ad map  
Jerome Touvier committed
14
import psycopg2
Jerome Touvier's avatar
Jerome Touvier committed
15 16

from apps.globals import Error
Jerome Touvier's avatar
ad map  
Jerome Touvier committed
17
from apps.isoalpha2 import codes
Jerome Touvier's avatar
Jerome Touvier committed
18 19 20 21
from apps.utils import error_request
from apps.utils import tictac


Jerome Touvier's avatar
update  
Jerome Touvier committed
22 23
def human_readable_size(size_in_bytes):
    """Convert a byte count into a short human-readable string.

    The value is divided by 1024 until it is below 1024 or the largest unit
    is reached, then rounded to two decimals.

    :param size_in_bytes: number of bytes (int or float)
    :returns: string made of the scaled value and its unit, e.g. "1.5 KB"
    """
    # The last unit has to be "EB": with "PB" listed twice, exabyte-sized
    # values would be labelled as petabytes.
    size_units = ["B", "KB", "MB", "GB", "TB", "PB", "EB"]
    index = 0
    while size_in_bytes >= 1024 and index < len(size_units) - 1:
        size_in_bytes /= 1024
        index += 1
    return f"{round(size_in_bytes, 2)} {size_units[index]}"
Jerome Touvier's avatar
update  
Jerome Touvier committed
29 30


Jerome Touvier's avatar
Jerome Touvier committed
31 32 33 34 35 36 37 38 39 40
def is_like_or_equal(params, key):
    """Build the "where" condition for ``key``, handling lists and wildcards.

    ``params[key]` is a comma-separated list of values. Each value becomes
    ``key = 'value'``, or ``key LIKE 'value'`` when it contains ``*`` or ``?``.
    The wildcards are left untouched here.

    :param params: request parameters
    :param key: column name, also the key looked up in ``params``
    :returns: the conditions joined with " OR "
    """
    conditions = []
    for value in params[key].split(","):
        operator = "LIKE" if re.search(r"[*?]", value) else "="
        # Double the single quotes so a value cannot close the SQL literal.
        # NOTE(review): assumes standard_conforming_strings is on (the
        # PostgreSQL default), i.e. backslashes are not escapes in literals.
        escaped = value.replace("'", "''")
        conditions.append(f"{key} {operator} '{escaped}'")
    return " OR ".join(conditions)


Jerome Touvier's avatar
Jerome Touvier committed
41
def sql_common_string(params, begin=""):
    """Append the WHERE clause shared by all requests to ``begin``.

    The network condition is always present. Station, location and channel
    conditions are added only when the parameter is not "*" (location is
    also skipped for "storage" requests). Start and end, when set, bound
    the ``date`` column.

    :param params: request parameters
    :param begin: beginning of the SQL statement
    :returns: ``begin`` followed by " WHERE ..." with conditions joined by AND
    """
    day_format = "%Y-%m-%d"

    # network, station, location, channel parameters
    conditions = [f"""({is_like_or_equal(params, "network")})"""]
    if params["station"] != "*":
        conditions.append(f"""({is_like_or_equal(params, "station")})""")
    if params["location"] != "*" and params["request"] != "storage":
        conditions.append(f"""({is_like_or_equal(params, "location")})""")
    if params["channel"] != "*":
        conditions.append(f"""({is_like_or_equal(params, "channel")})""")

    # starttime, endtime parameters
    if params["start"]:
        first_day = datetime.strftime(params["start"], day_format)
        conditions.append(f"date >= '{first_day}'")
    if params["end"]:
        last_day = datetime.strftime(params["end"], day_format)
        conditions.append(f"date <= '{last_day}'")

    where = " AND ".join(conditions)
    return f"{begin} WHERE {where}"


def sql_request(params):
    """Build the PostgreSQL request matching the request parameters.

    "storage" requests query the ``dataholdings`` table; "map", "country",
    "send" and "timeseries" requests query ``sent_data_summary_weekly``.
    The ``?`` and ``*`` wildcards are converted to ``_`` and ``%`` in the
    final statement.

    :param params: request parameters
    :returns: the SQL statement as a string
    :raises ValueError: if ``params["request"]`` is not a supported request
    """
    request = params["request"]

    if request == "storage":
        columns = "year, network, station, channel, quality, type"
        s = f"SELECT DISTINCT ON ({columns}) size, {columns}, date FROM dataholdings"
        s = sql_common_string(params, s)
        s = f"{s} AND channel is not NULL"

        if params["type"] != "all":
            # double the single quotes so the value cannot close the literal
            data_type = str(params["type"]).replace("'", "''")
            s = f"{s} AND (type = '{data_type}')"
        if params["year"]:
            s = f"""{s} AND ({is_like_or_equal(params, "year")})"""
        s = f"{s} ORDER BY {columns}, date DESC"
        return s.replace("?", "_").replace("*", "%")

    table = "sent_data_summary_weekly"
    clients = "hll_cardinality(hll_union_agg(clients))::INTEGER"

    if request == "map":
        s = f"SELECT extract(year from date)::INTEGER as year, country, sum(requests)::BIGINT FROM {table}"
    elif request == "country":
        if "all" in params["country"]:
            s = f"SELECT sum(requests)::BIGINT, {clients} FROM {table}"
        else:
            s = f"SELECT country, sum(requests)::BIGINT, {clients} FROM {table}"
    elif request == "send":
        s = f"SELECT sum(bytes)::BIGINT FROM {table}"
    elif request == "timeseries":
        s = f"SELECT date, country, sum(bytes)::BIGINT, {clients} FROM {table}"
    else:
        # fail with a clear message instead of an unbound local variable
        raise ValueError(f"Unsupported request type: {request!r}")

    s = sql_common_string(params, s)
    all_countries = "all" in params["country"]
    if not all_countries:
        s = f"""{s} AND ({is_like_or_equal(params, "country")})"""

    # restrict to the requested protocol(s); no filter for any other media value
    seedlink = "seedlink" in params["media"]
    dataselect = "dataselect" in params["media"]
    if seedlink and dataselect:
        s = f"{s} AND (protocol = 'seedlink' OR protocol = 'dataselect')"
    elif seedlink:
        s = f"{s} AND protocol = 'seedlink'"
    elif dataselect:
        s = f"{s} AND protocol = 'dataselect'"

    if request == "country" and not all_countries:
        s = f"{s} GROUP BY country"
    elif request == "map":
        s = f"{s} GROUP BY country, year"
    elif request == "timeseries":
        if seedlink and dataselect:
            s = f"{s} GROUP BY date, country ORDER BY date"
        else:
            s = f"{s} GROUP BY date, country, protocol ORDER BY date"
    return s.replace("?", "_").replace("*", "%")
Jerome Touvier's avatar
Jerome Touvier committed
114 115 116


def collect_data(params):
    """Get the result of the SQL query.

    Connects to the database given by the ``DATABASE_URI`` application
    setting, runs the statement built by ``sql_request`` and returns all rows.

    :param params: request parameters
    :returns: list of rows as returned by ``cursor.fetchall()``
    """
    logging.debug("Start collecting data...")
    conn = psycopg2.connect(current_app.config["DATABASE_URI"])
    try:
        # "with conn" only wraps a transaction (commit or rollback on exit);
        # psycopg2 does not close the connection there, hence the finally.
        with conn:
            logging.debug(conn.get_dsn_parameters())
            logging.debug(f"Postgres version : {conn.server_version}")
            with conn.cursor() as curs:
                select = sql_request(params)
                logging.debug(select)
                curs.execute(select)
                logging.debug(curs.statusmessage)
                return curs.fetchall()
    finally:
        conn.close()
Jerome Touvier's avatar
Jerome Touvier committed
129 130


Jerome Touvier's avatar
Jerome Touvier committed
131
def format_results(params, data):
    """Turn the rows returned by the database into rows of strings.

    - "country" requests: when "all" is in ``params["country"]`` the single
      result row is labelled "all"; outside csv output the two counters get
      "_" as thousands separator when both are set.
    - "send" requests: outside csv output the byte count is shown with "_"
      separators followed by its human-readable size.
    - every value is finally converted with ``str``.

    :param params: request parameters
    :param data: rows returned by the SQL query
    :returns: list of rows, each a list of strings
    """
    data = list(map(list, data))
    if params["request"] == "country":
        if "all" in params["country"]:
            data = [["all", data[0][0], data[0][1]]]
        if params["format"] != "csv":
            for row in data:
                if row[1] and row[2]:
                    row[:] = [row[0], f"{row[1]:_}", f"{row[2]:_}"]

    elif params["request"] == "send":
        if params["format"] != "csv":
            result = data[0][0]
            # SUM() yields NULL (None) when no row matches; formatting None
            # with "{:_}" would raise a TypeError, so leave it as is.
            if result is not None:
                data = [[f"{result:_} bytes ({human_readable_size(result)})"]]

    return [[str(val) for val in row] for row in data]
Jerome Touvier's avatar
Jerome Touvier committed
149 150 151 152 153 154


def get_header(params):
    """Return the column titles matching the request type.

    For "send" requests the single title names the medium: "SEEDLINK" or
    "DATASELECT" when exactly that one medium is requested, otherwise
    "SEEDLINK and DATASELECT".

    :param params: request parameters
    :returns: list of column titles
    :raises ValueError: if ``params["request"]`` is not a supported request
    """
    request = params["request"]
    if request == "storage":
        return [
            "size",
            "year",
            "network",
            "station",
            "channel",
            "quality",
            "type",
            "lastupdated",
        ]
    if request == "country":
        return ["country", "requests", "clients"]
    if request == "map":
        return ["country", "requests"]
    if request == "timeseries":
        return ["time", "country", "bytes", "clients"]
    if request == "send":
        media = params["media"]
        if len(media) == 1 and "all" not in media:
            if "seedlink" in media:
                return ["SEEDLINK"]
            if "dataselect" in media:
                return ["DATASELECT"]
        # Also reached for a single unrecognised medium, which previously
        # left the header undefined.
        return ["SEEDLINK and DATASELECT"]
    raise ValueError(f"Unsupported request type: {request!r}")


def get_column_widths(data, header=None):
    """Find the maximum width of each column.

    :param data: rows of strings; the first row gives the number of columns
    :param header: optional column titles, also taken into account
    :returns: list with the width of each column
    """
    if not data:
        # nothing to measure: use the header widths, or no column at all
        return [len(title) for title in header] if header else []

    column_indexes = range(len(data[0]))
    colwidths = [max(len(row[i]) for row in data) for i in column_indexes]
    if header:
        colwidths = [max(len(h), cw) for h, cw in zip(header, colwidths)]
    return colwidths


def records_to_text(params, data, sep=" "):
    """Serialise the rows as text, one line per row.

    A header line is written first unless the format is "request". For the
    "text" format the header and every row are left-justified to the widest
    value of each column (rows are padded in place).

    :param params: request parameters
    :param data: rows of strings
    :param sep: separator placed between the values of a line
    :returns: the whole text, each line ending with a newline
    """
    output_format = params["format"]
    header = get_header(params)

    if output_format == "text":
        widths = get_column_widths(data, header)
        # pad header and rows according to the maximum column width
        header = [title.ljust(width) for title, width in zip(header, widths)]
        for record in data:
            record[:] = [cell.ljust(width) for cell, width in zip(record, widths)]

    lines = [] if output_format == "request" else [sep.join(header) + "\n"]
    lines.extend(f"{sep.join(record)}\n" for record in data)
    return "".join(lines)


def get_response(params, data):
    """Build the HTTP response for the text-based output formats.

    "text" and "request" use the default separator, "sync" uses "|" and
    "csv" uses "," and is sent as an attachment.

    :param params: request parameters
    :param data: rows of strings
    :returns: Flask response holding the serialised rows
    :raises ValueError: if ``params["format"]`` is not a supported format
    """
    tic = time.time()
    fname = "resifws-statistics"
    output_format = params["format"]
    headers = {"Content-type": "text/plain"}

    if output_format in ("text", "request"):
        response = make_response(records_to_text(params, data), headers)
    elif output_format == "sync":
        response = make_response(records_to_text(params, data, "|"), headers)
    elif output_format == "csv":
        headers = {"Content-Disposition": f"attachment; filename={fname}.csv"}
        response = make_response(records_to_text(params, data, ","), headers)
        # assigned on the response itself so that it replaces the default type
        response.headers["Content-type"] = "text/csv"
    else:
        # fail with a clear message instead of an unbound local variable
        raise ValueError(f"Unsupported output format: {output_format!r}")

    logging.debug(f"Response built in {tictac(tic)} seconds.")
    return response


Jerome Touvier's avatar
ad map  
Jerome Touvier committed
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
def map_requests(params, data):
    """Render the number of requests per country on a world map.

    Country shapes are read from "countries.geo.json" (path relative to the
    current working directory). Requests are summed over the years listed in
    ``params["year"]`` (comma-separated), or the current year when unset.

    :param params: request parameters
    :param data: rows of (year, country code, number of requests)
    :returns: Flask response holding a standalone Bokeh HTML page
    """
    with open("countries.geo.json", encoding="utf-8") as json_file:
        geojson = json.load(json_file)

    geodata = dict(x=[], y=[], name=[], requests=[])
    for country in geojson["features"]:
        geometry = country["geometry"]
        if geometry["type"] not in ("Polygon", "MultiPolygon"):
            # skip this feature only: leaving the loop here would drop every
            # country listed after it
            continue

        polygons = geometry["coordinates"]
        if geometry["type"] == "Polygon":
            # handle a single polygon like a one-element multipolygon
            polygons = [polygons]

        name = country["properties"]["name"].lower()
        for polygon in polygons:
            # polygon[0] is the first ring of the polygon; other rings are ignored
            geodata["x"].append([point[0] for point in polygon[0]])
            geodata["y"].append([point[1] for point in polygon[0]])
            geodata["name"].append(name)

    geodata["requests"] = [0] * len(geodata["name"])

    if params["year"]:
        years = sorted(params["year"].split(","))
    else:
        years = [str(datetime.now().year)]

    for year, code, requests in data:
        if str(year) not in years:
            continue
        country = codes.get(code)
        if not country:
            continue
        wanted = country.lower()
        for i, name in enumerate(geodata["name"]):
            # NOTE(review): substring match, so e.g. "niger" also matches
            # "nigeria" - confirm this is intended.
            if wanted in name:
                geodata["requests"][i] += requests

    # light grey first, so the lowest values (countries without requests)
    # do not get a Viridis colour; it must be prepended only once
    my_palette = ("#f2f2f2",) + Viridis256
    # my_palette = ("#f2f2f2",) + tuple(reversed(Plasma256))
    # my_palette = ("#fee5d9", "#fcbba1", "#fc9272", "#fb6a4a", "#de2d26")
    color_mapper = LogColorMapper(palette=my_palette, low=0.1)
    if sum(geodata["requests"]) == 0:
        # NOTE(review): the mapper is dropped when there is nothing to show;
        # confirm the installed Bokeh accepts None for "transform" and for
        # ColorBar.color_mapper below.
        color_mapper = None

    TOOLS = "save, pan, wheel_zoom, box_zoom, reset"
    plot = figure(
        title=f"""Number of requests to the RESIF data center in {" and ".join(years)}""",
        tools=TOOLS,
        plot_width=1200,
        plot_height=800,
        x_axis_location=None,
        y_axis_location=None,
        tooltips=[
            ("Name", "@name"),
            ("Requests", "@requests"),
            ("(Long, Lat)", "($x, $y)"),
        ],
        active_drag="box_zoom",
        active_scroll="wheel_zoom",
        # sizing_mode='stretch_both'
    )

    plot.toolbar.logo = None
    plot.grid.grid_line_color = None
    plot.hover.point_policy = "follow_mouse"
    plot.patches(
        "x",
        "y",
        source=geodata,
        fill_color={"field": "requests", "transform": color_mapper},
        line_width=0.2,
        fill_alpha=0.8,
        # line_color="white",
    )

    color_bar = ColorBar(
        color_mapper=color_mapper,
        ticker=LogTicker(),
        formatter=NumeralTickFormatter(),
        label_standoff=5,
        width=500,
        height=10,
        location=(0, 0),
        orientation="horizontal",
    )

    plot.add_layout(color_bar, "below")

    return make_response(file_html(plot, CDN, "resifws-statistics"))


Jerome Touvier's avatar
Jerome Touvier committed
304
def get_output(params):
    """Statistics output (csv, request, sync, text, or the map page)

    Collects the data, then returns the "no data" error response when the
    query gave no row, the map for "map" requests, or the formatted text
    response otherwise. Any exception is logged and None is returned
    implicitly. The processing time is logged in every case.

    :params: parameters
    :returns: text or csv with data statistics"""

    started = time.time()
    try:
        rows = collect_data(params)
        if rows is None:
            return None

        if not rows:
            nodata_code = params["nodata"]
            return error_request(
                msg=f"HTTP._{nodata_code}_",
                details=Error.NODATA,
                code=nodata_code,
            )
        logging.info(f"Number of collected rows: {len(rows)}")

        if params["request"] == "map":
            return map_requests(params, rows)
        return get_response(params, format_results(params, rows))
    except Exception as ex:
        logging.exception(str(ex))
    finally:
        logging.info(f"Data processed in {tictac(started)} seconds.")