Kelby
05/04/2021, 1:51 PMKelby
05/04/2021, 1:52 PMKelby
05/04/2021, 1:52 PMimport datetime
import prefect
from prefect import Flow, task
from prefect.engine.signals import RETRY
@task
def myTask():
print(prefect.context.get("myId"))
if prefect.context.task_run_count < 3:
raise RETRY(
"Retrying",
start_time=datetime.datetime.utcnow() + datetime.timedelta(seconds=1),
context={"myId": 1},
)
with Flow("MyFlow") as flow:
myTask()
flow.run()
Kelby
05/04/2021, 1:52 PM2021-05-04 09:49:13-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'MyFlow'
[2021-05-04 09:49:13-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-05-04 09:49:13-0400] DEBUG - prefect.FlowRunner | Flow 'MyFlow': Handling state change from Scheduled to Running
[2021-05-04 09:49:14-0400] INFO - prefect.TaskRunner | Task 'myTask': Starting task run...
[2021-05-04 09:49:14-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Handling state change from Pending to Running
[2021-05-04 09:49:14-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Calling task.run() method...
None
[2021-05-04 09:49:14-0400] INFO - prefect.TaskRunner | RETRY signal raised: RETRY('Retrying')
[2021-05-04 09:49:14-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Handling state change from Running to Retrying
[2021-05-04 09:49:14-0400] INFO - prefect.TaskRunner | Task 'myTask': Finished task run for task with final state: 'Retrying'
[2021-05-04 09:49:14-0400] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2021-05-04 09:49:14-0400] INFO - prefect.MyFlow | Waiting for next available Task run at 2021-05-04T13:49:15.011585+00:00
[2021-05-04 09:49:15-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'MyFlow'
[2021-05-04 09:49:15-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-05-04 09:49:15-0400] INFO - prefect.TaskRunner | Task 'myTask': Starting task run...
[2021-05-04 09:49:15-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Handling state change from Retrying to Running
[2021-05-04 09:49:15-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Calling task.run() method...
None
[2021-05-04 09:49:15-0400] INFO - prefect.TaskRunner | RETRY signal raised: RETRY('Retrying')
[2021-05-04 09:49:15-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Handling state change from Running to Retrying
[2021-05-04 09:49:15-0400] INFO - prefect.TaskRunner | Task 'myTask': Finished task run for task with final state: 'Retrying'
[2021-05-04 09:49:15-0400] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2021-05-04 09:49:15-0400] INFO - prefect.MyFlow | Waiting for next available Task run at 2021-05-04T13:49:16.049764+00:00
[2021-05-04 09:49:16-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'MyFlow'
[2021-05-04 09:49:16-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-05-04 09:49:16-0400] INFO - prefect.TaskRunner | Task 'myTask': Starting task run...
[2021-05-04 09:49:16-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Handling state change from Retrying to Running
[2021-05-04 09:49:16-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Calling task.run() method...
None
[2021-05-04 09:49:16-0400] DEBUG - prefect.TaskRunner | Task 'myTask': Handling state change from Running to Success
[2021-05-04 09:49:16-0400] INFO - prefect.TaskRunner | Task 'myTask': Finished task run for task with final state: 'Success'
[2021-05-04 09:49:16-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-05-04 09:49:16-0400] DEBUG - prefect.FlowRunner | Flow 'MyFlow': Handling state change from Running to Success
Zach Angell
Zach Angell
Marvin
05/04/2021, 2:21 PM