
Ajeel Ahmed

07/07/2023, 9:45 PM
Okay, so for anyone else who had this question, this is the answer: if they're just normal `tasks`, then no, the second task will not run once the first one fails. But if you run these tasks via `task_runners`, and say use the sequential one, it'll be fine and the second one will run even if the first one failed.

Christopher Boyd

07/07/2023, 10:12 PM
Yes, they can run independently, pass or fail. States only matter for the final Flow state.

Ajeel Ahmed

07/07/2023, 10:13 PM
Hey @Christopher Boyd! Thanks for the reply! Now I just have one simple question: can I do this using just normal `tasks` instead of `task_runners`?

Christopher Boyd

07/07/2023, 10:14 PM
Yes

Ajeel Ahmed

07/07/2023, 10:14 PM
Okay, would you be willing to point me in the right direction? I couldn't find anything for that in the `tasks` section of the documentation.
So when I do:
def flow():
    task_1()
    task_2()
and `task_1` fails, `task_2` does not run.

Christopher Boyd

07/07/2023, 10:18 PM
What are the tasks doing? Are there dependencies? Do they raise an error? A task failure by itself shouldn't stop other tasks from running; it would only affect the final state of the flow.

Ajeel Ahmed

07/07/2023, 10:19 PM
nope! No dependencies, but yes they do raise an exception that is unrelated to Prefect
Is that a problem?

Christopher Boyd

07/07/2023, 10:20 PM
so they aren’t just failing then, they are raising an exception

Ajeel Ahmed

07/07/2023, 10:21 PM
hmmm, I see I see maybe I should make them fail in case there is an exception?

Christopher Boyd

07/07/2023, 10:23 PM
You can do a try / except and return a manual Failed state, yes.
🙌 1

Ajeel Ahmed

07/07/2023, 10:24 PM
oh that’s awesome! Thanks a ton @Christopher Boyd amazing help!

Christopher Boyd

07/07/2023, 10:25 PM
from prefect import task, flow

@task
def task1():
    with open ('./test.txt', 'r') as f:
        return f.readlines()


@task(log_prints=True)
def task2():
    print ('Hello World')


@flow
def main():
    lines = task1(return_state=True)  # hands back the task's State instead of raising on failure
    task2()


if __name__ == "__main__":
    main()
Will return:
18:24:46.612 | INFO    | prefect.engine - Created flow run 'optimistic-kingfisher' for flow 'main'
18:24:46.616 | INFO    | Flow run 'optimistic-kingfisher' - View at <https://app.prefect.cloud/account/e16f474c-4714-40ae-89c9-ba3cbdd3bf13/workspace/e7c934ce-f75b-4d2f-a166-e25d40e647d7/flow-runs/flow-run/259b2c2b-2c48-4eaa-9c94-b1da523aa702>
18:24:47.433 | INFO    | Flow run 'optimistic-kingfisher' - Created task run 'task1-0' for task 'task1'
18:24:47.435 | INFO    | Flow run 'optimistic-kingfisher' - Executing 'task1-0' immediately...
18:24:47.739 | ERROR   | Task run 'task1-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 5, in task1
    with open ('./test.txt', 'r') as f:
FileNotFoundError: [Errno 2] No such file or directory: './test.txt'
18:24:47.840 | ERROR   | Task run 'task1-0' - Finished in state Failed("Task run encountered an exception FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
18:24:47.942 | INFO    | Flow run 'optimistic-kingfisher' - Created task run 'task2-0' for task 'task2'
18:24:47.943 | INFO    | Flow run 'optimistic-kingfisher' - Executing 'task2-0' immediately...
Hello World
18:24:48.800 | INFO    | Task run 'task2-0' - Finished in state Completed()
18:24:49.436 | ERROR   | Flow run 'optimistic-kingfisher' - Finished in state Failed('1/2 states failed.')
Note: task1 fails (because it raises a FileNotFoundError), but task2 still runs.
I think the issue you are running into is this: https://docs.prefect.io/2.10.18/concepts/flows/#raise-an-exception
An example that could fail without raising an exception might be something like attempting to poll an API or insert some data into a database. It could fail to do so and fail the task; there's no exception or unhandled code, it was just unsuccessful.
👀 1

Ajeel Ahmed

07/07/2023, 10:48 PM
It's just weird that using `task_runners` sequentially solves this problem automatically, even though I'm doing the same thing without using `task_runners`.
@Christopher Boyd I don't know why, but passing `return_state=True` makes the second task run even if the first one fails; this is not the case if I don't pass this parameter.
I'm pretty sure this is not by design?
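A plain-Python sketch may help picture the `return_state=True` observation above. It does not use Prefect at all: `Outcome`, `call`, `flow_plain`, and `flow_with_state` are hypothetical names made up for illustration, and the `return_state` flag here only mimics the idea that the call hands back a state-like object instead of raising. It is an analogy under those assumptions, not Prefect's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for a state object (not Prefect's State class)."""
    name: str                              # "Completed" or "Failed"
    value: Any = None
    error: Optional[BaseException] = None


def call(fn: Callable[[], Any], return_state: bool = False) -> Any:
    """Run fn; either let its exception propagate or wrap the outcome."""
    try:
        value = fn()
    except Exception as exc:
        if return_state:
            # Failure is captured in the returned object, nothing is raised.
            return Outcome("Failed", error=exc)
        raise  # default: the exception travels up into the caller
    return Outcome("Completed", value=value) if return_state else value


def task_1():
    raise FileNotFoundError("./test.txt")


def task_2():
    return "Hello World"


def flow_plain():
    call(task_1)            # raises here, so the next line is never reached
    return call(task_2)


def flow_with_state():
    first = call(task_1, return_state=True)   # captured, flow body keeps going
    second = call(task_2)
    return first, second
```

In this model `flow_plain()` stops at the first call, exactly like any Python function that hits an uncaught exception, while `flow_with_state()` reaches `task_2` and can still inspect `first.error` afterwards.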

Christopher Boyd

07/08/2023, 12:29 AM
This is by design - https://docs.prefect.io/2.10.18/concepts/task-runners/#task-runner-overview
Calling a task function from within a flow, using the default task settings, executes the function sequentially. Execution of the task function blocks execution of the flow until the task completes. This means, by default, calling multiple tasks in a flow causes them to run in order.
If you want to run them all concurrently, then you can simply add `.submit()` to your task call to submit it to the task runner, which returns a future.
So that would just mean:
from prefect import task, flow

@task
def task1():
    with open ('./test.txt', 'r') as f:
        return f.readlines()


@task(log_prints=True)
def task2():
    return "Hello World"


@flow
def main():
    x = task1.submit()
    y = task2.submit()
    return x, y


if __name__ == "__main__":
    main()
returns:
20:31:20.233 | INFO    | prefect.engine - Created flow run 'diligent-cicada' for flow 'main'
20:31:20.234 | INFO    | Flow run 'diligent-cicada' - View at <https://app.prefect.cloud/account/e16f474c-4714-40ae-89c9-ba3cbdd3bf13/workspace/e7c934ce-f75b-4d2f-a166-e25d40e647d7/flow-runs/flow-run/71038a14-fa5b-4e70-a43e-f883926bb68f>
20:31:21.413 | INFO    | Flow run 'diligent-cicada' - Created task run 'task1-0' for task 'task1'
20:31:21.415 | INFO    | Flow run 'diligent-cicada' - Submitted task run 'task1-0' for execution.
20:31:21.496 | INFO    | Flow run 'diligent-cicada' - Created task run 'task2-0' for task 'task2'
20:31:21.497 | INFO    | Flow run 'diligent-cicada' - Submitted task run 'task2-0' for execution.
20:31:21.766 | ERROR   | Task run 'task1-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 5, in task1
    with open ('./test.txt', 'r') as f:
FileNotFoundError: [Errno 2] No such file or directory: './test.txt'
20:31:21.968 | INFO    | Task run 'task2-0' - Finished in state Completed()
20:31:21.977 | ERROR   | Task run 'task1-0' - Finished in state Failed("Task run encountered an exception FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
20:31:22.172 | ERROR   | Flow run 'diligent-cicada' - Finished in state Failed('1/2 states failed.')
Not submitting to the task runner means the task is run as a plain function call (and does not return a future), so the result is:
20:31:43.543 | ERROR   | Task run 'task1-0' - Finished in state Failed("Task run encountered an exception FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
20:31:43.554 | ERROR   | Flow run 'mauve-boar' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 16, in main
    x = task1()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
    return await future._result()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/christopherboyd/virtualenvs/prefect2/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/christopherboyd/prefect_flows/utility/test.py", line 5, in task1
    with open ('./test.txt', 'r') as f:
FileNotFoundError: [Errno 2] No such file or directory: './test.txt'
20:31:43.679 | ERROR   | Flow run 'mauve-boar' - Finished in state Failed("Flow run encountered an exception. FileNotFoundError: [Errno 2] No such file or directory: './test.txt'")
To answer your question: yes, this is by design and intentional. If you want to resolve futures and continue with your tasks, then they do need to be submitted to the task runner.
Which I guess is a very long-winded way of getting back to the initial question you posted, which was accurate to begin with, so my apologies if I misunderstood.
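For readers who want to see the "submit returns a future" idea in isolation, here is a small standard-library analogy. It uses `concurrent.futures` rather than Prefect, so it only illustrates the general pattern (the exception is stored on the future and is raised only if you ask for the result); the helper name `run_both` is made up for this sketch, and Prefect's own futures may differ in detail.

```python
from concurrent.futures import ThreadPoolExecutor


def task_1():
    raise FileNotFoundError("./test.txt")


def task_2():
    return "Hello World"


def run_both():
    """Submit both callables; neither submit() call raises."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        x = pool.submit(task_1)   # failure is stored on the future
        y = pool.submit(task_2)   # still gets submitted and runs
    # Leaving the with-block waits for both futures to finish.
    return x, y


if __name__ == "__main__":
    x, y = run_both()
    print(type(x.exception()).__name__)   # inspect the failure without raising
    print(y.result())                     # result of the task that succeeded
```

Calling `x.result()` instead of `x.exception()` would re-raise the stored `FileNotFoundError` at that point, which is the same distinction the thread draws between calling a task directly and submitting it.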

Ajeel Ahmed

07/08/2023, 1:04 AM
haha no no it’s totally fine, I learned a lot about tasks so this was all worth it, thank you so much for all your help!
catjam 1
💯 1
👍 1