Commit 35737483 authored by Raphael Jacquot's avatar Raphael Jacquot

start implementing the actual async data processing bits now that the process actually works

parent f71b6b59
Pipeline #27557 skipped with stage
......@@ -37,13 +37,21 @@ class ImportProcess(threading.Thread):
def async_import_process(self):
count = 0
print("running from %s"%(repr(self)))
print("submission_id = %d, email = %s"%(,
print("submission_id = %d, email = %s"%(,
print("headers(%d) %4d : %s"%(self.sub.parse_mode,, self.sub.headers))
headers = json.loads(self.sub.headers)
engine = create_engine(self.settings['sqlalchemy.url'])
Session = sessionmaker(engine)
session = Session()
entries = session.query(StarInfoEntry).filter_by(
for entry in entries:
print("processing %d-%d : %s"%(, entry.line_number,entry.line_data))
print("processing %4d-%4d : %s"%(, entry.line_number,entry.line_data))
line_data = json.loads(entry.line_data)
si = StarInfo(session, headers, line_data)
if si:
print('end of processing')
def run(self):
......@@ -70,6 +78,12 @@ class import_new_data():
self.errors = []
# this imports the data synchronously
# it only stores the data as a json dump in the database
# and defers all processing for later
def import_entry_asynchronous(self, entry):
print("ASYNC %5d %s"%(self.line, entry))
if not self.sub.headers:
......@@ -85,13 +99,26 @@ class import_new_data():
# add line as a star_info_entry
si_entry = StarInfoEntry()
si_entry.line_number = self.line
si_entry.line_data = json.dumps(entry)
# this function does what it's name says ;-)
def start_import_process(self):
t = ImportProcess( self.request.registry.settings, self.sub)
t.daemon = True
# this imports the data synchronously.
# the user has to wait forever for the page to return
# this causes issues when using a front end processor
# such as haproxy in front of the application.
# in this case, it's better to use the async subsystem
# above
def import_entry_synchronous(self, entry):
print("SYNC %5d %s"%(self.line, entry))
......@@ -113,7 +140,10 @@ class import_new_data():
self.entries = []
self.entries.append({'line': self.line, 'entry': si})
if self.is_entry_unique(si):
# this does only check if the entry is not YET in the database.
# entries for the current batch is NOT checked for uniqueness
if si.is_unique():
# check if there is an error for this line
if not si.errors:
si_entry = StarInfoEntry()
......@@ -125,15 +155,6 @@ class import_new_data():
self.line_error = True
def is_entry_unique(self, entry):
return entry.is_unique()
def start_import_process(self):
print("should be starting the worker process")
t = ImportProcess( self.request.registry.settings, self.sub)
t.daemon = True
def import_entry_set(self, email, entry_set):
self.sub = Submission()
dt =
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment