Andreas Eisenbarth
01/17/2022, 7:27 PM
I am using `create_flow_run` with `map` to create multiple flows, each with a different dict of parameters. On one server, all created flows receive the same `flow_run_id`, which means they overwrite each other's logs and we only see one in the Prefect UI.
(Locally I cannot reproduce it: every child flow has a different flow run ID. This server is running in Docker, and in that setup `create_flow_run` was working correctly previously.)
Does anyone have ideas? (Example code attached)
Andreas Eisenbarth
01/17/2022, 7:29 PM
```python
from typing import List

from prefect import Client, Flow, Parameter, task, unmapped
from prefect.executors import LocalExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task(log_stdout=True)
def print_flow_details(dataset):
    import prefect

    print(f"prefect.context.flow_id: {prefect.context.flow_id}")
    print(f"prefect.context.flow_run_id: {prefect.context.flow_run_id}")
    print(f"dataset: {dataset}")


@task(log_stdout=True)
def assert_flow_run_ids_unique(flow_run_ids: List[str]):
    # Fail loudly if any two child flow runs share an ID.
    print(f"flow_run_ids: {flow_run_ids}")
    if len(set(flow_run_ids)) != len(flow_run_ids):
        raise RuntimeError(f"Two or more flow runs have same IDs: {flow_run_ids}")


with Flow("child_flow") as child_flow:
    dataset = Parameter("dataset")
    print_flow_details(dataset)

with Flow("batch_flow") as batch_flow:
    parameters = Parameter("parameters")
    # One child flow run per parameter dict; the flow name is shared.
    child_run_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("child_flow"),
    )
    assert_flow_run_ids_unique(child_run_ids)
    # flow_runs_finished = wait_for_flow_run.map(flow_run_id=child_run_ids)


def run_batch_flow(datasets=["dataset1", "dataset2"]):
    project_name = "default"
    client = Client()
    client.create_project(project_name=project_name)
    batch_flow.register(project_name=project_name)
    child_flow.register(project_name=project_name)
    # Run the parent flow in-process rather than through an agent.
    state = batch_flow.run(
        executor=LocalExecutor(),
        parameters={
            "parameters": [{"dataset": d} for d in datasets],
        },
    )
    assert state.is_successful()


if __name__ == "__main__":
    run_batch_flow()
```
Kevin Kho
[...] `task_run_id` and repeat the idempotency keys, causing all of them to just trigger the same flow run.
Anna Geller
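To illustrate Kevin's point about repeated idempotency keys, here is a minimal sketch that uses only the standard library. `FakeBackend` and `key_for` are hypothetical names invented for this example and are not part of Prefect; the backend only imitates the "same key, same run" behaviour described above, under the assumption that the server deduplicates runs by key.

```python
import hashlib
import json
import uuid
from typing import Dict, Optional


class FakeBackend:
    """Hypothetical stand-in for a server that deduplicates runs by idempotency key."""

    def __init__(self) -> None:
        self._runs_by_key: Dict[str, str] = {}

    def create_flow_run(self, parameters: dict, idempotency_key: Optional[str] = None) -> str:
        # A repeated key returns the existing run instead of creating a new one.
        if idempotency_key is not None and idempotency_key in self._runs_by_key:
            return self._runs_by_key[idempotency_key]
        run_id = str(uuid.uuid4())
        if idempotency_key is not None:
            self._runs_by_key[idempotency_key] = run_id
        return run_id


def key_for(parent_run_id: str, parameters: dict) -> str:
    """Derive a key that differs per parameter dict but is stable across retries."""
    payload = json.dumps(parameters, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{parent_run_id}-{digest}"


backend = FakeBackend()
params = [{"dataset": "dataset1"}, {"dataset": "dataset2"}]

# Every mapped child shares one key: only a single run is created.
shared_ids = [backend.create_flow_run(p, idempotency_key="same-task-run-id") for p in params]

# One key per child: each child gets its own run.
unique_ids = [backend.create_flow_run(p, idempotency_key=key_for("parent-run", p)) for p in params]

print(len(set(shared_ids)), len(set(unique_ids)))  # prints "1 2"
```

If the installed Prefect version's `create_flow_run` task accepts an `idempotency_key` argument (an assumption worth checking against the docs for that version), a list of keys like these could be mapped alongside `parameters` so that each child gets a distinct key.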
Andreas Eisenbarth
01/17/2022, 7:41 PM
[...] `run_batch_flow` that starts the flow.
Andreas Eisenbarth
01/17/2022, 7:42 PM
Andreas Eisenbarth
01/17/2022, 7:44 PM