Merge pull request #129 from MacroConnections/error-messaging
Error messaging
kevinzenghu committed Feb 8, 2016
2 parents 116ea6b + 6083181 commit 0a6fd60
Showing 13 changed files with 65 additions and 37 deletions.
9 changes: 0 additions & 9 deletions .gitignore
@@ -9,16 +9,7 @@ __pycache__/
 # Virtualenv
 venv/

-# Logs
-*.log
-
-# Editor-specific
-.idea/
-
-uploads/
-
-migrations/
 preloaded/
 .idea

 preloaded
1 change: 1 addition & 0 deletions config.py
@@ -28,6 +28,7 @@ class BaseConfig(object):

     CELERY_IMPORTS = [
         'dive.tasks.pipelines',
+        'dive.tasks.handlers',
         'dive.tasks.ingestion.upload',
         'dive.tasks.ingestion.dataset_properties',
         'dive.tasks.ingestion.type_detection',
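Registering dive.tasks.handlers in CELERY_IMPORTS matters: workers only register tasks from modules they import at startup, so without this line the error_handler signature used in the resources below would point at a task the workers never loaded.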
7 changes: 6 additions & 1 deletion dive/resources/datasets.py
@@ -12,6 +12,8 @@
 from dive.data.access import get_dataset_sample
 from dive.tasks.pipelines import full_pipeline, ingestion_pipeline, get_chain_IDs
 from dive.tasks.ingestion.upload import upload_file
+from dive.tasks.handlers import error_handler
+

 import logging
 logger = logging.getLogger(__name__)
@@ -48,7 +50,10 @@ def post(self):
             'datasets': datasets
         }
         for dataset in datasets:
-            ingestion_task = ingestion_pipeline.apply_async(args=[dataset['id'], project_id])
+            ingestion_task = ingestion_pipeline.apply_async(
+                args=[dataset['id'], project_id],
+                link_error = error_handler.s()
+            )
         return make_response(jsonify({'task_id': ingestion_task.task_id}))
     return make_response(jsonify({'status': 'Upload failed'}))

6 changes: 5 additions & 1 deletion dive/resources/specs.py
@@ -6,6 +6,7 @@
 from dive.tasks.visualization import GeneratingProcedure
 from dive.tasks.visualization.data import get_viz_data_from_enumerated_spec
 from dive.tasks.pipelines import viz_spec_pipeline, get_chain_IDs
+from dive.tasks.handlers import error_handler

 import logging
 logger = logging.getLogger(__name__)
@@ -49,7 +50,10 @@ def post(self):
         logger.info('Formatting result took %.3fs', (time() - start_time))
         return result
     else:
-        specs_task = viz_spec_pipeline.apply_async(args=[dataset_id, project_id, selected_fields, conditionals, config])
+        specs_task = viz_spec_pipeline.apply_async(
+            args = [dataset_id, project_id, selected_fields, conditionals, config],
+            link_error = error_handler.s()
+        )
         from time import time
         start_time = time()

11 changes: 10 additions & 1 deletion dive/resources/task_resources.py
@@ -1,7 +1,7 @@
 from flask import make_response, jsonify, current_app, url_for
 from flask.ext.restful import Resource, reqparse, marshal_with

-from celery import states, chain, group
+from celery import states
 from celery.result import result_from_tuple, ResultSet

 from dive.task_core import celery
@@ -39,6 +39,7 @@ class TaskResult(Resource):
     def get(self, task_id):
         task = celery.AsyncResult(task_id)

+        logger.debug('%s: %s', task_id, task.state)
         if task.state == states.PENDING:
             if (task.info) and (task.info.get('desc')):
                 logger.info(task.info.get('desc'))
@@ -57,7 +58,15 @@ def get(self, task_id):
             'state': task.state,
         }

+        elif task.state == states.FAILURE:
+            state = {
+                'error': task.info.get('error'),
+                'state': task.state,
+            }
+
         response = jsonify(state)
         if task.state == states.PENDING:
             response.status_code = 202
+        elif task.state == states.FAILURE:
+            response.status_code = 500
         return response
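With the FAILURE branch in place, the endpoint now distinguishes a still-running task (202 on PENDING) from a failed one (500 with the stored traceback). A hypothetical polling client — the base URL and route here are assumptions for illustration, not taken from this diff:

    import time
    import requests  # any HTTP client would do

    def poll_task(task_id, base_url='http://localhost:8081'):  # hypothetical host and route
        while True:
            r = requests.get('{0}/task_result/{1}'.format(base_url, task_id))
            if r.status_code == 202:      # PENDING: keep polling
                time.sleep(1)
                continue
            if r.status_code == 500:      # FAILURE: body carries the stored traceback
                raise RuntimeError(r.json()['error'])
            return r.json()               # terminal state: success payload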
19 changes: 11 additions & 8 deletions dive/resources/transform.py
@@ -9,6 +9,7 @@
 from dive.db import db_access
 from dive.resources.serialization import jsonify
 from dive.tasks.pipelines import unpivot_pipeline, reduce_pipeline, join_pipeline
+from dive.tasks.handlers import error_handler

 import logging
 logger = logging.getLogger(__name__)
@@ -36,9 +37,10 @@ def post(self):
         column_ids = args.get('column_ids')
         new_dataset_name_prefix = args.get('new_dataset_name_prefix')

-        reduce_task = reduce_pipeline.apply_async(args=[
-            column_ids, new_dataset_name_prefix, dataset_id, project_id
-        ])
+        reduce_task = reduce_pipeline.apply_async(
+            args = [column_ids, new_dataset_name_prefix, dataset_id, project_id],
+            link_error = error_handler.s()
+        )
         return make_response(jsonify({ 'taskId': reduce_task.task_id }))


@@ -65,10 +67,10 @@ def post(self):
         value_name = args.get('value_name')
         new_dataset_name_prefix = args.get('new_dataset_name_prefix')

-        unpivot_task = unpivot_pipeline.apply_async(args=[
-            pivot_fields, variable_name, value_name, new_dataset_name_prefix,
-            dataset_id, project_id
-        ])
+        unpivot_task = unpivot_pipeline.apply_async(
+            args = [pivot_fields, variable_name, value_name, new_dataset_name_prefix, dataset_id, project_id],
+            link_error = error_handler.s()
+        )
         return make_response(jsonify({ 'taskId': unpivot_task.task_id }))


@@ -107,6 +109,7 @@ def post(self):
         join_task = join_pipeline.apply_async(args=[
             left_dataset_id, right_dataset_id, on, left_on, right_on, how,
             left_suffix, right_suffix, new_dataset_name_prefix, project_id
-        ])
+        ],
+        link_error = error_handler.s())

         return make_response(jsonify({ 'taskId': join_task.task_id }))
2 changes: 1 addition & 1 deletion dive/setup_logging.py
@@ -6,7 +6,7 @@
 import logging
 import logging.config

-def setup_logging(default_path='logging.yaml', default_level=logging.INFO):
+def setup_logging(default_path='logging.yaml', default_level=logging.DEBUG):
     path = default_path
     if os.path.exists(path):
         with open(path, 'rt') as f:
18 changes: 18 additions & 0 deletions dive/tasks/handlers.py
@@ -0,0 +1,18 @@
+import time
+
+from celery import states
+from dive.task_core import celery, task_app
+
+import logging
+logger = logging.getLogger(__name__)
+
+@celery.task(bind=True)
+def error_handler(self, task_id):
+    result = self.app.AsyncResult(task_id)
+    self.update_state(
+        task_id=task_id,
+        state=states.FAILURE,
+        meta={'error': result.traceback}
+    )
+    logger.error('Task {0} raised exception: {1!r}\n{2!r}'.format(
+        task_id, result.result, result.traceback))
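For reference, Celery calls a one-argument errback with the id of the failed task, which is how error_handler receives task_id here. A minimal self-contained sketch of the same pattern — the app, broker URL, and flaky task are hypothetical, not part of this repo:

    from celery import Celery, states

    app = Celery('sketch', broker='redis://localhost:6379/0')  # hypothetical broker

    @app.task
    def flaky():
        raise ValueError('boom')  # any uncaught exception marks the task FAILURE

    @app.task(bind=True)
    def on_error(self, task_id):
        # A one-argument errback receives the failed task's id.
        result = self.app.AsyncResult(task_id)
        # Store a structured error payload for pollers of the result backend.
        self.update_state(task_id=task_id, state=states.FAILURE,
                          meta={'error': result.traceback})

    flaky.apply_async(link_error=on_error.s())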
1 change: 1 addition & 0 deletions dive/tasks/visualization/data.py
@@ -3,6 +3,7 @@
 '''
 Functions for returning the data corresponding to a given visualization type and specification
 '''
+from __future__ import division

 import math
 import numpy as np
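A note on the __future__ import: on Python 2 (which this codebase appears to target, given flask.ext.restful elsewhere in this commit), / between two ints floors the result, silently zeroing out ratios and scores. The import switches / to true division:

    from __future__ import division  # must precede other statements in the module

    ratio = 1 / 2     # 0.5 under true division; 0 on Python 2 without the import
    floored = 1 // 2  # 0: floor division remains available via //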
20 changes: 10 additions & 10 deletions dive/tasks/visualization/spec_pipeline.py
@@ -64,16 +64,16 @@ def attach_data_to_viz_specs(enumerated_viz_specs, dataset_id, project_id, condi
             grouped_df = conditioned_df.groupby(grouped_field)
             precomputed['groupby'][grouped_field] = grouped_df

-        try:
-            data = get_viz_data_from_enumerated_spec(spec, project_id, conditionals, config,
-                df=conditioned_df,
-                precomputed=precomputed,
-                data_formats=['score', 'visualize']
-            )
-
-        except Exception as e:
-            logger.error("Error getting viz data %s", e, exc_info=True)
-            continue
+        # try:
+        data = get_viz_data_from_enumerated_spec(spec, project_id, conditionals, config,
+            df=conditioned_df,
+            precomputed=precomputed,
+            data_formats=['score', 'visualize']
+        )
+
+        # except Exception as e:
+        #     logger.error("Error getting viz data %s", e, exc_info=True)
+        #     continue

         if not data:
             logger.info('No data for spec with generating procedure %s', spec['generating_procedure'])
2 changes: 1 addition & 1 deletion logging.yaml
@@ -22,7 +22,7 @@ handlers:
     class: logging.handlers.RotatingFileHandler
     level: DEBUG
     formatter: detailed
-    filename: dive.log
+    filename: logs/dive.log
     encoding: utf8
     backupCount: 20

1 change: 1 addition & 0 deletions logs/.gitignore
@@ -0,0 +1 @@
+[^.]*
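The [^.]* pattern ignores every file whose name does not begin with a dot: rotated logs like dive.log stay untracked, while the .gitignore itself keeps the otherwise-empty logs/ directory checked in.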
5 changes: 0 additions & 5 deletions preloaded_metadata.yaml

This file was deleted.
