Mathijs Miermans
01/25/2022, 10:33 PMStarting task run...
for tasks that are skipped because an upstream task fails? (Example code in thread.)Mathijs Miermans
01/25/2022, 10:33 PMimport prefect
from prefect import task, Flow
from time import sleep
@task
def failing_task() -> str:
raise ValueError('Raising an exception...')
return 'This line is not reached.'
@task
def upstream_task(x_arg: str = 'foobar'):
logger = prefect.context.get("logger")
logger.error(f"running dependent_task with {x_arg}")
sleep(5) # sleep for 5 seconds
with Flow("Test failing task") as flow:
x = failing_task()
upstream_task(x)
if __name__ == "__main__":
flow.run()
Logs:
[2022-01-25 14:21:01-0800] INFO - prefect.FlowRunner | Beginning Flow run for 'Test failing task'
[2022-01-25 14:21:01-0800] INFO - prefect.TaskRunner | Task 'failing_task': Starting task run...
[2022-01-25 14:21:01-0800] ERROR - prefect.TaskRunner | Task 'failing_task': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/mathijs/.local/share/virtualenvs/data-flows-00WzdnXT/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/mathijs/.local/share/virtualenvs/data-flows-00WzdnXT/lib/python3.9/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/mathijs/data-flows/src/flows/user_impressions_flow.py", line 8, in failing_task
raise ValueError('Raising an exception...')
ValueError: Raising an exception...
[2022-01-25 14:21:01-0800] INFO - prefect.TaskRunner | Task 'failing_task': Finished task run for task with final state: 'Failed'
[2022-01-25 14:21:01-0800] INFO - prefect.TaskRunner | Task 'upstream_task': Starting task run...
[2022-01-25 14:21:01-0800] INFO - prefect.TaskRunner | Task 'upstream_task': Finished task run for task with final state: 'TriggerFailed'
[2022-01-25 14:21:01-0800] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Mathijs Miermans
01/25/2022, 10:35 PMupstream_task
isn't executed, I set a breakpoint, printed a log message, and added a sleep statement. So it's not executed, but I'm confused that the log says Task 'upstream_task': Starting task run...
.Kevin Kho