# ask-community
j
Hi everyone. Is there any way to write a general wrapper for the Task.run method? I'd like to do something like this:
class MyTask(prefect.tasks.core.function.FunctionTask):
    def run(self, *args, **kwargs):
        try:
            return super().run(*args, **kwargs)
        except BaseException as e:
            prefect.context.get('logger').error('Error on host {}: {}'.format(hostname, format_error(e)))
            raise  # Or better yet, mark the task as Failed
By default, Prefect catches, logs, and marks the task as failed for some errors, but for others, it puts str(e) in the "State Message" and throws away the stack trace, and for still others, it drops the error completely and the task appears to be "running". My wrapper would fix these problems. But you can't write the method above because there's a function called _validate_run_signature that expressly forbids it. So the only way to do it that I can think of would be to either wait for Python to get macros, or to add code generation to my deployment process (it would insert the try/except block into the body of every function that has a @task decorator). Is it really impossible to write this wrapper as an ordinary method?
k
Hey @Jeremy Phelps, Prefect always logs errors with the full stack trace -- the state message is intentionally simplified for legibility. If you find places where your task failure is not properly captured by Prefect we would appreciate a bug report for that!
Also, instead of a subclass, this might be better done with another decorator that overrides / is applied on top of the task decorator.
j
The same problem would apply to that decorator. No matter what, the end result must be a function with no *args argument in its signature.
z
Hey @Jeremy Phelps -- the key here is that you'll need to write a decorator that passes the signature of your function through so when it's inspected it's not taking variadic args (which we cannot support). This example runs without encountering the error you're mentioning:
import functools
from prefect import task, Task


def my_dec(fn):
    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapped


@task
@my_dec
def add(x, y=1):
    return x + y


add.run(1, 2)
However, if we're ever dropping an error and leaving your task as "Running" that is absolutely something we'd like to fix for you
If you could give us some more information about that case, I'm happy to apply a patch wherever necessary
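For reference, the signature-preserving decorator above can be combined with the try/except logging from the original question. The following is only a sketch using the standard library: the logger name task_errors and the helper has_varargs are hypothetical, the standard logging module stands in for prefect.context.get('logger'), and it catches Exception rather than BaseException. It shows that functools.wraps lets inspect.signature report the original parameters instead of *args. Whether a particular Prefect version inspects the function the same way is an assumption to check against that version.

```python
import functools
import inspect
import logging
import socket

# Hypothetical logger name; inside a Prefect task one would presumably
# use prefect.context.get('logger') instead.
logger = logging.getLogger("task_errors")


def log_errors(fn):
    """Log any exception with the host name and full traceback, then re-raise."""

    @functools.wraps(fn)  # copies metadata and sets wrapped.__wrapped__ = fn
    def wrapped(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and appends the traceback
            logger.exception(
                "Error on host %s in %s", socket.gethostname(), fn.__name__
            )
            raise

    return wrapped


def has_varargs(fn):
    """Return True if the inspected signature contains a *args parameter."""
    params = inspect.signature(fn).parameters.values()
    return any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)


@log_errors
def add(x, y=1):
    return x + y
```

Here inspect.signature(add) follows the __wrapped__ attribute, so has_varargs(add) is False even though the inner wrapper takes *args. Dropping the functools.wraps line would make it True.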
j
The thing you do more often is put the str() of the exception object in the State Message without a stack trace instead of putting it in the logging stream. It's hard to reproduce any of the bad behaviors. Some errors get handled properly while others don't.
z
Are you saying the error is just in the state message and never logged? If so, I'd like to fix that too. We're not going to put stack traces in state messages but they definitely should be in the logs where possible.
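To make the difference between the two concrete, here is a small standard-library sketch (not Prefect code; first_item is a made-up function) comparing what str() of an exception keeps with what the formatted traceback keeps:

```python
import traceback


def first_item(items):
    # Raises IndexError when the list is empty
    return items[0]


try:
    first_item([])
except IndexError as exc:
    short_message = str(exc)              # message only, no location
    full_trace = traceback.format_exc()   # message plus the call stack
```

short_message holds only the exception text, while full_trace also names first_item and the line that raised, which is the part that goes missing when only the state message is available.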
j
Yes, sometimes a task fails with the error only in the state message.
z
If it's hard to reproduce the behavior with the 'Running' failed state, it's probably an issue at the executor level. Are you using timeouts?
If you can get me some more details on when the task fails with no logs I can add them in!
j
Here's one flow run with an error appearing solely in the State Message: https://cloud.prefect.io/stockwell/flow-run/43b94ab5-ae8a-4c67-899c-434b0cc2955b
The stack trace had to be extracted from the Dask worker machine where it happened:
Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: ERROR:prefect.CloudTaskRunner:Unexpected error: IndexError('list index out of range')
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: Traceback (most recent call last):
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     new_state = method(self, state, *args, **kwargs)
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     logger=self.logger,
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     return task.run(*args, **kwargs)  # type: ignore
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "<string>", line 45, in delivery_scheduler_task
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/pict/prefect/tasks/delivery_scheduler.py", line 25, in delivery_scheduler
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     return delivery_scheduler_core(n_drivers, max_cost_arg, metro_id, job_id, forecast_job_id, ortools)
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/pict/prefect/tasks/delivery_scheduler.py", line 71, in delivery_scheduler_core
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     max_cost_arg,
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/pict/prefect/tasks/delivery_scheduler.py", line 141, in run_ortools_scheduler
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     lv = scheduler.compute_location_values(forecast_job_id, depot.metro_id)
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/pict/webapp/pict_api/delivery_scheduler/scheduler.py", line 50, in compute_location_values
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     date = ItemForecast.objects.filter(job_id=forecast_job_id)[0].start_date
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:   File "/home/dask/.pyenv/versions/3.7.3/lib/python3.7/site-packages/django/db/models/query.py", line 309, in __getitem__
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]:     return qs._result_cache[0]
    Jun 14 10:51:32 dask-worker0 run-dask-worker.sh[795679]: IndexError: list index out of range
It appears to have happened within a call to some version of Task.run.
z
Thanks! I'll look into that one and get back to you.