Commit 4986bfe2 authored by Jonathan Schaeffer's avatar Jonathan Schaeffer
Browse files

Utilisation plus propre de Proton

parent c7a7d59b
......@@ -11,7 +11,9 @@ import sys
import traceback
from proton.handlers import MessagingHandler
from proton import Message
from proton.reactor import Container
from proton.utils import BlockingConnection
from resif_pyinventory import resif_pyinventory
......@@ -48,19 +50,23 @@ SEEDTREE_PATH = os.getenv("SEEDTREE_PATH")
class Recv(MessagingHandler):
def __init__(self, url):
def __init__(self, url, mode):
super(Recv, self).__init__()
self.url = url
self.mode = mode
def on_start(self, event):
logger.info("Listening on %s", self.url)
event.container.create_receiver(self.url)
def on_message(self, event):
if event.message.id and event.message.id < self.received:
# ignore duplicate message
return
logging.info("Receiving %s",event.message.body)
logger.info("Receiving %s",event.message.body)
if self.mode == 'validated_data_integrated_files':
validated_data_integrated_files(event.message.body)
elif self.mode == 'wfcatalog':
wfcatalog(event.message.body)
elif self.mode == 'seedtree':
seedtree(event.message.body)
# connect to AMQP server
......@@ -85,6 +91,7 @@ def grouper(seq, size):
def validated_data_integrated_files(content):
""" incoming new validated miniseed files """
connection = BlockingConnection(AMQP_SERVER)
# forward content to Seedtree queue
sender = connection.create_sender(QUEUE_SEEDTREE_UPDATE)
message = Message(durable=True, body=content)
......@@ -211,44 +218,15 @@ if __name__ == "__main__":
from wfcatalog.collector.WFCatalogCollector import WFCatalogCollector
queue = QUEUE_WFCATALOG_UPDATE
mode = ""
if args.validated_data_integrated_files:
mode = "validated_data_integrated_files"
if args.seedtree:
mode = "seedtree"
if args.wfcatalog:
mode = "wfcatalog"
try:
Container(Recv(f"{AMQP_SERVER}/{QUEUE_VALIDATED_DATA_INTEGRATED_FILES}")).run()
except KeyboardInterrupt: pass
# connect to AMQP broker
logger.info("Trying to connect to %s", AMQP_SERVER)
connection = BlockingConnection(AMQP_SERVER)
receiver = connection.create_receiver(queue)
logger.info("Connected to %s : %s", AMQP_SERVER, connection)
try:
# wait for message on queue
logger.info(
"waiting for incoming message on queue %s ~~~~~~~~~~~~ ", queue
)
message = receiver.receive(timeout=None)
logger.info("message received : %s", message)
# launch subsequent action
if isinstance(message.body, bytes):
m = message.body.decode('utf-8')
else:
m = message.body
if args.validated_data_integrated_files:
validated_data_integrated_files(m)
if args.seedtree:
seedtree(m)
if args.wfcatalog:
wfcatalog(m)
receiver.accept()
logger.info("message acknowledged")
except Exception as err:
logger.critical(traceback.format_exc())
logger.critical(str(err))
raise
receiver.close()
connection.close()
logger.info("Trying to connect to %s/%s", AMQP_SERVER, queue)
Container(Recv(f"{AMQP_SERVER}/{queue}", mode)).run()
logger.info("amqp worker quitting")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment