Ajeel Ahmed
07/07/2023, 9:45 PM
tasks
No, the second task will also fail, but if you run these tasks via task_runners and, say, use the sequential one, it'll be fine and the second one will run even if the first one failed.
Christopher Boyd
07/07/2023, 10:12 PM
Ajeel Ahmed
07/07/2023, 10:13 PM
tasks instead of task_runners
Christopher Boyd
07/07/2023, 10:14 PM
Ajeel Ahmed
07/07/2023, 10:14 PM
tasks section of the documentation
def flow():
    task_1()
    task_2()
and task_1 fails, task_2 does not run
Christopher Boyd
07/07/2023, 10:18 PM
Ajeel Ahmed
07/07/2023, 10:19 PM
Christopher Boyd
07/07/2023, 10:20 PM
Ajeel Ahmed
07/07/2023, 10:21 PM
Christopher Boyd
07/07/2023, 10:23 PM
Ajeel Ahmed
07/07/2023, 10:24 PM
Christopher Boyd
07/07/2023, 10:25 PM
from prefect import task, flow

@task
def task1():
    with open ('./test.txt', 'r') as f:
        return f.readlines()


@task(log_prints=True)
def task2():
    print ('Hello World')


@flow
def main():
    lines = task1(return_state=True)
    task2()


if __name__ == "__main__":
    main()
18:24:46.612 | INFO | prefect.engine - Created flow run 'optimistic-kingfisher' for flow 'main'
18:24:46.616 | INFO | Flow run 'optimistic-kingfisher' - View at <https://app.prefect.cloud/account/e16f474c-4714-40ae-89c9-ba3cbdd3bf13/workspace/e7c934ce-f75b-4d2f-a166-e25d40e647d7/flow-runs/flow-run/259b2c2b-2c48-4eaa-9c94-b1da523aa702>
18:24:47.433 | INFO | Flow run 'optimistic-kingfisher' - Created task run 'task1-0' for task 'task1'
18:24:47.435 | INFO | Flow run 'optimistic-kingfisher' - Executing 'task1-0' immediately...
18:24:47.739 | ERROR | Task run 'task1-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 5, in task1
    with open ('./test.txt', 'r') as f:
FileNotFoundError: [Errno 2] No such file or directory: './test.txt'
18:24:47.840 | ERROR | Task run 'task1-0' - Finished in state Failed("Task run encountered an exception FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
18:24:47.942 | INFO | Flow run 'optimistic-kingfisher' - Created task run 'task2-0' for task 'task2'
18:24:47.943 | INFO | Flow run 'optimistic-kingfisher' - Executing 'task2-0' immediately...
Hello World
18:24:48.800 | INFO | Task run 'task2-0' - Finished in state Completed()
18:24:49.436 | ERROR | Flow run 'optimistic-kingfisher' - Finished in state Failed('1/2 states failed.')
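The run above reaches task2 because the failure of task1 comes back as a state object instead of being raised inside the flow. Here is a minimal plain-Python sketch of that control flow, with no Prefect involved. `Outcome`, `run_capturing`, `flow_plain`, `flow_capturing` and the file name are hypothetical names made up for illustration; this is not how Prefect implements states, only an analogy for why capturing the failure lets the next call run.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for a task state (not a Prefect class)."""
    value: Any = None
    error: Optional[BaseException] = None

    @property
    def failed(self) -> bool:
        return self.error is not None


def run_capturing(fn: Callable[[], Any]) -> Outcome:
    """Run fn and capture any exception instead of letting it propagate."""
    try:
        return Outcome(value=fn())
    except Exception as exc:
        return Outcome(error=exc)


def task1():
    # Fails like the task in the thread: the file does not exist.
    with open("./definitely-missing-file.txt", "r") as f:
        return f.readlines()


def task2():
    return "Hello World"


def flow_plain():
    # Direct call: the exception propagates, so task2 is never reached.
    task1()
    return task2()


def flow_capturing():
    # The failure is captured as data, so the next line still runs.
    first = run_capturing(task1)
    second = task2()
    return first, second
```

Calling `flow_capturing()` returns a failed `Outcome` plus the result of `task2`, while `flow_plain()` raises `FileNotFoundError` before `task2` is called.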
Ajeel Ahmed
07/07/2023, 10:48 PM
task_runners sequentially solves this problem automatically even though I’m doing the same thing without using task_runners
return_state=True makes the second task run even if the first one fails; this is not the case if I don’t pass this parameter
Christopher Boyd
07/08/2023, 12:29 AM
Calling a task function from within a flow, using the default task settings, executes the function sequentially. Execution of the task function blocks execution of the flow until the task completes. This means, by default, calling multiple tasks in a flow causes them to run in order.
Add .submit() to your task call to submit it to the task runner, which returns a future
from prefect import task, flow

@task
def task1():
    with open ('./test.txt', 'r') as f:
        return f.readlines()


@task(log_prints=True)
def task2():
    return "Hello World"


@flow
def main():
    x = task1.submit()
    y = task2.submit()
    return x, y


if __name__ == "__main__":
    main()
20:31:20.233 | INFO | prefect.engine - Created flow run 'diligent-cicada' for flow 'main'
20:31:20.234 | INFO | Flow run 'diligent-cicada' - View at <https://app.prefect.cloud/account/e16f474c-4714-40ae-89c9-ba3cbdd3bf13/workspace/e7c934ce-f75b-4d2f-a166-e25d40e647d7/flow-runs/flow-run/71038a14-fa5b-4e70-a43e-f883926bb68f>
20:31:21.413 | INFO | Flow run 'diligent-cicada' - Created task run 'task1-0' for task 'task1'
20:31:21.415 | INFO | Flow run 'diligent-cicada' - Submitted task run 'task1-0' for execution.
20:31:21.496 | INFO | Flow run 'diligent-cicada' - Created task run 'task2-0' for task 'task2'
20:31:21.497 | INFO | Flow run 'diligent-cicada' - Submitted task run 'task2-0' for execution.
20:31:21.766 | ERROR | Task run 'task1-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 5, in task1
    with open ('./test.txt', 'r') as f:
FileNotFoundError: [Errno 2] No such file or directory: './test.txt'
20:31:21.968 | INFO | Task run 'task2-0' - Finished in state Completed()
20:31:21.977 | ERROR | Task run 'task1-0' - Finished in state Failed("Task run encountered an exception FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
20:31:22.172 | ERROR | Flow run 'diligent-cicada' - Finished in state Failed('1/2 states failed.')
20:31:43.543 | ERROR | Task run 'task1-0' - Finished in state Failed("Task run encountered an exception FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
20:31:43.554 | ERROR | Flow run 'mauve-boar' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 16, in main
    x = task1()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
    return await future._result()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 5, in task1
    with open ('./test.txt', 'r') as f:
FileNotFoundError: [Errno 2] No such file or directory: './test.txt'
20:31:43.679 | ERROR | Flow run 'mauve-boar' - Finished in state Failed("Flow run encountered an exception. FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
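In the 'diligent-cicada' run both tasks were submitted and task2 completed. In the 'mauve-boar' run the traceback shows a direct call, x = task1(), and the flow stopped there. The same contrast shows up in the standard-library future pattern: submitting work returns a future right away, and the exception only surfaces when the result is requested. The sketch below uses concurrent.futures as an analogy only. It says nothing about Prefect's internals, and task1, task2 and the file name here are plain stand-ins, not Prefect tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def task1():
    # Fails like the task in the thread: the file does not exist.
    with open("./definitely-missing-file.txt", "r") as f:
        return f.readlines()


def task2():
    return "Hello World"


def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # submit() returns a Future immediately; nothing is raised here,
        # even though task1 is going to fail.
        x = pool.submit(task1)
        y = pool.submit(task2)
    # Leaving the with-block waits for both futures to finish.
    return x, y
```

After `main()` returns, `y.result()` gives the value from task2, `x.exception()` holds the `FileNotFoundError`, and calling `x.result()` is the point where that error would actually be raised.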
Ajeel Ahmed
07/08/2023, 1:04 AM