Skip to content
Snippets Groups Projects

Draft: Resolve "Validated data : listen to integration queue in order to process validated data"

+ 34
2
@@ -3,9 +3,11 @@ from pathlib import Path
from typing import Any
import pendulum
from celery import Celery, Task, chain, group
from celery import Celery, Task, bootsteps, chain, group
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from kombu import Consumer, Exchange, Message, Queue
from kombu.transport.virtual import Channel as KombuChannel
from obspy import UTCDateTime, read
from obspy.clients.fdsn.client import (
FDSNBadRequestException,
@@ -63,6 +65,31 @@ engine = create_engine(str(settings.PG_DSN))
dbsession = scoped_session(sessionmaker(bind=engine))
post_integration_queue = Queue(
"waveqc-data",
exchange=Exchange("postintegration", "direct", durable=True),
routing_key="seed-data",
auto_delete=True,
durable=False,
)
class ValidatedDataIntegrationStep(bootsteps.ConsumerStep):
def get_consumers(self, channel: KombuChannel) -> list[Consumer]:
return [
Consumer(
channel,
queues=[post_integration_queue],
callbacks=[self.handle_message],
accept=["json"],
)
]
def handle_message(self, body: str, message: Message) -> None:
manage_validated_data.delay(body)
message.ack()
class DatabaseTask(Task):
def after_return(self, *_: list[Any]) -> None:
dbsession.commit()
@@ -79,7 +106,7 @@ app = Celery(
result_backend=str(settings.REDIS_DSN),
task_cls="waveqc.tasks.DatabaseTask",
)
app.steps["consumer"].add(ValidatedDataIntegrationStep)
app.conf.worker_prefetch_multiplier = settings.CELERY_PREFETCH
app.conf.worker_max_tasks_per_child = settings.CELERY_MAX_TASKS_PER_CHILD
app.conf.beat_schedule_filename = str(settings.CELERY_BEAT_FILENAME)
@@ -107,6 +134,11 @@ app.conf.beat_schedule = {
}
@app.task
def manage_validated_data(message: str) -> None:
logger.warning(" [x] Message processed : %s", message)
@app.task(ignore_result=True)
def update_inventory(date: str) -> None:
client = get_obspy_client()
Loading