John Shearer
11/26/2021, 12:42 PM
date in prefect context. The docs say "an actual datetime object representing the current time".
The datetime value appears to be the same across all tasks within a flow, so I assume this is actually the start time of the flow? This behaviour is what I want, but I want to confirm my assumption.
John Shearer
11/26/2021, 12:43 PM
(each task run prints prefect.context.get("date"), then sleeps for 1.1 seconds)
test.run()
[2021-11-26 12:41:53+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-11-26 12:41:53+0000] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-11-26 12:41:53+0000] DEBUG - prefect.FlowRunner | Flow 'test': Handling state change from Scheduled to Running
[2021-11-26 12:41:53+0000] INFO - prefect.TaskRunner | Task 'thing': Starting task run...
[2021-11-26 12:41:53+0000] DEBUG - prefect.TaskRunner | Task 'thing': Handling state change from Pending to Running
[2021-11-26 12:41:53+0000] DEBUG - prefect.TaskRunner | Task 'thing': Calling task.run() method...
prefect.context.get("date"): 2021-11-26T12:41:53.398890+00:00
[2021-11-26 12:41:54+0000] DEBUG - prefect.TaskRunner | Task 'thing': Handling state change from Running to Success
[2021-11-26 12:41:54+0000] INFO - prefect.TaskRunner | Task 'thing': Finished task run for task with final state: 'Success'
[2021-11-26 12:41:54+0000] INFO - prefect.TaskRunner | Task 'thing': Starting task run...
[2021-11-26 12:41:54+0000] DEBUG - prefect.TaskRunner | Task 'thing': Handling state change from Pending to Running
[2021-11-26 12:41:54+0000] DEBUG - prefect.TaskRunner | Task 'thing': Calling task.run() method...
prefect.context.get("date"): 2021-11-26T12:41:53.398890+00:00
[2021-11-26 12:41:55+0000] DEBUG - prefect.TaskRunner | Task 'thing': Handling state change from Running to Success
[2021-11-26 12:41:55+0000] INFO - prefect.TaskRunner | Task 'thing': Finished task run for task with final state: 'Success'
[2021-11-26 12:41:55+0000] INFO - prefect.TaskRunner | Task 'thing': Starting task run...
[2021-11-26 12:41:55+0000] DEBUG - prefect.TaskRunner | Task 'thing': Handling state change from Pending to Running
[2021-11-26 12:41:55+0000] DEBUG - prefect.TaskRunner | Task 'thing': Calling task.run() method...
prefect.context.get("date"): 2021-11-26T12:41:53.398890+00:00
[2021-11-26 12:41:56+0000] DEBUG - prefect.TaskRunner | Task 'thing': Handling state change from Running to Success
[2021-11-26 12:41:56+0000] INFO - prefect.TaskRunner | Task 'thing': Finished task run for task with final state: 'Success'
[2021-11-26 12:41:56+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-11-26 12:41:56+0000] DEBUG - prefect.FlowRunner | Flow 'test': Handling state change from Running to Success
Out[23]: <Success: "All reference tasks succeeded.">
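The gap between the context value and the task start times can be read off the logs above. A minimal sketch in plain Python (standard library only, no Prefect required; the variable names are illustrative) parses the logged date value and the "Starting task run" timestamp of the third task run. The log timestamps have one-second resolution, so the result is approximate.

```python
from datetime import datetime

# Both values are copied from the logs above.
context_date = datetime.fromisoformat("2021-11-26T12:41:53.398890+00:00")
third_task_start = datetime.strptime(
    "2021-11-26 12:41:55+0000", "%Y-%m-%d %H:%M:%S%z"
)

# Seconds between the context "date" and the third task's start log line.
lag_seconds = (third_task_start - context_date).total_seconds()
print(f"third task started about {lag_seconds:.3f}s after the context date")
```

The third task run starts well over a second after the value it reads from context, which is consistent with the value being fixed before the tasks run rather than refreshed per task.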
Anna Geller
import prefect
from prefect import task, Flow
import time


@task(log_stdout=True)
def get_date_and_scheduled_start_time():
    # Print both context values, then sleep so the task runs start at different times.
    print(prefect.context.get("date"))
    print(prefect.context.get("scheduled_start_time"))
    time.sleep(2)


with Flow("context") as flow:
    t1 = get_date_and_scheduled_start_time()
    t2 = get_date_and_scheduled_start_time()
    t3 = get_date_and_scheduled_start_time()
    t4 = get_date_and_scheduled_start_time()
    # Chain the tasks so they run one after another.
    t1.set_downstream(t2)
    t2.set_downstream(t3)
    t3.set_downstream(t4)
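The behaviour seen in the earlier logs (one date value shared by every task) can be modelled without Prefect. The sketch below is not Prefect's implementation, and this thread does not confirm exactly when Prefect sets date; run_flow, thing, and the context dict are hypothetical stand-ins. It only contrasts a timestamp captured once per run with one read inside each task.

```python
import time
from datetime import datetime, timezone


def run_flow(tasks):
    """Hypothetical stand-in for a flow runner (not Prefect's code)."""
    # Captured once, before any task runs, and shared with every task.
    context = {"date": datetime.now(timezone.utc)}
    return [task(context) for task in tasks]


def thing(context):
    # Hypothetical task: returns the shared context value and the
    # wall-clock time read inside the task itself.
    now = datetime.now(timezone.utc)
    time.sleep(0.05)
    return context["date"], now


results = run_flow([thing, thing, thing])
shared = [date for date, _ in results]
per_task = [now for _, now in results]

print("distinct shared values:", len(set(shared)))
print("per-task times advance:", per_task[0] < per_task[-1])
```

If a task needs the actual current time rather than a per-run reference time, reading the clock inside the task, as thing does here, is the straightforward option.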
And Cloud logs:
Anna Geller
John Shearer
11/26/2021, 1:09 PM
John Shearer
11/26/2021, 1:09 PM
Anna Geller