Emil Ostergaard
01/18/2023, 10:27 AMimport datetime as dt
import asyncio
from prefect import task, flow
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash, cache_expiration=dt.timedelta(minutes=1))
async def get_token(key):
return key
async def build_subflow(n):
@flow(retries=1, retry_delay_seconds=61)
async def subflow(id):
token = await get_token(dt.datetime.now().strftime('%Y-%m-%d %H:%M'))
print(f'Id: {id}-{token}')
raise Exception('ERROR')
await subflow(n)
@flow(log_prints=False)
async def main_flow():
await asyncio.create_task(build_subflow(1))
if __name__ == '__main__':
main_flow_state = asyncio.run(main_flow())
Current behavior:
10:50:05.123 | INFO | prefect.engine - Created flow run 'magnificent-partridge' for flow 'main-flow'
10:50:07.225 | INFO | Flow run 'magnificent-partridge' - Created subflow run 'keen-galago' for flow 'subflow'
10:50:07.868 | INFO | Flow run 'keen-galago' - Created task run 'get_token-5531366f-0' for task 'get_token'
10:50:07.869 | INFO | Flow run 'keen-galago' - Executing 'get_token-5531366f-0' immediately...
10:50:08.738 | INFO | Task run 'get_token-5531366f-0' - Finished in state Completed()
10:50:08.740 | ERROR | Flow run 'keen-galago' - Encountered exception during execution:
...
Id: 1-2023-01-18 10:50
10:50:08.995 | INFO | Flow run 'keen-galago' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
10:51:10.832 | INFO | Flow run 'keen-galago' - Created task run 'get_token-5531366f-0' for task 'get_token'
10:51:10.833 | INFO | Flow run 'keen-galago' - Executing 'get_token-5531366f-0' immediately...
10:51:11.440 | INFO | Task run 'get_token-5531366f-0' - Task run '53b5da1b-e807-4db2-8404-633b88c94c8d' already finished.
Id: 1-2023-01-18 10:50
10:51:11.840 | ERROR | Flow run 'keen-galago' - Encountered exception during execution:
...
So basically I see this behavior because Task run 'get_token-5531366f-0' - Task run '53b5da1b-e807-4db2-8404-633b88c94c8d' already finished
- any way to force a re-run?
Thanks in advance!