Josselin Girault
04/30/2021, 3:10 PM@task
def log_scheduled_start_time():
"""Print scheduled_start_time in logger."""
logger = prefect.context.get("logger")
scheduled_start_time = prefect.context.get("scheduled_start_time")
flow_name = prefect.context.get("flow_name")
<http://logger.info|logger.info>(f"{flow_name}: {scheduled_start_time}")
with Flow("subflow") as subflow:
log_scheduled_start_time()
subflow.register(project_name="test_schedule", labels=["test"])
with Flow("mainflow") as mainflow:
log_scheduled_start_time()
StartFlowRun(
flow_name="subflow", project_name="test_schedule", wait=True,
scheduled_start_time=prefect.context.get("scheduled_start_time")
)
mainflow.register(project_name="test_schedule", labels=["test"])
Expectation: mainflow's scheduled_start_time
is passed down to subflow, both flows log the same time.
Reality: subflow starts without a scheduled_start_time
, defaults to now()
, logged times are different.
Use case: Subflow depends on scheduled_start_time
to query some database, the agent that was supposed to run mainflow is down, mainflow and subflow are run a day later, subflow's query is then incorrect.
Bonus question: I can't seem to find documentation on how to pass parameters from one flow to another/use a flow's results as parameters for another 🙇Chris White
None
in context. To resolve this, you instead need to provide this value via a task:
with Flow("mainflow") as mainflow:
log_scheduled_start_time()
start_time = task(lambda: prefect.context.get("scheduled_start_time"))
# first initialize the task, then call it to bind it to the flow
run_task = StartFlowRun(
flow_name="subflow", project_name="test_schedule", wait=True
)(scheduled_start_time=start_time)
For passing results between Flows, there is no concept of a “Flow result” (only tasks produce results) but this is something that we are thinking about and exploring designs for right now.Josselin Girault
04/30/2021, 3:56 PM