Commit c48ae4a2 authored by Jonathan Schaeffer's avatar Jonathan Schaeffer
Browse files

Gestion d'une nouvelle queue pour les métadonnées

parent 4986bfe2
......@@ -17,7 +17,16 @@ from proton.utils import BlockingConnection
from resif_pyinventory import resif_pyinventory
SCRIPT_VERSION = 2021.190
SCRIPT_VERSION = "2022.060"
logger = logging.getLogger('amqpWorker')
logger.setLevel(level=logging.INFO if RUNMODE == "production" else logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# script behavior depends on RUNMODE variable
RUNMODE = os.environ["RUNMODE"]
......@@ -31,6 +40,9 @@ QUEUE_VALIDATED_DATA_INTEGRATED_FILES = (
if RUNMODE == "production"
else f"validated-data-integrated-files-{ RUNMODE }"
)
QUEUE_VALIDATED_METADATA = (
"validated-metadata" if RUNMODE == "production" else f"validated_metadata-{ RUNMODE }"
)
QUEUE_SEEDTREE_UPDATE = (
"seedtree-update" if RUNMODE == "production" else f"seedtree-update-{ RUNMODE }"
)
......@@ -43,6 +55,9 @@ QUEUE_WFCATALOG_UPDATE = (
QUEUE_SEEDPSD_DATA = (
"seedpsd-data" if RUNMODE == "production" else f"seedpsd-data-{ RUNMODE }"
)
QUEUE_SEEDPSD_METADATA = (
"seedpsd-metadata" if RUNMODE == "production" else f"seedpsd-data-{ RUNMODE }"
)
SEEDTREE_PATH = os.getenv("SEEDTREE_PATH")
......@@ -50,6 +65,9 @@ SEEDTREE_PATH = os.getenv("SEEDTREE_PATH")
class Recv(MessagingHandler):
"""
Receiver, connecting to a queue, waiting for new messages.
"""
def __init__(self, url, mode):
super(Recv, self).__init__()
self.url = url
......@@ -63,32 +81,14 @@ class Recv(MessagingHandler):
logger.info("Receiving %s",event.message.body)
if self.mode == 'validated_data_integrated_files':
validated_data_integrated_files(event.message.body)
elif self.mode == 'validated_metadata':
validated_metadata(event.message.body)
elif self.mode == 'wfcatalog':
wfcatalog(event.message.body)
elif self.mode == 'seedtree':
seedtree(event.message.body)
# connect to AMQP server
connection = None
session = None
logger = logging.getLogger('amqpWorker')
logger.setLevel(level=logging.INFO if RUNMODE == "production" else logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def grouper(seq, size):
"""
Split a sequence in a fixed-size list of sequences
"""
return (seq[pos:pos + size] for pos in range(0, len(seq), size))
def validated_data_integrated_files(content):
""" incoming new validated miniseed files """
connection = BlockingConnection(AMQP_SERVER)
......@@ -111,6 +111,18 @@ def validated_data_integrated_files(content):
message = Message(durable=True, body=line)
sender.send(message)
def validated_metadata(content):
"""
Les messages concernent de la nouvelle métadonnée et doivent être transmis aux queues de post-traitement
de métadonnée
"""
connection = BlockingConnection(AMQP_SERVER)
# forward content to Seedtree queue
sender = connection.create_sender(QUEUE_SEEDPSD_METADATA)
message = Message(durable=True, body=content)
logger.info("forwarding message to queue %s", QUEUE_SEEDPSD_METADATA)
sender.send(message)
def seedtree(content):
"""
......@@ -177,13 +189,14 @@ def seedtree(content):
p.wait()
def wfcatalog(content):
# launch wfcatalog updates, 100 files for each run
"""
Launch wfcatalog updates
"""
wfcollector = WFCatalogCollector(to_stdout=True)
files = json.dumps(content.splitlines())
wfcollector.process({'update': True, 'force': True, 'csegs': True, 'flags': True, 'list': files})
if __name__ == "__main__":
logger.info("Starting")
# build command line parser
......@@ -196,6 +209,11 @@ if __name__ == "__main__":
help="Listen for new validated data files",
action="store_true",
)
group1.add_argument(
"--validated-metadata",
help="Listen for new validated metadata",
action="store_true",
)
group1.add_argument(
"--seedtree",
help="Launch Seedtree application whenever needed",
......@@ -209,24 +227,22 @@ if __name__ == "__main__":
parser.add_argument("--version", action="version", version="v%.3f" % SCRIPT_VERSION)
args = parser.parse_args()
# set queue name
# set queue name and mode
mode = ""
if args.validated_data_integrated_files:
queue = QUEUE_VALIDATED_DATA_INTEGRATED_FILES
mode = "validated_data_integrated_files"
if args.validated_metadata:
queue = QUEUE_VALIDATED_METADATA
mode = "validated_metadata"
if args.seedtree:
queue = QUEUE_SEEDTREE_UPDATE
mode = "seedtree"
if args.wfcatalog:
from wfcatalog.collector.WFCatalogCollector import WFCatalogCollector
queue = QUEUE_WFCATALOG_UPDATE
mode = ""
if args.validated_data_integrated_files:
mode = "validated_data_integrated_files"
if args.seedtree:
mode = "seedtree"
if args.wfcatalog:
mode = "wfcatalog"
logger.info("Trying to connect to %s/%s", AMQP_SERVER, queue)
Container(Recv(f"{AMQP_SERVER}/{queue}", mode)).run()
logger.info("amqp worker quitting")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment