
Commit cf32dc71 authored by Jerome Touvier
Initial commit

.gitignore

**/logs
**/__pycache__
.gitlab-ci.yml

image: gricad-registry.univ-grenoble-alpes.fr/kubernetes-alpes/buildah:latest

stages:
  - build

variables:
  REGISTRY_LOGIN: buildah login -u gitlab-ci-token -p $CI_REGISTRY_PASSWORD
  REGISTRY_LOGOUT: buildah logout
  IMAGE_BUILD: buildah build-using-dockerfile --storage-driver vfs --format docker
  IMAGE_PUSH: buildah push --storage-driver vfs

before_script:
  - $REGISTRY_LOGIN $CI_REGISTRY

after_script:
  - $REGISTRY_LOGOUT $CI_REGISTRY

build:
  stage: build
  variables:
    DOCKERFILE: Dockerfile
    IMAGE_NAME: $CI_REGISTRY_IMAGE/ws-ph5-availability:$CI_COMMIT_SHORT_SHA
  script:
    - echo $CI_COMMIT_SHORT_SHA > ./static/commit.txt
    - $IMAGE_BUILD --file $DOCKERFILE $BUILD_ARG --tag $IMAGE_NAME .
    - $IMAGE_PUSH $IMAGE_NAME $IMAGE_NAME
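# Note (illustrative, not part of the pipeline definition): with the variables above,
# the build step effectively runs
#   buildah build-using-dockerfile --storage-driver vfs --format docker --file Dockerfile \
#     --tag $CI_REGISTRY_IMAGE/ws-ph5-availability:$CI_COMMIT_SHORT_SHA .
# $BUILD_ARG is not defined in this file, so it expands to an empty string unless it is
# supplied as a CI/CD variable.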
Dockerfile

FROM python:3.8-slim
MAINTAINER RESIF DC <resif-dc@univ-grenoble-alpes.fr>
RUN pip install --no-cache-dir gunicorn
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt
WORKDIR /appli
COPY start.py config.py ./
COPY apps ./apps/
COPY templates ./templates/
COPY static ./static/
CMD ["/bin/bash", "-c", "gunicorn --bind 0.0.0.0:8000 start:app"]
README.md

# Webservice ph5-availability
ws-ph5-availability implements the specification of the ph5-availability webservice.
## Play around with docker

```
docker build -t ws-ph5-availability .
docker run --rm -e RUNMODE=test -p 8000:8000 --name ws-ph5-availability ws-ph5-availability
```

Then:
```
wget -O - http://localhost:8000/1/application.wadl
```
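An actual availability request can be issued the same way. The route below assumes the standard fdsnws-availability layout (`/1/query`, with `/1/extent` for extents); the exact routes are not shown in this commit, and the network code and time window are only placeholders:
```
wget -O - "http://localhost:8000/1/query?net=8A&start=2019-01-01T00:00:00&end=2019-02-01T00:00:00&format=text"
```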
Run it in debug mode with flask:
```
RUNMODE=test FLASK_APP=start.py flask run
```
## RUNMODE builtin values
* `production`
* `test`
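As a sketch of how these modes are used (the settings behind each mode live in `config.py`, which is not shown here), the container's entry point can be reproduced locally with:
```
RUNMODE=production gunicorn --bind 0.0.0.0:8000 start:app
```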
apps/globals.py

# version number of the json format
SCHEMAVERSION = 1.0
# limitations
TIMEOUT = 120
MAX_DAYS = None
MAX_ROWS = "2_500_000"
MAX_DATA_ROWS = int(MAX_ROWS.replace(",", ""))
MAX_MERGEGAPS = 10000000000
# available parameter values
OUTPUT = ("geocsv", "json", "request", "text", "zip")
NODATA_CODE = ("204", "404")
STRING_TRUE = ("yes", "true", "t", "y", "1", "")
STRING_FALSE = ("no", "false", "f", "n", "0")
SHOW = "latestupdate"
MERGE = ("quality", "samplerate", "overlap")
ORDERBY = (
    "nslc_time_quality_samplerate",
    "timespancount",
    "timespancount_desc",
    "latestupdate",
    "latestupdate_desc",
)
# error message constants
DOCUMENTATION_URI = "http://ws.resif.fr/resifws/ph5ws/availability/1/"
SERVICE = "resifws-ph5-availability"
VERSION = "1.0.0"
class Error:
    UNKNOWN_PARAM = "Unknown query parameter: "
    MULTI_PARAM = "Multiple entries for query parameter: "
    VALID_PARAM = "Valid parameters."
    START_LATER = "The starttime cannot be later than the endtime: "
    TOO_LONG_DURATION = "Too many days requested (greater than "
    TOO_MUCH_ROWS = f"The request exceeds the limit of {MAX_ROWS} rows."
    UNSPECIFIED = "Error processing your request."
    NODATA = "Your query doesn't match any available data."
    TIMEOUT = f"Your query exceeds timeout ({TIMEOUT} seconds)."
    MISSING = "Missing parameter: "
    BAD_VAL = " Invalid value: "
    CHAR = "White space(s) or invalid string. Invalid value for: "
    EMPTY = "Empty string. Invalid value for: "
    BOOL = "(Valid boolean values are: true/false, yes/no, t/f or 1/0)"
    NETWORK = "Invalid network code: "
    STATION = "Invalid station code: "
    LOCATION = "Invalid location code: "
    CHANNEL = "Invalid channel code: "
    QUALITY = "Invalid quality code: "
    TIME = "Bad date value: "
    ROWLIMIT = "The limit parameter must be an integer." + BAD_VAL
    MERGE = f"Accepted merge values are: {MERGE}." + BAD_VAL
    SHOW = f"Accepted show values: {SHOW}." + BAD_VAL
    ORDERBY = f"Accepted orderby values are: {ORDERBY}." + BAD_VAL
    MERGEGAPS = (
        f"The mergegaps parameter must be a float inside range [0, {MAX_MERGEGAPS}]."
        + BAD_VAL
    )
    MERGEGAPS_QUERY_ONLY = (
        "The mergegaps option is not available in extent mode; it is only available in query mode."
    )
OUTPUT = f"Accepted output values are: {OUTPUT}." + BAD_VAL
NODATA_CODE = f"Accepted nodata values are: {NODATA_CODE}." + BAD_VAL
NO_WILDCARDS = "Wildcards or lists are not allowed in network parameter if there are wildcards (* or more than one ?) in station parameters."
NO_SELECTION = "Request contains no selections."
class HTTP:
    _200_ = "Successful request. "
    _204_ = "No data matches the selection. "
    _400_ = "Bad request due to improper value. "
    _401_ = "Authentication is required. "
    _403_ = "Forbidden access. "
    _404_ = "No data matches the selection. "
    _408_ = "Request exceeds timeout. "
    _413_ = "Request too large. "
    _414_ = "Request URI too large. "
    _500_ = "Internal server error. "
    _503_ = "Service unavailable. "
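# Illustrative note (not part of the module): constants ending in ": " are meant to be
# completed with the offending parameter or value by the request validators, e.g.
#   Error.OUTPUT + "xml"
# would read "Accepted output values are: ('geocsv', 'json', 'request', 'text', 'zip'). Invalid value: xml"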
apps/output.py

import json
import logging
import re
import time
import zipfile
from tempfile import NamedTemporaryFile
from datetime import datetime, timedelta
import psycopg2
from flask import current_app, make_response
from apps.globals import Error
from apps.globals import MAX_DATA_ROWS
from apps.globals import SCHEMAVERSION
from apps.utils import error_request
from apps.utils import overflow_error
from apps.utils import tictac
QUALITY = 4
SAMPLERATE = 5
START = 6
END = 7
UPDATED = 8
STATUS = 9
COUNT = 10 # timespancount
def get_max_rows(params):
    rowlimit = params["limit"]
    if params["mergegaps"] is not None or params["extent"]:
        rowlimit = MAX_DATA_ROWS
    return min(rowlimit, MAX_DATA_ROWS) + 1
def is_like_or_equal(params, key):
    """Builds the condition for the specified key in the "where" clause taking into account lists or wildcards."""
    subquery = list()
    for param in params[key].split(","):
        op = "LIKE" if re.search(r"[*?]", param) else "="
        subquery.append(f"{key} {op} '{param}'")
    return " OR ".join(subquery)
def sql_request(paramslist):
    """Builds the PostgreSQL request.
    (rph5_id is used here to store timespancount later)"""
    select = list()
    for params in paramslist:
        s = f"""SET random_page_cost=1; SELECT network, station, location, channel, quality, samplerate, starttime, endtime, updated_at, UPPER(policy::text), rph5_id FROM rph5 WHERE"""
        s = f"""{s} ({is_like_or_equal(params, "network")})"""
        if params["station"] != "*":
            s = f"""{s} AND ({is_like_or_equal(params, "station")})"""
        if params["location"] != "*":
            s = f"""{s} AND ({is_like_or_equal(params, "location")})"""
        if params["channel"] != "*":
            s = f"""{s} AND ({is_like_or_equal(params, "channel")})"""
        if params["quality"] != "*":
            s = f"""{s} AND ({is_like_or_equal(params, "quality")})"""
        start = "-infinity" if params["start"] is None else params["start"]
        end = "infinity" if params["end"] is None else params["end"]
        s = f"""{s} AND (starttime , endtime) OVERLAPS ('{start}', '{end}')"""
        select.append(s.replace("?", "_").replace("*", "%"))
    select = " UNION ".join(select)
    nrows = get_max_rows(paramslist[0])
    return f"""{select} ORDER BY network, station, location, channel, quality, samplerate, starttime, endtime LIMIT {nrows};"""
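# Illustrative result (not part of the module) for a single selection with network=8A,
# every other code left as "*" and no time window:
#   SET random_page_cost=1; SELECT network, station, location, channel, quality, samplerate,
#   starttime, endtime, updated_at, UPPER(policy::text), rph5_id FROM rph5
#   WHERE (network = '8A') AND (starttime , endtime) OVERLAPS ('-infinity', 'infinity')
#   ORDER BY network, station, location, channel, quality, samplerate, starttime, endtime LIMIT <row limit + 1>;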
def collect_data(params):
    """Get the result of the SQL query."""
    tic = time.time()
    data = list()
    logging.debug("Start collecting data...")
    with psycopg2.connect(current_app.config["DATABASE_URI"]) as conn:
        logging.debug(conn.get_dsn_parameters())
        logging.debug(f"Postgres version : {conn.server_version}")
        with conn.cursor() as curs:
            select = sql_request(params)
            logging.debug(select)
            curs.execute(select)
            logging.debug(curs.statusmessage)
            for row in curs.fetchall():
                if not params[0]["includerestricted"] and row[STATUS] == "RESTRICTED":
                    continue
                data.append(list(row))
    logging.info(f"Get data in {tictac(tic)} seconds.")
    return data
def get_header(params):
    header = ["Network", "Station", "Location", "Channel"]
    if params["format"] == "text":
        header[0] = "#" + header[0]
    if "quality" not in params["merge"]:
        header.append("Quality")
    if "samplerate" not in params["merge"]:
        header.append("SampleRate")
    header.extend(["Earliest", "Latest"])
    if params["showlastupdate"]:
        header.append("Updated")
    if params["extent"]:
        header.append("TimeSpans")
        header.append("Restriction")
    return header
def get_geocsv_header(params):
    geocsv_header = [("unitless", "string") for i in range(4)]
    if "quality" not in params["merge"]:
        geocsv_header.append(("unitless", "string"))
    if "samplerate" not in params["merge"]:
        geocsv_header.append(("hertz", "float"))
    geocsv_header.extend([("ISO_8601", "datetime"), ("ISO_8601", "datetime")])
    if params["showlastupdate"]:
        geocsv_header.append(("ISO_8601", "datetime"))
    if params["extent"]:
        geocsv_header.append(("unitless", "integer"))
        geocsv_header.append(("unitless", "string"))
    text = "#dataset: GeoCSV 2.0\n#delimiter: |\n"
    text += "#field_unit: " + "|".join([h[0] for h in geocsv_header]) + "\n"
    text += "#field_type: " + "|".join([h[1] for h in geocsv_header]) + "\n"
    text += "|".join(get_header(params)) + "\n"
    return text
def get_column_widths(data, header=None):
    """Find the maximum width of each column."""
    ncols = range(len(data[0]))
    colwidths = [max([len(r[i]) for r in data]) for i in ncols]
    if header:
        colwidths = [max(len(h), cw) for h, cw in zip(header, colwidths)]
    return colwidths
def records_to_text(params, data, sep=" "):
text = ""
header = get_header(params)
if params["format"] == "text":
sizes = get_column_widths(data, header)
# pad header and rows according to the maximum column width
header = [val.ljust(sz) for val, sz in zip(header, sizes)]
for row in data:
row[:] = [val.ljust(sz) for val, sz in zip(row, sizes)]
if params["format"] in ["geocsv", "zip"]:
text = get_geocsv_header(params)
elif params["format"] != "request":
text = sep.join(header) + "\n"
data = [f"{sep.join(row)}\n" for row in data]
text += "".join(data)
return text
def records_to_dictlist(params, data):
    """Create json output according to the fdsnws specification schema:
    http://www.fdsn.org/webservices/fdsnws-availability-1.0.schema.json"""
    dictlist = list()
    header = get_header(params)
    header = [h.lower() for h in header]
    if params["extent"]:
        header[header.index("timespans")] = "timespanCount"
        for row in data:
            dictlist.append(dict(zip(header, row)))
    else:
        start = -3 if params["showlastupdate"] else -2
        prev_row = data[0]
        for row in data:
            if not dictlist or row[:start] != prev_row[:start]:
                dictlist.append(dict(zip(header[:start], row[:start])))
                dictlist[-1]["timespans"] = list()
                if params["showlastupdate"]:
                    dictlist[-1]["updated"] = row[-1]
            dictlist[-1]["timespans"].append([row[start], row[start + 1]])
            prev_row = row
    return {
        "created": datetime.utcnow().isoformat(timespec="seconds") + "Z",
        "version": SCHEMAVERSION,
        "datasources": dictlist,
    }
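# Illustrative "query" mode output shape (extent=False), with showlastupdate and no merge
# option; all values below are placeholders, not real data:
# {
#   "created": "2021-01-01T00:00:00Z",
#   "version": 1.0,
#   "datasources": [
#     {"network": "8A", "station": "CCRB", "location": "00", "channel": "DPZ",
#      "quality": "D", "samplerate": 250.0, "updated": "2020-06-01T00:00:00Z",
#      "timespans": [["2019-01-01T00:00:00.000000Z", "2019-01-15T00:00:00.000000Z"]]}
#   ]
# }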
def sort_records(params, data):
    if params["orderby"] != "nslc_time_quality_samplerate":
        if params["extent"] and params["orderby"] == "timespancount":
            data.sort(key=lambda x: x[COUNT])
        elif params["extent"] and params["orderby"] == "timespancount_desc":
            data.sort(key=lambda x: x[COUNT], reverse=True)
        elif params["orderby"] == "latestupdate":
            data.sort(key=lambda x: x[UPDATED])
        elif params["orderby"] == "latestupdate_desc":
            data.sort(key=lambda x: x[UPDATED], reverse=True)
        else:
            data.sort(key=lambda x: (x[QUALITY], x[SAMPLERATE]))
            data.sort(key=lambda x: (x[START], x[END]), reverse=True)
            data.sort(key=lambda x: x[:QUALITY])
    # else:
    #     data.sort(key=lambda x: x[:UPDATED])
def select_columns(params, data, indexes):
    tic = time.time()
    indexes = indexes + [START, END]
    if params["showlastupdate"]:
        indexes = indexes + [UPDATED]
    if params["extent"]:
        indexes = indexes + [COUNT, STATUS]
    if params["format"] == "request":
        indexes = [0, 1, 2, 3] + [START, END]
    for row in data:
        if params["start"] and row[START] < params["start"]:
            row[START] = params["start"]
        if params["end"] and row[END] > params["end"]:
            row[END] = params["end"]
        row[START] = row[START].isoformat(timespec="microseconds") + "Z"
        row[END] = row[END].isoformat(timespec="microseconds") + "Z"
        if params["showlastupdate"] and params["format"] != "request":
            row[UPDATED] = row[UPDATED].isoformat(timespec="seconds") + "Z"
        if params["format"] != "json":
            row[:] = [str(row[i]) for i in indexes]
        else:
            row[:] = [row[i] for i in indexes]
    logging.debug(f"Columns selection in {tictac(tic)} seconds.")
    return data
def fusion(params, data, indexes):
    """:param data: ordered data list
    :param indexes: list or range of indexes of the parameters to be grouped
    :param params: timespans separated by gaps smaller than or equal to params["mergegaps"] are merged together (query mode)
    :returns: a list of available time extents (extent mode) or contiguous time spans (query mode)"""
    tic = time.time()
    merge = list()
    timespancount = 0
    tol = params["mergegaps"] if params["mergegaps"] is not None else 0.0
    # The fusion step needs the rows to be sorted.
    # data.sort(key=lambda x: x[:UPDATED])  # done by postgres
    for row in data:
        if merge and [row[i] for i in indexes] == [merge[-1][i] for i in indexes]:
            sample_size = 1.0 / float(merge[-1][SAMPLERATE])
            tol2 = timedelta(seconds=max([tol, sample_size]))
            sametrace = (
                row[START] - merge[-1][END] <= tol2
                and merge[-1][START] <= row[END] + tol2  # (never occurs if sorted?)
            )
            if not sametrace:
                timespancount += 1
                merge[-1][COUNT] = timespancount
            if params["extent"] or sametrace:
                if row[UPDATED] > merge[-1][UPDATED]:
                    merge[-1][UPDATED] = row[UPDATED]
                # if row[START] < merge[-1][START]:  # never occurs if sorted
                #     merge[-1][START] = row[START]
                if row[END] > merge[-1][END]:
                    merge[-1][END] = row[END]
            else:
                merge.append(list(row))
        else:
            merge.append(list(row))
            timespancount = 1
            merge[-1][COUNT] = 1
    logging.debug(f"Data merged in {tictac(tic)} seconds.")
    return merge
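# Illustrative behaviour (not part of the module): consecutive rows of the same channel
# whose gap is <= max(mergegaps, one sample) are merged into a single timespan (query mode);
# in extent mode a single row per grouped channel is kept and its last column (COUNT)
# ends up holding the number of distinct timespans.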
def get_indexes(params):
    """Get column indexes to display according to requested params.
    :param params: parameter dictionary (network, station, ...)
    :returns: indexes: list of column indexes"""
    indexes = [0, 1, 2, 3, 4, 5]
    if "quality" in params["merge"] and "samplerate" in params["merge"]:
        indexes = [0, 1, 2, 3]
    elif "quality" in params["merge"]:
        indexes = [0, 1, 2, 3, 5]
    elif "samplerate" in params["merge"]:
        indexes = [0, 1, 2, 3, 4]
    return indexes
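# Columns 0-5 of each row are network, station, location, channel, quality and samplerate
# (see the SELECT order in sql_request and the index constants above), so the "merge"
# options simply drop the quality and/or samplerate columns from the output.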
def get_response(params, data):
    tic = time.time()
    fname = "resifws-ph5-availability"
    headers = {"Content-type": "text/plain"}
    if params["format"] == "text":
        response = make_response(records_to_text(params, data), headers)
    elif params["format"] == "request":
        response = make_response(records_to_text(params, data), headers)
    elif params["format"] == "geocsv":
        headers = {"Content-Disposition": f"attachment; filename={fname}.csv"}
        response = make_response(records_to_text(params, data, "|"), headers)
        response.headers["Content-type"] = "text/csv"
    elif params["format"] == "zip":
        headers = {"Content-Disposition": f"attachment; filename={fname}.zip"}
        tmp_zip = NamedTemporaryFile(delete=True)
        with zipfile.ZipFile(tmp_zip.name, "w", zipfile.ZIP_DEFLATED) as zipcsv:
            zipcsv.writestr(f"{fname}.csv", records_to_text(params, data, "|"))
        response = make_response(tmp_zip.read(), headers)
        response.headers["Content-type"] = "application/x-zip-compressed"
    elif params["format"] == "json":
        headers = {"Content-type": "application/json"}
        response = make_response(
            json.dumps(records_to_dictlist(params, data), sort_keys=False), headers
        )
    logging.debug(f"Response built in {tictac(tic)} seconds.")
    return response
def get_output(param_dic_list):
    """
    Availability output (geocsv, json, request, text, zip)
    Parameters:
        param_dic_list: list of parameter dictionaries
    Returns:
        response: response with text, json or csv with ph5 data availability
    """
    try:
        tic = time.time()
        data = None
        response = None
        params = param_dic_list[0]
        data = collect_data(param_dic_list)
        if data is None:
            return data
        if not data:
            code = params["nodata"]
            return error_request(msg=f"HTTP._{code}_", details=Error.NODATA, code=code)
        nrows = len(data)
        logging.info(f"Number of collected rows: {nrows}")
        if nrows > MAX_DATA_ROWS:
            return overflow_error(Error.TOO_MUCH_ROWS)
        indexes = get_indexes(params)
        if params["mergegaps"] is not None or params["extent"]:
            data = fusion(params, data, indexes)
        data = data[: params["limit"]]
        if params["orderby"] != "nslc_time_quality_samplerate":
            sort_records(params, data)
        data = select_columns(params, data, indexes)
        logging.info(f"Final row number: {len(data)}")
        response = get_response(params, data)
        logging.debug(f"Processing in {tictac(tic)} seconds.")
        return response
    except Exception as ex:
        logging.exception(str(ex))
    finally:
        if data:
            if response:
                nbytes = response.headers.get("Content-Length")
                logging.info(f"{nbytes} bytes rendered in {tictac(tic)} seconds.")
apps/parameters.py

class Parameters:
    def __init__(self):
        self.network = None
        self.station = None
        self.location = None
        self.channel = None
        self.starttime = None
        self.endtime = None
        self.net = "*"
        self.sta = "*"
        self.loc = "*"
        self.cha = "*"
        self.start = None
        self.end = None
        self.quality = "*"
        self.merge = ""
        self.mergegaps = None
        self.orderby = None
        self.show = ""
        self.limit = None
        self.includerestricted = "false"
        self.format = "text"
        self.nodata = "204"
        self.constraints = {
            "alias": [
                ("network", "net"),
                ("station", "sta"),
                ("location", "loc"),
                ("channel", "cha"),
                ("starttime", "start"),
                ("endtime", "end"),
            ],
            "booleans": ["includerestricted"],
            "floats": [],
            "not_none": [],
        }

    def todict(self):
        return self.__dict__
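# Illustrative note (not part of the module): Parameters().todict() provides the default
# parameter dictionary consumed by the request checker; the "alias" pairs presumably let
# the short forms (net, sta, loc, cha, start, end) stand in for the long parameter names.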
(next file: route / request-checking module)

import io
import logging
import queue
import re
from multiprocessing import Process, Queue
from flask import request
from apps.globals import Error
from apps.globals import HTTP
from apps.globals import MAX_DATA_ROWS
from apps.globals import MAX_DAYS
from apps.globals import MAX_MERGEGAPS
from apps.globals import TIMEOUT
from apps.output import get_output
from apps.parameters import Parameters
from apps.utils import check_base_parameters
from apps.utils import check_request
from apps.utils import error_param
from apps.utils import error_request
from apps.utils import is_valid_integer
from apps.utils import is_valid_float
from apps.utils import is_valid_merge
from apps.utils import is_valid_nodata
from apps.utils import is_valid_orderby
from apps.utils import is_valid_output