Jeremy Phelps
06/14/2021, 7:26 PMTask.run
method? I'd like to do something like this:
class MyTask(prefect.tasks.core.function.FunctionTask):
def run(*args, **kwargs):
try:
return self.super().run(*args, **kwargs)
except BaseException as e:
prefect.context.get('logger').error('Error on host {}: {}'.format(hostname, format_error(e)))
raise # Or better yet, mark the task as Failed
By default, Prefect catches, logs, and marks the task as failed for some errors, but for others, it puts str(e)
in the "State Message" and throws away the stack trace, and for still others, it drops the error completely and the task appears to be "running". My wrapper would fix these problems.
But you can't write the method above because there's a function called _validate_run_signature
that expressly forbids it. So the only way to do it that I can think of would be to either wait for Python to get macros, or to add code generation to my deployment process (it would insert the try/except block into the body of every function that has a @task
decorator).
Is it really impossible to write this wrapper as an ordinary method?Kevin Kho
Kevin Kho
task
decoratorJeremy Phelps
06/14/2021, 8:07 PM*args
argument in its signature.Zanie
import functools
from prefect import task, Task
def my_dec(fn):
@functools.wraps(fn)
def wrapped(*args, **kwargs):
return fn(*args, **kwargs)
return wrapped
@task
@my_dec
def add(x, y=1):
return x + y
add.run(1, 2)
Zanie
Zanie
Jeremy Phelps
06/14/2021, 8:19 PMstr()
of the exception object in the State Message without a stack trace instead of putting it in the logging stream. It's hard to reproduce any of the bad behaviors. Some errors get handled properly while others don't.Zanie
Jeremy Phelps
06/14/2021, 8:20 PMZanie
Zanie
Jeremy Phelps
06/14/2021, 8:28 PMJeremy Phelps
06/14/2021, 8:30 PMJun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: ERROR:prefect.CloudTaskRunner:Unexpected error: IndexError('list index out of range')
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: Traceback (most recent call last):
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: new_state = method(self, state, *args, **kwargs)
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: logger=self.logger,
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: return task.run(*args, **kwargs) # type: ignore
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "<string>", line 45, in delivery_scheduler_task
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/pict/prefect/tasks/delivery_scheduler.py", line 25, in delivery_scheduler
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: return delivery_scheduler_core(n_drivers, max_cost_arg, metro_id, job_id, forecast_job_id, ortools)
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/pict/prefect/tasks/delivery_scheduler.py", line 71, in delivery_scheduler_core
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: max_cost_arg,
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/pict/prefect/tasks/delivery_scheduler.py", line 141, in run_ortools_scheduler
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: lv = scheduler.compute_location_values(forecast_job_id, depot.metro_id)
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/pict/webapp/pict_api/delivery_scheduler/scheduler.py", line 50, in compute_location_values
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: date = ItemForecast.objects.filter(job_id=forecast_job_id)[0].start_date
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/django/db/models/query.py", line 309, in __getitem__
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: return qs._result_cache[0]
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: IndexError: list index out of range
Jeremy Phelps
06/14/2021, 8:30 PMTask.run
.Zanie
Zanie