Commit f63e3616 authored by Jonathan Schaeffer's avatar Jonathan Schaeffer
Browse files

Version fonctionnelle en staging

parent 3d1d9e7e
......@@ -15,13 +15,14 @@ from proton.utils import BlockingConnection
from proton import Message
import resif_pyinventory
SCRIPT_VERSION = 2021.190
# script behavior depends on RUNMODE variable
RUNMODE = os.environ["RUNMODE"]
# AMQP broker
AMQP_SERVER = "amqp-geodata.ujf-grenoble.fr:5672"
AMQP_SERVER = os.environ.get("AMQP_SERVER", "amqp-geodata.ujf-grenoble.fr:5672")
# queue names
QUEUE_VALIDATED_DATA_INTEGRATED_FILES = (
......@@ -39,6 +40,8 @@ QUEUE_WFCATALOG_UPDATE = (
"wfcatalog-update" if RUNMODE == "production" else f"wfcatalog-update-{ RUNMODE }"
)
SEEDTREE_PATH = os.getenv("SEEDTREE_PATH")
# connect to AMQP server
connection = None
session = None
......@@ -131,41 +134,22 @@ def seedtree(content):
mystdin.seek(0)
# lauch seedtree instance
logger.info(
"lauching Filetree (--insert %d files in schema %s)" % (nbfiles, schema)
"launching filetree.py --insert %s)" % (schema)
)
p = subprocess.Popen(["filetree.py", "--insert", schema], stdin=subprocess.PIPE)
p.communicate(mystdin.read())
p = subprocess.Popen([sys.executable, f"{SEEDTREE_PATH}/filetree.py", "--insert", schema], stdin=subprocess.PIPE)
p.communicate(mystdin.read().encode())
logger.info("lauching Seedtree (--sync %s)" % schema)
p = subprocess.Popen(["seedtree5.py", "--sync", schema])
p = subprocess.Popen([sys.executable, f"{SEEDTREE_PATH}/seedtree5.py", "--sync", schema])
p.wait()
logger.info("lauching Seedtree (--computeproducts %s)" % schema)
p = subprocess.Popen(["seedtree5.py", "--computeproducts", schema])
p = subprocess.Popen([sys.executable, f"{SEEDTREE_PATH}/seedtree5.py", "--computeproducts", schema])
p.wait()
def wfcatalog(content):
# launch wfcatalog updates, 100 files for each run
for chunk in grouper(content.splitlines(), 50):
logger.info("launching WFCatalog update on : %s" % str(chunk))
try:
# Try to convert a list of bytes to list of strings
chunk = [m.decode('utf-8') for m in chunk]
except AttributeError as e:
# chunk is already a list of strings
pass
files = json.dumps(chunk)
out = subprocess.Popen(
[
"python3",
"./wfcatalog/collector/WFCatalogCollector.py",
"--update",
"--force",
"--stdout",
"--list",
files,
], stdout=subprocess.PIPE
).communicate()[0]
print(out.decode('utf-8'))
wfcollector = WFCatalogCollector(to_stdout=True)
files = json.dumps(content.splitlines())
wfcollector.process({'update': True, 'force': True, 'csegs': True, 'flags': True, 'list': files})
if __name__ == "__main__":
......@@ -200,45 +184,43 @@ if __name__ == "__main__":
if args.seedtree:
queue = QUEUE_SEEDTREE_UPDATE
if args.wfcatalog:
from wfcatalog.collector.WFCatalogCollector import WFCatalogCollector
queue = QUEUE_WFCATALOG_UPDATE
# connect to AMQP broker
logger.info("Trying to connect to %s", AMQP_SERVER)
connection = BlockingConnection(AMQP_SERVER)
receiver = connection.create_receiver(queue)
logger.info("Connected to %s : %s", AMQP_SERVER, connection)
stoploop = False
try:
while not stoploop:
# wait for message on queue
logger.info(
"waiting for incoming message on queue %s ~~~~~~~~~~~~ " % queue
)
receiver = connection.create_receiver(queue)
message = receiver.receive(timeout=None)
logger.info("message received : %s" % message)
# launch subsequent action
if isinstance(message.body, bytes):
m = message.body.decode('utf-8')
else:
m = message.body
if args.validated_data_integrated_files:
validated_data_integrated_files(message.body)
validated_data_integrated_files(m)
if args.seedtree:
seedtree(message.body)
seedtree(m)
if args.wfcatalog:
wfcatalog(message.body)
wfcatalog(m)
receiver.accept()
logger.info("message acknowledged")
# if not production mode, quit
if RUNMODE != "production":
stoploop = True
except Exception as err:
logger.critical(traceback.format_exc())
logger.critical(str(err))
raise
finally:
connection.close()
connection.close()
logger.critical("amqp worker quitting")
sys.exit(1)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment