Emil Ostergaard

01/18/2023, 10:27 AM
Hey everyone. Is there a way to force a rerun of a task that completed within a sub-flow when that sub-flow fails and is retried? In the example below, which is a simplification of my actual use case, I would like the function get_token to re-run each time the sub-flow re-runs (if “key” has changed). In this case I would like to see Id: 1-2023-01-18 10:50 followed by Id: 1-2023-01-18 10:51 (with the sub-flows still failing).
import datetime as dt
import asyncio
from prefect import task, flow
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=dt.timedelta(minutes=1))
async def get_token(key):
    return key


async def build_subflow(n):
    @flow(retries=1, retry_delay_seconds=61)
    async def subflow(id):
        token = await get_token(dt.datetime.now().strftime('%Y-%m-%d %H:%M'))
        print(f'Id: {id}-{token}')
        raise Exception('ERROR')

    await subflow(n)


@flow(log_prints=False)
async def main_flow():
    await asyncio.create_task(build_subflow(1))


if __name__ == '__main__':
    main_flow_state = asyncio.run(main_flow())
Current behavior:
10:50:05.123 | INFO    | prefect.engine - Created flow run 'magnificent-partridge' for flow 'main-flow'
10:50:07.225 | INFO    | Flow run 'magnificent-partridge' - Created subflow run 'keen-galago' for flow 'subflow'
10:50:07.868 | INFO    | Flow run 'keen-galago' - Created task run 'get_token-5531366f-0' for task 'get_token'
10:50:07.869 | INFO    | Flow run 'keen-galago' - Executing 'get_token-5531366f-0' immediately...
10:50:08.738 | INFO    | Task run 'get_token-5531366f-0' - Finished in state Completed()
10:50:08.740 | ERROR   | Flow run 'keen-galago' - Encountered exception during execution:
...

Id: 1-2023-01-18 10:50
10:50:08.995 | INFO    | Flow run 'keen-galago' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
10:51:10.832 | INFO    | Flow run 'keen-galago' - Created task run 'get_token-5531366f-0' for task 'get_token'
10:51:10.833 | INFO    | Flow run 'keen-galago' - Executing 'get_token-5531366f-0' immediately...
10:51:11.440 | INFO    | Task run 'get_token-5531366f-0' - Task run '53b5da1b-e807-4db2-8404-633b88c94c8d' already finished.
Id: 1-2023-01-18 10:50
10:51:11.840 | ERROR   | Flow run 'keen-galago' - Encountered exception during execution:
...
So basically I think I see this behavior because of this log line on the retry:
Task run 'get_token-5531366f-0' - Task run '53b5da1b-e807-4db2-8404-633b88c94c8d' already finished.
Is there any way to force a re-run? Thanks in advance!
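To make the expectation concrete: the key passed to get_token is the current time truncated to the minute, so with retry_delay_seconds=61 the retry should always compute a different key than the first attempt. Below is a minimal stdlib-only sketch of that reasoning; the helper name minute_key is hypothetical, and the timestamps are taken from the log above. It does not involve Prefect at all, it only shows that the input itself changes between attempts.

```python
import datetime as dt


def minute_key(now: dt.datetime) -> str:
    # Same format string as the argument passed to get_token in the example.
    return now.strftime('%Y-%m-%d %H:%M')


# First attempt, timestamp taken from the log (10:50:07).
first_attempt = dt.datetime(2023, 1, 18, 10, 50, 7)
# The retry starts at least 61 seconds later (retry_delay_seconds=61).
retry = first_attempt + dt.timedelta(seconds=61)

print(minute_key(first_attempt))  # 2023-01-18 10:50
print(minute_key(retry))          # 2023-01-18 10:51
```

Since the key differs on the retry, the repeated "10:50" in the output does not seem to come from the input hash matching; the "already finished" log line suggests the earlier task run result is being reused instead.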