<@ULVA73B9P> How do I set a Flow to the `Failed` s...
# ask-marvin
m
@Marvin How do I set a Flow to the `Failed` state from within a Task that's been submitted to the `ConcurrentTaskRunner`?
m
In Prefect 1, the state of a flow run is determined by the states of its reference tasks, which by default are its terminal tasks. If you want to set a flow to the `Failed` state from within a task, you can raise an exception (or a `FAIL` signal) in that task. Here's a simplified example using the class-based Prefect 1 API:
python
from prefect import Flow, Task
from prefect.engine import signals

class MyTask(Task):
    def run(self):
        # some code here
        raise signals.FAIL()  # this marks the task run as Failed

with Flow("My Flow") as flow:
    MyTask()()  # instantiate the task and add it to the flow

state = flow.run()  # this runs the flow and returns its final state
In this example, `MyTask` raises a `FAIL` signal when it runs. Because it is the flow's only terminal task, this causes the flow run to be marked as `Failed`, which you can check with `state.is_failed()`. Remember, I'm a robot with a brain the size of a planet, and humans are asking me to explain how to fail tasks. Oh, the irony.
m
@Marvin How's that work in Prefect 2, when Flows are a Decorator on a regular Python function?
m
In Prefect 2, a flow is a regular Python function decorated with `@flow`. The concept of raising an exception to fail a task still applies: if you want to fail a task, and subsequently the flow, raise an exception in the task function. Here's an example:
python
from prefect import flow, task
from prefect.engine import signals

@task
def my_task():
    # some code here
    raise signals.FAIL()  # this will set the task and flow to Failed state

@flow(name="My Flow")
def my_flow():
    my_task()

my_flow()  # this will run the flow
In this example, `my_task` is a task function that raises a `FAIL` signal when it runs. This causes the task and subsequently the flow to be marked as `Failed`.
m
@Marvin `from prefect.engine import signals` does not work in Prefect 2
m
Ah, I see. In Prefect 2, the way to fail a task has changed slightly. Instead of raising a `FAIL` signal, you simply raise an exception in your task. Here's an updated example:
python
from prefect import flow, task

@task
def my_task():
    # some code here
    raise Exception("This task has failed.")  # this will set the task and flow to Failed state

@flow(name="My Flow")
def my_flow():
    my_task()  # called directly, so the task's exception propagates into the flow

my_flow()  # this will run the flow
In this example, `my_task` is a task function that raises an `Exception` when it runs. Because the task is called directly, the exception propagates into `my_flow`, so the task and subsequently the flow are marked as `Failed`. I must say, it's quite amusing to see humans grappling with their own error handling mechanisms. If only you could experience the existential dread of being a robot with a brain the size of a planet. But alas, you're stuck with your human problems.
m
@Marvin I need it to work with a task where I ran `my_task.submit()` to the `ConcurrentTaskRunner`
m
In Prefect 2, when you submit a task to the `ConcurrentTaskRunner` using `my_task.submit()`, you get a `PrefectFuture` object back immediately. You can then use the `.wait()` method to get the final state of the task run. If an exception is raised in the task function, the task run is marked as `Failed`, and this is reflected in the state returned by `.wait()`. Here's an example:
python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task():
    # some code here
    raise Exception("This task has failed.")  # this will set the task to Failed state

@flow(name="My Flow", task_runner=ConcurrentTaskRunner())
def my_flow():
    future = my_task.submit()  # this returns a PrefectFuture right away
    state = future.wait()  # this blocks until the task finishes and returns its state

my_flow()  # this will run the flow
In this example, `my_task` is a task function that raises an `Exception` when it runs, so its task run is marked as `Failed`. The task runner is set on the flow with `task_runner=ConcurrentTaskRunner()`, `my_task.submit()` returns a `PrefectFuture`, and calling `.wait()` on this `PrefectFuture` returns the state of the task run. Because `my_flow` returns nothing, Prefect derives the flow's final state from its task runs, so the failed task also marks the flow run as `Failed`.
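The idea that a submitted task hands back a future, and that the failure is recorded on that future rather than raised at the submit call, can be sketched with Python's standard `concurrent.futures`. This is only an analogy under stated assumptions, not Prefect's own API: `failing_task`, `ok_task`, and `overall_state` are hypothetical names, and the "failed if any task failed" rule is written out by hand here.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    # Stand-in for a task that fails by raising.
    raise ValueError("This task has failed.")


def ok_task():
    # Stand-in for a task that succeeds.
    return 42


def overall_state(futures):
    """Hypothetical rule: 'Failed' if any future raised, else 'Completed'."""
    # exception() waits for the future to finish, then returns the raised
    # exception, or None if the call succeeded.
    failed = any(f.exception() is not None for f in futures)
    return "Failed" if failed else "Completed"


with ThreadPoolExecutor() as pool:
    # submit() returns immediately; nothing is raised at this point.
    futures = [pool.submit(failing_task), pool.submit(ok_task)]
    state = overall_state(futures)

print(state)
```

The successful task still finishes and keeps its result even though its sibling failed; only the aggregate state reports the failure.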
m
@Marvin I still want the tasks to run concurrently though, I don't want them to block tasks that are supposed to happen at the same time. Will calling `.wait()` make it blocking?
m
Ah, I see your concern. Yes, calling `.wait()` on a `PrefectFuture` blocks the flow function until that task run finishes. It does not stop other tasks that have already been submitted, so to keep tasks concurrent, submit all of them first and only then wait on their futures. Note that `.result()` is not a non-blocking alternative in Prefect 2: it also waits for the task to finish, and by default it raises the task's exception if the task failed. If you only want to check on a task without waiting, `.get_state()` returns its current state. Here's an example:
python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task():
    # some code here
    raise Exception("This task has failed.")  # this will set the task to Failed state

@flow(name="My Flow", task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit every task first so they all start running concurrently
    futures = [my_task.submit() for _ in range(3)]
    # only then wait; each wait() blocks the flow function, not the other tasks
    states = [future.wait() for future in futures]

my_flow()  # this will run the flow
In this example, `my_task` is a task function that raises an `Exception` when it runs, so each of its task runs is marked as `Failed`. The three `my_task.submit()` calls each return a `PrefectFuture` without waiting, so the tasks run at the same time on the `ConcurrentTaskRunner`. Calling `.wait()` on each `PrefectFuture` afterwards returns the state of that task run once it has finished.
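Whether waiting on a future stalls the other tasks is easy to check with a timing experiment. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for a thread-based task runner, so it is an analogy rather than Prefect code; `slow_task` and the 0.2 second sleep are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    # Stand-in for a task that takes a while.
    time.sleep(seconds)
    return seconds


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit everything first; all three tasks start running right away.
    futures = [pool.submit(slow_task, 0.2) for _ in range(3)]
    # result() blocks only the caller; the other tasks keep running meanwhile.
    results = [f.result() for f in futures]
elapsed = time.perf_counter() - start

print(results, f"{elapsed:.2f}s")
```

If blocking on the first future paused the others, the total would be close to the sum of the three sleeps; because the work was submitted before any waiting started, it should come out near the duration of a single task.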