Commit 736a312c authored by Jonathan Schaeffer's avatar Jonathan Schaeffer
Browse files

Merge branch 'python3'

parents fc895f49 943f8ce7
[submodule "wfcatalog"]
path = wfcatalog
url = https://github.com/EIDA/wfcatalog.git
branch = sdsbynet
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
python-qpid-proton = "*"
python-dateutil = "*"
[dev-packages]
[requires]
python_version = "3.9"
{
"_meta": {
"hash": {
"sha256": "42404ed4e944e95355661cbe790643c199228aa9a2f3925035bd3cb792c99942"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.9"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"python-dateutil": {
"hashes": [
"sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c",
"sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"
],
"index": "pypi",
"version": "==2.8.1"
},
"python-qpid-proton": {
"hashes": [
"sha256:64a983cc51c78dd6c7c206eb610f52da8edc5e1c5cb6c4e9cdc16ee62a9e1b5e"
],
"index": "pypi",
"version": "==0.34.0"
},
"six": {
"hashes": [
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926",
"sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.16.0"
}
},
"develop": {}
}
......@@ -10,10 +10,10 @@ Script original par Pierre Volcke.
# Installation
Ce script nécessite un environnement préalable : le module python `qpid-python` :
Ce script nécessite un environnement préalable : le module python `python-qpid-proton` :
``` shellsession
pip install qpid-python
pip install python-qpid-proton
```
Puis le module python `resif_pyinventory` à rendre disponible dans le PYTHONPATH et récupérable sur https://gricad-gitlab.univ-grenoble-alpes.fr/OSUG/RESIF/resif_pyinventory/
#!/usr/bin/env python2.7
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
......@@ -6,192 +6,233 @@ import itertools
import json
import logging
import os
import StringIO
import io
import subprocess
import sys
import traceback
import qpid.messaging
from proton.utils import BlockingConnection
from proton import Message
import resif_pyinventory
# https://pypi.python.org/pypi/ConcurrentLogHandler/
# use concurrency-enabled logging because Python logging library does
# not support writing logs concurrently in a unique file.
try:
from cloghandler import ConcurrentRotatingFileHandler as RFHandler
except ImportError:
sys.stderr.write("ConcurrentLogHandler package not installed. Using builtin log handler")
from logging.handlers import RotatingFileHandler as RFHandler
SCRIPT_VERSION = 2021.190

# script behavior depends on RUNMODE variable (e.g. "production", "test")
# -- a missing RUNMODE is a deliberate hard failure at startup
RUNMODE = os.environ["RUNMODE"]

# AMQP broker
AMQP_SERVER = "amqp-geodata.ujf-grenoble.fr:5672"

# queue names: production uses the bare queue name, any other RUNMODE gets
# a suffixed queue so non-production traffic never mixes with production
QUEUE_VALIDATED_DATA_INTEGRATED_FILES = (
    "validated-data-integrated-files"
    if RUNMODE == "production"
    else f"validated-data-integrated-files-{RUNMODE}"
)
QUEUE_SEEDTREE_UPDATE = (
    "seedtree-update" if RUNMODE == "production" else f"seedtree-update-{RUNMODE}"
)
QUEUE_SEEDTREE_UPDATE_AGAIN = (
    "seedtree-update-again"
    if RUNMODE == "production"
    else f"seedtree-update-again-{RUNMODE}"
)
QUEUE_WFCATALOG_UPDATE = (
    "wfcatalog-update" if RUNMODE == "production" else f"wfcatalog-update-{RUNMODE}"
)

# AMQP connection, opened in the __main__ section below
connection = None
session = None

# log to stdout; DEBUG everywhere except production
logger = logging.getLogger("amqpWorker")
logger.setLevel(level=logging.INFO if RUNMODE == "production" else logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
def grouper(seq, size):
    """
    Split a sequence into fixed-size slices.

    Returns a generator yielding consecutive slices of *seq* of at most
    *size* elements each; the last slice may be shorter (no padding).
    """
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
def validated_data_integrated_files(content):
    """Handle incoming new validated miniseed files.

    Forwards the raw message content, as durable messages, to both the
    Seedtree and the WFCatalog update queues.  Relies on the module-level
    AMQP ``connection`` opened in the __main__ section.
    """
    # forward content to Seedtree queue
    sender = connection.create_sender(QUEUE_SEEDTREE_UPDATE)
    message = Message(durable=True, body=content)
    logger.info("forwarding message to queue %s", QUEUE_SEEDTREE_UPDATE)
    sender.send(message)
    # forward content to WFcatalog queue
    sender = connection.create_sender(QUEUE_WFCATALOG_UPDATE)
    message = Message(durable=True, body=content)
    logger.info("forwarding message to queue %s", QUEUE_WFCATALOG_UPDATE)
    sender.send(message)
def seedtree(content):
    """
    Get filenames from the incoming message, sort filenames by Seedtree
    database schema, then launch the subsequent Seedtree scans.

    Files whose full network code cannot be resolved are forwarded to the
    "update again" queue for future manual processing.
    """
    # get a fresh RESIF inventory
    logger.info("getting RESIF inventory...")
    inv = resif_pyinventory.ResifInventory()
    # fileDict is a logical group with filenames sorted by full network code
    fileDict = {}
    # yearDict is a logical group with stations sorted by
    # {full network + year + seedtree schema name}
    yearDict = {}
    # accumulated filenames with update problems
    updateagain = ""
    for filename in content.splitlines():
        # assumes basenames look like NET.STA....YEAR with the year as the
        # 6th dot-separated token -- TODO confirm against producer format
        tokens = filename.split("/")[-1].split(".")
        network = tokens[0]
        station = tokens[1]
        year = tokens[5]
        # get full network code for current file
        fullnetwork = inv.get_full_network_code(network, station)
        if not fullnetwork:
            logger.error("full networkcode not found for file %s" % filename)
            updateagain += filename + "\n"
            continue
        logger.debug("new file : %s, full network code : %s " % (filename, fullnetwork))
        fileDict.setdefault(fullnetwork, []).append(filename)
        # get seedtree schema name corresponding to this network
        schema = inv.get_seedtree_schema(fullnetwork)
        # store stations sorted with key {fullnetwork.year.schema}
        yearDict.setdefault(".".join([fullnetwork, year, schema]), set()).add(station)
    # send files with problems to an alternate list, for future manual processing
    if updateagain:
        logger.error(
            "sending message to %s : %s" % (QUEUE_SEEDTREE_UPDATE_AGAIN, updateagain)
        )
        sender = connection.create_sender(QUEUE_SEEDTREE_UPDATE_AGAIN)
        message = Message(durable=True, body=updateagain)
        sender.send(message)
    # for each network code in fileDict, launch Seedtree updates
    for network in fileDict:
        # get seedtree schema name
        schema = inv.get_seedtree_schema(network)
        # build the newline-terminated list of filenames fed on stdin
        filenames = fileDict[network]
        nbfiles = len(filenames)
        stdin_data = "".join(f + "\n" for f in filenames)
        # lauch seedtree instance
        logger.info(
            "lauching Filetree (--insert %d files in schema %s)" % (nbfiles, schema)
        )
        p = subprocess.Popen(["filetree.py", "--insert", schema], stdin=subprocess.PIPE)
        # Popen.communicate() with a binary PIPE requires bytes under
        # Python 3 -- passing a str here raised TypeError
        p.communicate(stdin_data.encode("utf-8"))
        logger.info("lauching Seedtree (--sync %s)" % schema)
        p = subprocess.Popen(["seedtree5.py", "--sync", schema])
        p.wait()
        logger.info("lauching Seedtree (--computeproducts %s)" % schema)
        p = subprocess.Popen(["seedtree5.py", "--computeproducts", schema])
        p.wait()
def wfcatalog(content):
    """Launch WFCatalog collector updates, 50 files for each run.

    Each chunk of filenames is serialized as a JSON list and handed to the
    collector via its ``--list`` argument; collector output is echoed to
    stdout.
    """
    # process the incoming file list in chunks of 50 files per collector run
    for chunk in grouper(content.splitlines(), 50):
        logger.info("launching WFCatalog update on : %s" % str(chunk))
        files = json.dumps(chunk)
        out = subprocess.Popen(
            [
                "python3",
                "./wfcatalog/collector/WFCatalogCollector.py",
                "--update",
                "--force",
                "--stdout",
                "--list",
                files,
            ],
            stdout=subprocess.PIPE,
        ).communicate()[0]
        print(out.decode("utf-8"))
if __name__ == "__main__":
    logger.info("Starting")
    # build command line parser
    parser = argparse.ArgumentParser(
        description="Handle AMQP messages circulating across RESIF datacentre applications, launch subsequent applications accordingly."
    )
    group1 = parser.add_mutually_exclusive_group(required=True)
    group1.add_argument(
        "--validated-data-integrated-files",
        help="Listen for new validated data files",
        action="store_true",
    )
    group1.add_argument(
        "--seedtree",
        help="Launch Seedtree application whenever needed",
        action="store_true",
    )
    group1.add_argument(
        "--wfcatalog",
        help="Launch wfcatalog collector whenever needed",
        action="store_true",
    )
    parser.add_argument("--version", action="version", version="v%.3f" % SCRIPT_VERSION)
    args = parser.parse_args()

    # set queue name from the selected (mutually exclusive) mode
    if args.validated_data_integrated_files:
        queue = QUEUE_VALIDATED_DATA_INTEGRATED_FILES
    if args.seedtree:
        queue = QUEUE_SEEDTREE_UPDATE
    if args.wfcatalog:
        queue = QUEUE_WFCATALOG_UPDATE

    # connect to AMQP broker
    logger.info("Trying to connect to %s", AMQP_SERVER)
    connection = BlockingConnection(AMQP_SERVER)
    logger.info("Connected to %s : %s", AMQP_SERVER, connection)

    stoploop = False
    try:
        while not stoploop:
            # wait for message on queue
            logger.info(
                "waiting for incoming message on queue %s ~~~~~~~~~~~~ " % queue
            )
            receiver = connection.create_receiver(queue)
            message = receiver.receive(timeout=None)
            logger.info("message received : %s" % message)
            # launch subsequent action
            if args.validated_data_integrated_files:
                validated_data_integrated_files(message.body)
            if args.seedtree:
                seedtree(message.body)
            if args.wfcatalog:
                wfcatalog(message.body)
            # acknowledge message
            receiver.accept()
            logger.info("message acknowledged")
            # if not production mode, quit after one message
            if RUNMODE != "production":
                stoploop = True
    except Exception as err:
        logger.critical(traceback.format_exc())
        logger.critical(str(err))
        raise
    finally:
        connection.close()
        logger.critical("amqp worker quitting")
        # NOTE(review): sys.exit(1) in finally replaces the re-raised
        # exception with SystemExit and also exits non-zero on a clean
        # test-mode shutdown -- confirm this is intended
        sys.exit(1)
#!/usr/bin/env python3
"""
A simple script to send messages to our AMQP server from a shell script.
Depends on the Qpid Proton Python library.

Example:
echo "hello" | amqp_send.py <options>
"""
BROKER = "amqp-geodata.ujf-grenoble.fr"

import argparse
import sys

from proton.utils import BlockingConnection
from proton import Message

parser = argparse.ArgumentParser(
    description="Send messages to RESIF datacentre AMQP broker. Message content is read from stdin."
)
parser.add_argument("address", help="address")
parser.add_argument("--version", action="version", version="%(prog)s 1.0")
args = parser.parse_args()
address = args.address

# open a blocking connection to the broker
connection = BlockingConnection(BROKER)
try:
    sender = connection.create_sender(address)
    # build one durable message from everything available on stdin
    message = Message(durable=True, body=sys.stdin.read())
    sender.send(message)
    sys.stderr.write("Message sent to amqp://%s/%s\n" % (BROKER, address))
except Exception as e:
    # report the failure, close the connection and exit non-zero
    sys.stderr.write("[Error] " + str(e) + "\n")
    connection.close()
    sys.exit(1)
connection.close()
#!/usr/bin/env python2.7
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
......@@ -40,17 +40,17 @@ if __name__ == "__main__":
# for each network
for n in myinventory.get_inventory():
print "\n# Network " + n.altcode
print("\n# Network " + n.altcode)
# for each year in network
# (skip end dates set in the future)
startyear = n.startdate.year
endyear = ( n.enddate.year if n.enddate.year<=currentyear else currentyear )
# (skip end dates set in the future)
startyear = n.startdate.year
endyear = n.enddate.year if n.enddate.year <= currentyear else currentyear
# for each stations
stations = n.stations
stations = " ".join(x.code for x in n.stations)
# print AMQP message
for y in range(startyear,endyear+1):
message = ';'.join( [ n.altcode, n.seedtreeschema, str(y), ' '.join(x.code for x in stations) ] )
print "spout -b amqp-geodata.ujf-grenoble.fr portal-products-update", "\"" + message + "\""
for y in range(startyear, endyear + 1):
message = ";".join([n.altcode, n.seedtreeschema, str(y), stations])
print(
"spout -b amqp-geodata.ujf-grenoble.fr portal-products-update",
'"' + message + '"',
)
#
# These requirements were autogenerated by pipenv
# To regenerate from the project's Pipfile, run:
#
# pipenv lock --requirements
#
-i https://pypi.org/simple
python-dateutil==2.8.1
python-qpid-proton==0.34.0
six==1.16.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
Tiny class for reading inventory loaded from RESIF FDSN Station webservice.
Provides special service like reading RESIF extended XML attributes.
This library is intended for RESIF internal use, for advanced Inventory features and more "portable" code you should use Obspy library.
### Example usage
Inventory is downloaded from the RESIF webservice upon object creation :
```
from resif_pyinventory import ResifInventory
myinventory = ResifInventory()
```
Get whole inventory:
```
allnetworks = myinventory.get_inventory()
```
Prints first network found in the inventory:
```
> print(allnetworks[0])
Network(code='1A', startdate=datetime.datetime(2009, 1, 1, 0, 0),
enddate=datetime.datetime(2012, 12, 31, 23, 59, 59, 999999),
altcode='1A2009',
seedtreeschema='_1a2009',
stations= [
Station(code='CORRE',
startdate=datetime.datetime(2009, 11, 6, 0, 0),
enddate=datetime.datetime(2012, 12, 31, 23, 59, 59)),
Station(code='PIDGE',
startdate=datetime.datetime(2009, 11, 8, 0, 0),
enddate=datetime.datetime(2012, 12, 31, 23, 59, 59)),
Station(code='PINGU',
startdate=datetime.datetime(2009, 11, 9, 0, 0),
enddate=datetime.datetime(2012, 12, 31, 23, 59, 59)),
Station(code='PORMA',
startdate=datetime.datetime(2009, 11, 4, 0, 0),
enddate=datetime.datetime(2012, 12, 31, 23, 59, 59))],
)
```
Loop on all networks, show number of stations per network :
```
for n in allnetworks:
print("network", n.altcode, "has", len(n.stations), "stations")
```
In the examples above, the *NamedTuple* (see Python documentation) resulting from *get_inventory()* has various fields corresponding to Network description. Most of them are self-explanatory. *Altcode* is the [full FDSN Network code](http://www.fdsn.org/networks/) ; *seedtreeschema* is the corresponding [Seedtree](https://github.com/resif/seedtree5) database schema used internally for storing Seedtree scans.
Seek for network named RD:
```
rd = myinventory.get_inventory('RD')
```
Use the reload() method if you need to refresh the inventory :
```
myinv.reload()
```
Look up full network code based on 2-character SEED network and station code.
Returns None if not found. Raises an exception if more than one network is found :