# ask-community
a
Hello! I have encountered a very weird behavior and have no more ideas about what could cause it. We do batch processing and use `create_flow_run` with `map` to create multiple flows, each with a different dict of parameters. On one server, all created flows receive the same `flow_run_id`, which means they overwrite each other's logs and we only see one run in the Prefect UI. (Locally I cannot reproduce it, and every child flow gets a different flow run ID. The server runs in Docker, and `create_flow_run` was working correctly in that setup previously.) Does anyone have ideas? (Example code attached.)
I could reduce the code to this (set the backend to server, then start a Prefect server and an agent):
```python
from typing import List

from prefect import Client, Flow, Parameter, task, unmapped
from prefect.executors import LocalExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task(log_stdout=True)
def print_flow_details(dataset):
    import prefect

    print(f"prefect.context.flow_id: {prefect.context.flow_id}")
    print(f"prefect.context.flow_run_id: {prefect.context.flow_run_id}")
    print(f"dataset: {dataset}")


@task(log_stdout=True)
def assert_flow_run_ids_unique(flow_run_ids: List[str]):
    print(f"flow_run_ids: {flow_run_ids}")
    if len(set(flow_run_ids)) != len(flow_run_ids):
        raise RuntimeError(f"Two or more flow runs have same IDs: {flow_run_ids}")


with Flow("child_flow") as child_flow:
    dataset = Parameter("dataset")
    print_flow_details(dataset)


with Flow("batch_flow") as batch_flow:
    parameters = Parameter("parameters")
    child_run_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("child_flow"),
    )
    assert_flow_run_ids_unique(child_run_ids)
    # flow_runs_finished = wait_for_flow_run.map(flow_run_id=child_run_ids)


def run_batch_flow(datasets=["dataset1", "dataset2"]):
    project_name = "default"
    client = Client()
    client.create_project(project_name=project_name)
    batch_flow.register(project_name=project_name)
    child_flow.register(project_name=project_name)

    state = batch_flow.run(
        executor=LocalExecutor(),
        parameters={
            "parameters": [{"dataset": d} for d in datasets],
        },
    )
    assert state.is_successful()


if __name__ == "__main__":
    run_batch_flow()
```
k
I think you might need to supply different idempotency keys. Otherwise they fall back to the default, the `task_run_id`, and if that key is repeated, all of the calls just trigger the same flow run.
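(For reference, a minimal sketch of that fix, assuming Prefect 1.x, where `create_flow_run` accepts an `idempotency_key` argument. The `make_keys` task and its key format are illustrative, not from the thread.)
```python
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run


@task
def make_keys(parameters):
    # Hypothetical helper: derive one unique key per child run
    # from each parameter dict.
    return [f"child-{p['dataset']}" for p in parameters]


with Flow("batch_flow") as batch_flow:
    parameters = Parameter("parameters")
    keys = make_keys(parameters)
    child_run_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("child_flow"),
        # A distinct idempotency key per mapped call, instead of the
        # default task_run_id, so each call schedules its own flow run.
        idempotency_key=keys,
    )
```
With `flow_name` left `unmapped`, the mapped `parameters` and `idempotency_key` lists are zipped element-wise, so each child run gets its own key.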
a
Can you share what you use as a value for `parameters`? You don't have any default values, which prevents this flow from being scheduled.
a
I filled in (very minimal) parameters as defaults in the `run_batch_flow` function that starts the flow.
👍 1
Thanks, I didn't know about idempotency keys. I'm wondering why that didn't cause a problem locally.
That solved it, thanks!
🙌 1