Donnchadh McAuliffe
02/22/2022, 4:37 PM@task(retries=2, retry_delay_seconds=10)
def task_1(time_to_sleep: int):
raise Exception()
time.sleep(time_to_sleep)
@task(retries=3, retry_delay_seconds=10)
def task_2(time_to_sleep: int):
time.sleep(time_to_sleep)
@flow(name="steps_flow_dask", task_runner=DaskTaskRunner(address={dask_address}))
def flow_to_run_tasks():
task_1(3)
task_2(2)
Currently when task_1
throws the exception task_2
immediately executes. The behaviour I want is that task_2
doesn't execute unless task_1
is successful. Also, if task_1
hits its' maximum number of retries then task_2
should not execute.
Any ideas? This is on Orion.Kevin Kho
wait_for
like:
@flow(name="steps_flow_dask")
def flow_to_run_tasks():
a = task_1(3)
task_2(2, wait_for=[a])
let me confirm the syntaxDonnchadh McAuliffe
02/22/2022, 5:08 PMfrom prefect.context import get_run_context
@task(retries=2, retry_delay_seconds=10)
def task_1(time_to_sleep: int):
t = get_run_context()
t.task_run.run_count # this is always 0...
raise Exception()
Kevin Kho
Zanie
task_run
in the context on each call, it’s the original task run object.Donnchadh McAuliffe
02/22/2022, 5:21 PMZanie
from prefect import flow, task
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.utilities.asyncio import sync_compatible
@sync_compatible
async def get_task_run(task_run_id):
async with get_client() as client:
return await client.read_task_run(task_run_id)
@task(retries=2, retry_delay_seconds=1)
def task_1(time_to_sleep: int):
t = get_run_context()
latest_task_run = get_task_run(t.task_run.id)
print(latest_task_run.run_count)
raise Exception()
@flow
def my_flow():
task_1(1)
my_flow()
Donnchadh McAuliffe
02/23/2022, 11:40 AMlatest_task_run
is actually a Coroutine
object.
Also, there is no get_client
method in that orion python file, as far as I can see. See my code below:
from prefect import flow, task
from prefect.client import OrionClient
from prefect.context import get_run_context
from prefect.utilities.asyncio import sync_compatible
@sync_compatible
async def get_task_run(task_run_id):
async with OrionClient() as client:
return await client.read_task_run(task_run_id)
@task(retries=2, retry_delay_seconds=1)
def task_1(time_to_sleep: int):
t = get_run_context()
latest_task_run = get_task_run(t.task_run.id)
print(latest_task_run.run_count)
raise Exception()
@flow
def my_flow():
task_1(1)
my_flow()
Zanie
get_client
was recently added.sync_compatible
utility will run the async function on our event loop.