Skip to content
Snippets Groups Projects

Draft: Resolve "Validated data : listen to integration queue in order to process validated data"

+ 40
2
@@ -3,11 +3,13 @@ from pathlib import Path
from typing import Any
import pendulum
from celery import Celery, Task, chain, group
from celery import Celery, Task, bootsteps, chain, group
from celery.exceptions import Retry, WorkerLostError
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery.worker.request import Request as _Request
from kombu import Channel as KombuChannel
from kombu import Consumer, Exchange, Message, Queue
from obspy import UTCDateTime, read
from obspy.clients.fdsn.client import (
FDSNBadRequestException,
@@ -87,6 +89,32 @@ class Request(_Request):
raise WorkerLostError(message) from exc_info.exception
post_integration_queue = Queue(
"waveqc-data",
exchange=Exchange("post-integration", "topic", durable=True),
routing_key="staging.seed-data",
auto_delete=True,
durable=False,
)
class ValidatedDataIntegrationStep(bootsteps.ConsumerStep):
def get_consumers(self, channel: KombuChannel) -> list[Consumer]:
return [
Consumer(
channel,
queues=[post_integration_queue],
callbacks=[self.handle_message],
accept=["json"],
)
]
def handle_message(self, body: str, message: Message) -> None:
manage_validated_data.delay(body)
message.ack()
logger.warning(" [x] Acked message: %s", body)
class DatabaseTask(Task):
# When WorkerLostError exception is raise, tasks is retried 3 times
# at 10, 20 and 40 seconds interval
@@ -111,7 +139,12 @@ app = Celery(
result_backend=str(settings.REDIS_DSN),
task_cls="waveqc.tasks.DatabaseTask",
)
app.conf.task_default_queue = "celery"
app.conf.task_queues = (
Queue("celery"),
post_integration_queue,
)
app.steps["consumer"].add(ValidatedDataIntegrationStep)
app.conf.beat_schedule = {
# Executes daily at 1:15 a.m.
"launch-checks-daily": {
@@ -136,6 +169,11 @@ app.conf.beat_schedule = {
}
@app.task
def manage_validated_data(message: str) -> None:
logger.warning(" [x] Received message: %s", message)
@app.task(ignore_result=True)
def update_inventory(date: str) -> None:
client = get_obspy_client()
Loading