Commit c4fc761b authored by Raphael Jacquot's avatar Raphael Jacquot

move the asynchronous processing step *after*

the verification step,
should cut down on potential DDOS
parent 35737483
Pipeline #27584 skipped with stage
......@@ -3,17 +3,13 @@ import csv
import datetime
import json
import secrets
import threading
import time
from multiprocessing import Process
from pyramid.config import Configurator
from pyramid.httpexceptions import HTTPFound
from pyramid.response import Response
from pyramid.view import view_config
from pyramid_mailer.message import Message
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import DBAPIError
from jmdc_app.forms.submit import FormImportCsv, FormSubmitResults
......@@ -22,44 +18,6 @@ from jmdc_app.models.submission import StarInfoEntry, Submission
from .. import models
class ImportProcess(threading.Thread):
"""
This thread object launches an external process to take care of importing
new data.
"""
def __init__(self, settings, sub):
super().__init__()
self.settings = settings
self.sub = sub
def async_import_process(self):
count = 0
print("running from %s"%(repr(self)))
print("submission_id = %d, email = %s"%(self.sub.id, self.sub.email))
print("headers(%d) %4d : %s"%(self.sub.parse_mode, self.sub.id, self.sub.headers))
headers = json.loads(self.sub.headers)
engine = create_engine(self.settings['sqlalchemy.url'])
Session = sessionmaker(engine)
session = Session()
entries = session.query(StarInfoEntry).filter_by(submission_id=self.sub.id).all()
for entry in entries:
print("processing %4d-%4d : %s"%(self.sub.id, entry.line_number,entry.line_data))
line_data = json.loads(entry.line_data)
si = StarInfo(session, headers, line_data)
if si:
print('OK')
else:
print('FAIL')
print('end of processing')
def run(self):
p = Process(target=self.async_import_process)
p.start()
p.join()
class import_new_data():
def __init__(self, context=None, request=None):
......@@ -104,12 +62,6 @@ class import_new_data():
si_entry.line_data = json.dumps(entry)
self.sub.star_info_entries.append(si_entry)
# this function does what it's name says ;-)
def start_import_process(self):
t = ImportProcess( self.request.registry.settings, self.sub)
t.daemon = True
t.start()
#
# this imports the data synchronously.
......@@ -228,10 +180,6 @@ class import_new_data():
self.session.commit()
# launch the worker process after the data is committed
if run_async:
self.start_import_process()
return True
@view_config(route_name='submit_text', renderer='../templates/submit_text.jinja2', require_csrf=True)
......
import json
import threading
import time
from multiprocessing import Process
import pyramid.httpexceptions as exc
from pyramid.response import Response
from pyramid.view import view_config
from sqlalchemy import create_engine
from sqlalchemy.exc import DBAPIError
import pyramid.httpexceptions as exc
from sqlalchemy.orm import sessionmaker
from jmdc_app.models.star_info import StarInfo
from jmdc_app.models.submission import StarInfoEntry, Submission
class ImportProcess(threading.Thread):
"""
This thread object launches an external process to take care of importing
new data.
"""
def __init__(self, settings, sub):
super().__init__()
self.settings = settings
self.sub = sub
def async_import_process(self):
count = 0
print("running from %s"%(repr(self)))
print("submission_id = %d, email = %s"%(self.sub.id, self.sub.email))
print("headers(%d) %4d : %s"%(self.sub.parse_mode, self.sub.id, self.sub.headers))
headers = json.loads(self.sub.headers)
engine = create_engine(self.settings['sqlalchemy.url'])
Session = sessionmaker(engine)
session = Session()
entries = session.query(StarInfoEntry).filter_by(submission_id=self.sub.id).all()
for entry in entries:
print("processing %4d-%4d : %s"%(self.sub.id, entry.line_number,entry.line_data))
line_data = json.loads(entry.line_data)
si = StarInfo(session, headers, line_data)
if si:
print('OK')
else:
print('FAIL')
print('end of processing')
def run(self):
p = Process(target=self.async_import_process)
p.start()
p.join()
# this function does what it's name says ;-)
def start_import_process(request, submission):
print("Start the import process thread")
t = ImportProcess(request.registry.settings, submission)
t.daemon = True
t.start()
@view_config(route_name='verify', renderer='../templates/verified.jinja2')
def moderate_submission(request):
token = request.matchdict["token"]
run_async = request.registry.settings.get('data_ingest.async', False)
print("Running async '%s'"%(run_async))
if isinstance(run_async, str):
run_async = run_async == 'True'
session = request.dbsession
submission = session.query(Submission).filter_by(upload_ticket=token).first()
......@@ -25,11 +83,11 @@ def moderate_submission(request):
# send email to moderator
moderate_url = self.request.route_url('moderate-submission', token=submission.moderate_ticket)
moderate_url = request.route_url('moderate-submission', token=submission.moderate_ticket)
print("DEBUG: moderation url %s"%(moderate_url))
try:
sender = self.request.registry.settings['jmdc.sender.email']
moderator = self.request.registry.settings['jmdc.moderator.email']
sender = request.registry.settings['jmdc.sender.email']
moderator = request.registry.settings['jmdc.moderator.email']
except:
print("DEBUG: no mail sent for moderator %s"%(moderate_url))
else:
......@@ -38,6 +96,10 @@ def moderate_submission(request):
message = Message(subject="there is one submission to moderate", sender=sender,
recipients=[moderator],
body="There is a new submission at JMDC, moderation can be done here : \n %s \nRegards!"%moderate_url)
self.request.mailer.send(message)
request.mailer.send(message)
# launch the worker process after the data is committed
if run_async:
start_import_process(request, submission)
return { 'submission': submission }
\ No newline at end of file
return { 'submission': submission }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment