https://prefect.io logo
d

Donnchadh McAuliffe

02/22/2022, 4:37 PM
Hi guys, I want my flow to not move onto the next task if one of my tasks is retrying.
Copy code
@task(retries=2, retry_delay_seconds=10)
def task_1(time_to_sleep: int):
    raise Exception()
    time.sleep(time_to_sleep)


@task(retries=3, retry_delay_seconds=10)
def task_2(time_to_sleep: int):
    time.sleep(time_to_sleep)


@flow(name="steps_flow_dask", task_runner=DaskTaskRunner(address={dask_address}))
def flow_to_run_tasks():
    task_1(3)
    task_2(2)
Currently when
task_1
throws the exception
task_2
immediately executes. The behaviour I want is that
task_2
doesn't execute unless
task_1
is successful. Also, if
task_1
hits its' maximum number of retries then
task_2
should not execute. Any ideas? This is on Orion.
k

Kevin Kho

02/22/2022, 4:54 PM
You can use the
wait_for
like:
Copy code
@flow(name="steps_flow_dask")
def flow_to_run_tasks():
    a = task_1(3)
    task_2(2, wait_for=[a])
let me confirm the syntax
Yes this is right
d

Donnchadh McAuliffe

02/22/2022, 5:08 PM
thank you, out of curiosity how would you get what number retry you're on inside a task?
Copy code
from prefect.context import get_run_context

@task(retries=2, retry_delay_seconds=10)
def task_1(time_to_sleep: int):
    t = get_run_context()
    t.task_run.run_count # this is always 0...
    raise Exception()
k

Kevin Kho

02/22/2022, 5:16 PM
I’ll need to ask someone and get back to you on that
z

Zanie

02/22/2022, 5:18 PM
I don’t think we’re updating the
task_run
in the context on each call, it’s the original task run object.
It’d be correct in the database if you queried, but the client is async-only right now.
d

Donnchadh McAuliffe

02/22/2022, 5:21 PM
ok thanks @Zanie - is there any easy way to get the retry number right now so?
z

Zanie

02/22/2022, 5:26 PM
Copy code
from prefect import flow, task
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.utilities.asyncio import sync_compatible


@sync_compatible
async def get_task_run(task_run_id):
    async with get_client() as client:
        return await client.read_task_run(task_run_id)


@task(retries=2, retry_delay_seconds=1)
def task_1(time_to_sleep: int):
    t = get_run_context()
    latest_task_run = get_task_run(t.task_run.id)
    print(latest_task_run.run_count)
    raise Exception()


@flow
def my_flow():
    task_1(1)


my_flow()
d

Donnchadh McAuliffe

02/23/2022, 11:40 AM
Thanks @Zanie. I think this only works if the tasks themselves are async, but correct me if I'm wrong. How would I use this if the tasks are not async?
latest_task_run
is actually a
Coroutine
object. Also, there is no
get_client
method in that orion python file, as far as I can see. See my code below:
Copy code
from prefect import flow, task
from prefect.client import OrionClient
from prefect.context import get_run_context
from prefect.utilities.asyncio import sync_compatible


@sync_compatible
async def get_task_run(task_run_id):
    async with OrionClient() as client:
        return await client.read_task_run(task_run_id)


@task(retries=2, retry_delay_seconds=1)
def task_1(time_to_sleep: int):
    t = get_run_context()
    latest_task_run = get_task_run(t.task_run.id)
    print(latest_task_run.run_count)
    raise Exception()


@flow
def my_flow():
    task_1(1)


my_flow()
z

Zanie

02/23/2022, 3:52 PM
Which version are you on?
get_client
was recently added.
Regardless, this works for me without an async task. The
sync_compatible
utility will run the async function on our event loop.
16 Views