Commit 4173ee4c authored by Jerome Touvier

Initial commit
**/logs
**/__pycache__
image: gricad-registry.univ-grenoble-alpes.fr/kubernetes-alpes/buildah:latest
stages:
  - build

variables:
  REGISTRY_LOGIN: buildah login -u gitlab-ci-token -p $CI_REGISTRY_PASSWORD
  REGISTRY_LOGOUT: buildah logout
  IMAGE_BUILD: buildah build-using-dockerfile --storage-driver vfs --format docker
  IMAGE_PUSH: buildah push --storage-driver vfs

before_script:
  - $REGISTRY_LOGIN $CI_REGISTRY

after_script:
  - $REGISTRY_LOGOUT $CI_REGISTRY

build:
  stage: build
  variables:
    DOCKERFILE: Dockerfile
    IMAGE_NAME: $CI_REGISTRY_IMAGE/ws-statistics:$CI_COMMIT_SHORT_SHA
  script:
    - $IMAGE_BUILD --file $DOCKERFILE $BUILD_ARG --tag $IMAGE_NAME .
    - $IMAGE_PUSH $IMAGE_NAME $IMAGE_NAME
FROM python:3.8-slim
LABEL maintainer="RESIF DC <resif-dc@univ-grenoble-alpes.fr>"
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt
RUN pip install --no-cache-dir gunicorn
WORKDIR /appli
COPY start.py config.py ./
COPY apps ./apps/
COPY templates ./templates/
COPY static ./static/
CMD ["/bin/bash", "-c", "gunicorn --bind 0.0.0.0:8000 start:app"]
# Webservice FDSN ws-statistics
This service provides access to the RESIF-DC statistics.
## Query usage
/query? (request-options) [channel-options] [date-options] [storage-options] [send-options] [output-options]
where:
request-options :: (request=<storage|send|lands>)
channel-options :: [net=<network> & sta=<station> & loc=<location> & cha=<channel>]
date-options :: [starttime=<date>] & [endtime=<date>]
storage-options :: [year=<year>] & [type=<type>]
send-options :: [country=<country_code>] & [media=<all|dataselect|seedlink>]
output-options :: [format=<csv|request|sync|text>]
(..) required
[..] optional
## Sample queries
<a href="http://ws.resif.fr/resifws/statistics/1/query?request=storage&net=FR&year=2018&type=validated">http://ws.resif.fr/resifws/statistics/1/query?request=storage&net=FR&year=2018&type=validated</a>
<a href="http://ws.resif.fr/resifws/statistics/1/query?request=send&media=dataselect&net=WI&country=IT">http://ws.resif.fr/resifws/statistics/1/query?request=send&media=dataselect&net=WI&country=IT</a>
<a href="http://ws.resif.fr/resifws/statistics/1/query?request=lands&media=dataselect&net=RA&country=IT,US,FR">http://ws.resif.fr/resifws/statistics/1/query?request=lands&media=dataselect&net=RA&country=IT,US,FR</a>
<a href="http://ws.resif.fr/resifws/statistics/1/query?request=lands&media=seedlink&net=RA&starttime=2020-01-01">http://ws.resif.fr/resifws/statistics/1/query?request=lands&media=seedlink&net=RA&starttime=2020-01-01</a>
VERSION = "1.0.0"
parameters = {
"network": None,
"station": None,
"location": None,
"channel": None,
"starttime": None,
"endtime": None,
"net": "*",
"sta": "*",
"loc": "*",
"cha": "*",
"start": None,
"end": None,
"country": "*",
"year": None,
"type": "all",
"request": None,
"media": "all",
"granularity": None,
"orderby": "nslc",
"nodata": "204",
"format": "text",
}
ALIAS = [
("network", "net"),
("station", "sta"),
("location", "loc"),
("channel", "cha"),
("starttime", "start"),
("endtime", "end"),
]
BOOLEANS = []
FLOATS = []
NOT_NONE = ["request"]
STRING_TRUE = ("yes", "true", "t", "y", "1", "")
STRING_FALSE = ("no", "false", "f", "n", "0")
REQUEST = ("storage", "send", "lands")
DATATYPE = ("all", "bud", "validated")
GRANULARITY = ("year", "month")
ORDERBY = ("nslc", "time", "country", "protocol")
OUTPUT = ("csv", "request", "sync", "text")
MEDIA = (
"all",
"seedlink",
"dataselect",
)
NODATA_CODE = ("204", "404")
TIMEOUT = 120
class Error:
    UNKNOWN_PARAM = "Unknown query parameter: "
    MULTI_PARAM = " Multiple entries for query parameter: "
    VALID_PARAM = "Valid parameters."
    START_LATER = "The starttime cannot be later than the endtime: "
    UNSPECIFIED = "Error processing your request."
    NO_CONNECTION = "No services could be discovered at http://ws.resif.fr.\n\
This could be due to a temporary service outage, an invalid FDSN service address,\n\
an inactive internet connection or a blocking firewall rule."
    OK_CONNECTION = "Connection OK. "
    NODATA = "Your query doesn't match any available data."
    TIMEOUT = f"Your query exceeds timeout ({TIMEOUT} seconds)."
    MISSING = "Missing parameter: "
    BAD_VAL = " Invalid value: "
    CHAR = "White space(s) or invalid string. Invalid value for: "
    EMPTY = "Empty string. Invalid value for: "
    BOOL = "(Valid boolean values are: true/false, yes/no, t/f or 1/0)"
    NETWORK = "Invalid network code: "
    STATION = "Invalid station code: "
    LOCATION = "Invalid location code: "
    CHANNEL = "Invalid channel code: "
    COUNTRY = "Invalid country code: "
    YEAR = "Invalid year: "
    TIME = "Bad date value: "
    DATATYPE = f"Accepted type values are: {DATATYPE}." + BAD_VAL
    GRANULARITY = f"Accepted granularity values are: {GRANULARITY}." + BAD_VAL
    REQUEST = f"Accepted request values are: {REQUEST}." + BAD_VAL
    MEDIA = f"Accepted media values are: {MEDIA}." + BAD_VAL
    ORDERBY = f"Accepted orderby values are: {ORDERBY}." + BAD_VAL
    OUTPUT = f"Accepted output values are: {OUTPUT}." + BAD_VAL
    NODATA_CODE = f"Accepted nodata values are: {NODATA_CODE}." + BAD_VAL
    NO_SELECTION = "Request contains no selections."
    NO_WILDCARDS = "Wildcards or list are not allowed." + BAD_VAL


class HTTP:
    _200_ = "Successful request. "
    _204_ = "No data matches the selection. "
    _400_ = "Bad request due to improper value. "
    _401_ = "Authentication is required. "
    _403_ = "Forbidden access. "
    _404_ = "No data matches the selection. "
    _408_ = "Request exceeds timeout. "
    _409_ = "Too much data. "
    _413_ = "Request too large. "
    _414_ = "Request URI too large. "
    _500_ = "Internal server error. "
    _503_ = "Service unavailable. "
import logging
from apps.globals import Error, HTTP
from apps.constants import NOT_NONE
from apps.utils import check_base_parameters
from apps.utils import error_param
from apps.utils import is_valid_country
from apps.utils import is_valid_integer
from apps.utils import is_valid_nodata
from apps.utils import is_valid_orderby
from apps.utils import is_valid_output
from apps.utils import is_valid_media
from apps.utils import is_valid_request
from apps.utils import is_valid_datatype
from apps.utils import is_valid_granularity
from apps.utils import is_valid_year
def check_parameters(params):
    try:
        # Check base parameters.
        (params, error) = check_base_parameters(params, NOT_NONE)
        if error["code"] != 200:
            return (params, error)

        country = params["country"].split(",")
        for c in country:
            logging.debug(c)
            if not is_valid_country(c):
                return error_param(params, Error.COUNTRY + c)

        # orderby parameter validation
        if params["orderby"] is not None:
            if not is_valid_orderby(params["orderby"]):
                return error_param(params, Error.ORDERBY + str(params["orderby"]))
            params["orderby"] = params["orderby"].lower()
        else:
            params["orderby"] = "nslc"

        # media parameter validation
        if params["media"]:
            params["media"] = params["media"].split(",")
            for ind, media in enumerate(params["media"]):
                if not is_valid_media(media):
                    return error_param(params, Error.MEDIA + str(media))
                params["media"][ind] = media.lower()

        # year parameter validation
        if params["year"]:
            for year in params["year"].split(","):
                if not is_valid_year(year):
                    return error_param(params, Error.YEAR + str(year))

        # type parameter validation
        if params["type"]:
            if not is_valid_datatype(params["type"]):
                return error_param(params, Error.DATATYPE + str(params["type"]))
            params["type"] = params["type"].lower()

        # request parameter validation
        if params["request"]:
            if not is_valid_request(params["request"]):
                return error_param(params, Error.REQUEST + str(params["request"]))
            params["request"] = params["request"].lower()

        # granularity parameter validation
        if params["granularity"]:
            if not is_valid_granularity(params["granularity"]):
                return error_param(
                    params, Error.GRANULARITY + str(params["granularity"])
                )
            params["granularity"] = params["granularity"].lower()

        # output parameter validation
        if not is_valid_output(params["format"]):
            return error_param(params, Error.OUTPUT + str(params["format"]))
        params["format"] = params["format"].lower()

        # nodata parameter validation
        if not is_valid_nodata(params["nodata"]):
            return error_param(params, Error.NODATA_CODE + str(params["nodata"]))
        params["nodata"] = params["nodata"].lower()

        for key, val in params.items():
            logging.debug(key + ": " + str(val))

        return (params, error)
    except Exception as e:
        logging.exception(str(e))
        return (params, {"msg": HTTP._500_, "details": Error.UNSPECIFIED, "code": 500})
# request: list of request types, among:
#   storage: amount of data stored at node B, raw and validated
#     years option: list of years for which the data volume is requested
#   send: volume of data distributed, by protocol (accepts the media and country options)
#   lands: list of countries with the number of requests issued from those countries (accepts the media and country options)
# granularity: either by year (year) or by month (month) (i.e. internally, time(52w) and time(4w)) TODO.
# For parameters accepting lists, the separator is the comma. The network, station,
# location and channel code options accept wildcards and lists, like the other FDSN
# webservices. For wildcards, ? stands for any single character, while * stands for
# zero or more characters.
# Additional options:
#   media: protocols taken into account by the request (all by default)
#     dataselect: for activity through the dataselect webservice
#     seedlink: for activity through ringserver (for now, can only be obtained separately)
#     all: sum of all activity types (TODO. For now this means dataselect only.)
#   country: filters the responses by a list of the requests' countries of origin.
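# Example (illustrative values, following the rules above): a storage request for
# several years at once, passed as a comma-separated list:
#   /query?request=storage&net=FR&year=2018,2019,2020&format=text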
import logging
import os
import re
import time
from datetime import datetime
import psycopg2
from flask import make_response
from apps.globals import Error, HTTP
from apps.utils import error_request
from apps.utils import overflow_error
from apps.utils import tictac
def is_like_or_equal(params, key):
    """Builds the condition for the specified key in the "where" clause,
    taking lists and wildcards into account."""
    subquery = list()
    for param in params[key].split(","):
        op = "LIKE" if re.search(r"[*?]", param) else "="
        subquery.append(f"{key} {op} '{param}'")
    return " OR ".join(subquery)
def get_table(media):
    if media == "seedlink":
        return "ringserver_events"
    return "dataselectvol"


def sql_common_string(params, begin="", date_s="date"):
    s = begin
    # network, station, location, channel parameters
    s = f"""{s} WHERE ({is_like_or_equal(params, "network")})"""
    if params["station"] != "*":
        s = f"""{s} AND ({is_like_or_equal(params, "station")})"""
    if params["location"] != "*" and params["request"] != "storage":
        s = f"""{s} AND ({is_like_or_equal(params, "location")})"""
    if params["channel"] != "*":
        s = f"""{s} AND ({is_like_or_equal(params, "channel")})"""
    # starttime, endtime parameters
    if params["start"]:
        start = datetime.strftime(params["start"], "%Y-%m-%d")
        s = f"""{s} AND {date_s} >= '{start}'"""
    if params["end"]:
        end = datetime.strftime(params["end"], "%Y-%m-%d")
        s = f"""{s} AND {date_s} <= '{end}'"""
    return s
def sql_request(params):
    """Builds the PostgreSQL request."""
    if params["request"] == "storage":
        columns = "year, network, station, channel, quality, type"
        s = f"SELECT DISTINCT ON ({columns}) size, {columns}, date FROM dataholdings"
        s = sql_common_string(params, s)
        if params["type"] != "all":
            s = f"""{s} AND (type = '{params["type"]}')"""
        if params["year"]:
            s = f"""{s} AND ({is_like_or_equal(params, "year")})"""
        s = f"""{s} ORDER BY {columns}, date DESC"""
        return s.replace("?", "_").replace("*", "%")
    else:
        select = list()
        for media in params["media"]:
            date_s = "time" if media == "seedlink" else "date"
            if params["request"] == "lands":
                s = f"SELECT country, count({date_s}) FROM {get_table(media)}"
            if params["request"] == "send":
                s = f"SELECT sum(bytes) FROM {get_table(media)}"
            s = sql_common_string(params, s, date_s)
            if params["request"] == "lands":
                s = f"""{s} AND ({is_like_or_equal(params, "country")}) GROUP BY country"""
            if params["request"] == "send":
                s = f"""{s} AND ({is_like_or_equal(params, "country")})"""
            if params["granularity"]:
                granularity = "52w" if params["granularity"] == "year" else "4w"
                s = f"""{s} , time({granularity})"""
            select.append(s)
        select = " UNION ".join(select)
        return select.replace("?", "_").replace("*", "%")
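# Sketch of the statement produced for a "lands" request limited to network FR
# over dataselect only (channel options left at their "*" defaults, no date
# bounds, no granularity):
#   SELECT country, count(date) FROM dataselectvol
#   WHERE (network = 'FR') AND (country LIKE '%') GROUP BY country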
def collect_data(params):
    """Connects to the PostgreSQL RESIF database."""
    tic = time.time()
    data = list()
    logging.debug("Start collecting data...")
    with psycopg2.connect(os.getenv("PG_DBURI")) as conn:
        logging.debug(conn.get_dsn_parameters())
        logging.debug(f"Postgres version : {conn.server_version}")
        with conn.cursor() as curs:
            SQL_SELECT = sql_request(params)
            curs.execute(SQL_SELECT)
            logging.debug(f"{SQL_SELECT}")
            logging.debug(curs.statusmessage)
            for row in curs.fetchall():
                data.append([str(val) for val in row])
    logging.debug(f"Get data in {tictac(tic)} seconds.")
    return data
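# Hypothetical local setup (placeholder DSN): collect_data reads the connection
# string from the PG_DBURI environment variable, e.g.
#   export PG_DBURI="postgresql://user:password@localhost:5432/resif"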
def sum_results(params, data):
    """Adds up the results from the different media tables."""
    newdata = list()
    if params["request"] == "lands":
        result = dict()
        for row in data:
            if row[0] in result:
                result[row[0]] = result[row[0]] + int(row[1])
            else:
                result[row[0]] = int(row[1])
        for key, val in result.items():
            newdata.append([key, str(val)])
    elif params["request"] == "send":
        logging.debug(data)
        result = 0
        for row in data:
            result = result + int(row[0])
        newdata.append([str(result)])
    return newdata
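# Illustration: "lands" rows coming from several media tables are merged by
# country:
#   sum_results({"request": "lands"}, [["FR", "10"], ["IT", "4"], ["FR", "7"]])
#   -> [["FR", "17"], ["IT", "4"]]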
def get_header(params):
    if params["request"] == "storage":
        header = ["size", "year", "network", "station", "channel", "quality", "type"]
        header.append("lastupdated")
    elif params["request"] == "lands":
        header = ["country", "requests"]
    elif params["request"] == "send":
        if len(params["media"]) == 1 and "all" not in params["media"]:
            if "seedlink" in params["media"]:
                header = ["SEEDLINK (in bytes)"]
            elif "dataselect" in params["media"]:
                header = ["DATASELECT (in bytes)"]
        else:
            header = ["SEEDLINK and DATASELECT (in bytes)"]
    return header
def get_column_widths(data, header=None):
    """Finds the maximum width of each column."""
    ncols = range(len(data[0]))
    colwidths = [max([len(r[i]) for r in data]) for i in ncols]
    if header:
        colwidths = [max(len(h), cw) for h, cw in zip(header, colwidths)]
    return colwidths


def records_to_text(params, data, sep=" "):
    text = ""
    header = get_header(params)
    if params["format"] == "text":
        sizes = get_column_widths(data, header)
        # pad header and rows according to the maximum column width
        header = [val.ljust(sz) for val, sz in zip(header, sizes)]
        for row in data:
            row[:] = [val.ljust(sz) for val, sz in zip(row, sizes)]
    if params["format"] != "request":
        text = sep.join(header) + "\n"
    data = [f"{sep.join(row)}\n" for row in data]
    text += "".join(data)
    return text
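# Illustration for a "lands" query in text format (trailing padding trimmed
# here): each column is left-justified to its widest value, header included:
#   records_to_text({"request": "lands", "format": "text"},
#                   [["FR", "17"], ["IT", "4"]])
#   country requests
#   FR      17
#   IT      4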
def get_response(params, data):
    tic = time.time()
    fname = "resifws-statistics"
    headers = {"Content-type": "text/plain"}
    if params["format"] in ("text", "request"):
        response = make_response(records_to_text(params, data), headers)
    elif params["format"] == "sync":
        response = make_response(records_to_text(params, data, "|"), headers)
    elif params["format"] == "csv":
        headers = {"Content-Disposition": f"attachment; filename={fname}.csv"}
        response = make_response(records_to_text(params, data, ","), headers)
        response.headers["Content-type"] = "text/csv"
    logging.debug(f"Response built in {tictac(tic)} seconds.")
    return response
def get_statistics(params):
    """Statistics output (csv, request, sync, text).

    :params: parameters
    :returns: text or csv with data statistics"""
    try:
        tic = time.time()
        data = collect_data(params)
        if data is None:
            return data
        if not data:
            code = params["nodata"]
            # Look the message up on the HTTP class rather than building a literal
            # "HTTP._<code>_" string.
            return error_request(
                msg=getattr(HTTP, f"_{code}_"), details=Error.NODATA, code=code
            )
        nrows = len(data)
        logging.debug(f"Number of collected rows: {nrows}")
        if params["request"] != "storage":
            data = sum_results(params, data)
        return get_response(params, data)
    except Exception as ex:
        logging.exception(str(ex))
    finally:
        logging.info(f"Data processed in {tictac(tic)} seconds.")
import logging
import queue
from copy import copy
from multiprocessing import Process, Queue
from apps.constants import ALIAS
from apps.constants import parameters
from apps.model import check_parameters
from apps.output import get_statistics
from apps.globals import Error
from apps.globals import HTTP
from apps.globals import TIMEOUT
from apps.utils import check_request
from apps.utils import error_request
def checks_get(request):
    # get default parameters
    params = copy(parameters)
    # check if the parameters are unknown
    (p, result) = check_request(request, params, ALIAS)
    if result["code"] != 200:
        return (p, result)
    # determine selected features
    params["base_url"] = request.base_url
    for key, val in params.items():
        params[key] = request.args.get(key, val)
    for key, alias in ALIAS:
        if params[key] is None:
            params[key] = request.args.get(alias, params[alias])
        else:
            params[alias] = params[key]
    return check_parameters(params)


def output(request):
    try:
        process = None
        result = {"msg": HTTP._400_, "details": Error.UNKNOWN_PARAM, "code": 400}
        logging.debug(request.url)
        (pdic, result) = checks_get(request)
        if result["code"] == 200:

            def put_response(q):
                q.put(get_statistics(pdic))

            q = Queue()
            process = Process(target=put_response, args=(q,))
            process.start()
            resp = q.get(timeout=TIMEOUT)
            if resp:
                return resp
            else:
                raise Exception
    except queue.Empty:
        result = {"msg": HTTP._408_, "details": Error.TIMEOUT, "code": 408}