Commit cd195602 authored by Jonathan Schaeffer's avatar Jonathan Schaeffer
Browse files

Merge branch 'python3'

parents 8117e2ff 4391708d
......@@ -2,7 +2,6 @@
# -*- coding: utf-8 -*-
import argparse
import itertools
import json
import logging
import os
......@@ -13,7 +12,8 @@ import traceback
from proton.utils import BlockingConnection
from proton import Message
import resif_pyinventory
from .resif_pyinventory import resif_pyinventory
SCRIPT_VERSION = 2021.190
......@@ -39,6 +39,11 @@ QUEUE_SEEDTREE_UPDATE_AGAIN = (
QUEUE_WFCATALOG_UPDATE = (
"wfcatalog-update" if RUNMODE == "production" else f"wfcatalog-update-{ RUNMODE }"
)
QUEUE_SEEDPSD_DATA = (
"seedpsd-data" if RUNMODE == "production" else f"seedpsd-data-{ RUNMODE }"
)
SEEDTREE_PATH = os.getenv("SEEDTREE_PATH")
SEEDTREE_PATH = os.getenv("SEEDTREE_PATH")
......@@ -67,21 +72,28 @@ def validated_data_integrated_files(content):
# forward content to Seedtree queue
sender = connection.create_sender(QUEUE_SEEDTREE_UPDATE)
message = Message(durable=True, body=content)
logger.info("forwarding message to queue %s" % QUEUE_SEEDTREE_UPDATE)
logger.info("forwarding message to queue %s", QUEUE_SEEDTREE_UPDATE)
sender.send(message)
# forward content to WFcatalog queue
sender = connection.create_sender(QUEUE_WFCATALOG_UPDATE)
message = Message(durable=True, body=content)
logger.info("forwarding message to queue %s" % QUEUE_WFCATALOG_UPDATE)
logger.info("forwarding message to queue %s", QUEUE_WFCATALOG_UPDATE)
sender.send(message)
# forward content to seedpsd-data queue. On message per line
sender = connection.create_sender(QUEUE_SEEDPSD_DATA)
logger.info("forwarding message to queue %s", QUEUE_SEEDPSD_DATA)
for line in content.split('\n'):
message = Message(durable=True, body=line)
sender.send(message)
def seedtree(content):
"""
Get filenames from incoming message, sort filenames by Seedtree database schema.
Lauch each subsequent Seedtree scans.
Send subsequent messages to applications depending on Seedtree scans.
Send subsequent messages to applications depending on Seedtree scans.
"""
# get a fresh RESIF inventory
logger.info("getting RESIF inventory...")
......@@ -102,10 +114,10 @@ def seedtree(content):
# get full network code for current file
fullnetwork = inv.get_full_network_code(network, station)
if not fullnetwork:
logger.error("full networkcode not found for file %s" % filename)
logger.error("full networkcode not found for file %s", filename)
updateagain += filename + "\n"
continue
logger.debug("new file : %s, full network code : %s " % (filename, fullnetwork))
logger.debug("new file : %s, full network code : %s ", filename, fullnetwork)
fileDict.setdefault(fullnetwork, []).append(filename)
# get seedtree schema name corresponding to this network
schema = inv.get_seedtree_schema(fullnetwork)
......@@ -114,15 +126,13 @@ def seedtree(content):
# send files with problems to an alternate list, for future manual processing
if updateagain:
logger.error(
"sending message to %s : %s" % (QUEUE_SEEDTREE_UPDATE_AGAIN, updateagain)
)
logger.error("sending message to %s : %s", QUEUE_SEEDTREE_UPDATE_AGAIN, updateagain)
sender = connection.create_sender(QUEUE_SEEDTREE_UPDATE_AGAIN)
message = Message(durable=True, body=updateagain)
sender.send(message)
# for each network code in fileDict, launch Seedtree updates
for network in fileDict:
for network in fileDict.items():
# get seedtree schema name
schema = inv.get_seedtree_schema(network)
# build a stringIO containing all filenames
......@@ -133,15 +143,13 @@ def seedtree(content):
mystdin.write(filename + "\n")
mystdin.seek(0)
# lauch seedtree instance
logger.info(
"launching filetree.py --insert %s)" % (schema)
)
logger.info("launching filetree.py --insert %s)", schema)
p = subprocess.Popen([sys.executable, f"{SEEDTREE_PATH}/filetree.py", "--insert", schema], stdin=subprocess.PIPE)
p.communicate(mystdin.read().encode())
logger.info("lauching Seedtree (--sync %s)" % schema)
logger.info("lauching Seedtree (--sync %s)", schema)
p = subprocess.Popen([sys.executable, f"{SEEDTREE_PATH}/seedtree5.py", "--sync", schema])
p.wait()
logger.info("lauching Seedtree (--computeproducts %s)" % schema)
logger.info("lauching Seedtree (--computeproducts %s)", schema)
p = subprocess.Popen([sys.executable, f"{SEEDTREE_PATH}/seedtree5.py", "--computeproducts", schema])
p.wait()
......@@ -194,28 +202,28 @@ if __name__ == "__main__":
logger.info("Connected to %s : %s", AMQP_SERVER, connection)
try:
# wait for message on queue
logger.info(
"waiting for incoming message on queue %s ~~~~~~~~~~~~ " % queue
)
message = receiver.receive(timeout=None)
logger.info("message received : %s" % message)
# launch subsequent action
if isinstance(message.body, bytes):
m = message.body.decode('utf-8')
else:
m = message.body
if args.validated_data_integrated_files:
validated_data_integrated_files(m)
if args.seedtree:
seedtree(m)
if args.wfcatalog:
wfcatalog(m)
receiver.accept()
logger.info("message acknowledged")
# wait for message on queue
logger.info(
"waiting for incoming message on queue %s ~~~~~~~~~~~~ ", queue
)
message = receiver.receive(timeout=None)
logger.info("message received : %s", message)
# launch subsequent action
if isinstance(message.body, bytes):
m = message.body.decode('utf-8')
else:
m = message.body
if args.validated_data_integrated_files:
validated_data_integrated_files(m)
if args.seedtree:
seedtree(m)
if args.wfcatalog:
wfcatalog(m)
receiver.accept()
logger.info("message acknowledged")
except Exception as err:
logger.critical(traceback.format_exc())
logger.critical(str(err))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment