Thread
#prefect-community

    Andreas Eisenbarth

    8 months ago
    Hello! I have encountered very weird behavior and have no more ideas what could cause it. We do batch processing and use create_flow_run with map to create multiple flows, each with a different dict of parameters. On one server, all created flows receive the same flow_run_id, which means they overwrite each other's logs and we only see one run in the Prefect UI. (Locally I cannot reproduce it, and every child flow has a different flow run ID. This server is running in Docker, and in that setup create_flow_run was working correctly previously.) Does anyone have ideas? (Example code attached.)
    I could reduce the code to this (set the backend to server, then start a Prefect server and agent):
    from typing import List
    
    from prefect import Client, Flow, Parameter, task, unmapped
    from prefect.executors import LocalExecutor
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    
    @task(log_stdout=True)
    def print_flow_details(dataset):
        import prefect
    
        print(f"prefect.context.flow_id: {prefect.context.flow_id}")
        print(f"prefect.context.flow_run_id: {prefect.context.flow_run_id}")
        print(f"dataset: {dataset}")
    
    
    @task(log_stdout=True)
    def assert_flow_run_ids_unique(flow_run_ids: List[str]):
        print(f"flow_run_ids: {flow_run_ids}")
        if len(set(flow_run_ids)) != len(flow_run_ids):
            raise RuntimeError(f"Two or more flow runs have same IDs: {flow_run_ids}")
    
    
    with Flow("child_flow") as child_flow:
        dataset = Parameter("dataset")
        print_flow_details(dataset)
    
    
    with Flow("batch_flow") as batch_flow:
        parameters = Parameter("parameters")
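        # One child flow run per parameter dict; on the affected server these
        # runs all came back with the same flow_run_id.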
        child_run_ids = create_flow_run.map(
            parameters=parameters,
            flow_name=unmapped("child_flow"),
        )
        assert_flow_run_ids_unique(child_run_ids)
        # flow_runs_finished = wait_for_flow_run.map(flow_run_id=child_run_ids)
    
    
    def run_batch_flow(datasets=["dataset1", "dataset2"]):
        project_name = "default"
        client = Client()
        client.create_project(project_name=project_name)
        batch_flow.register(project_name=project_name)
        child_flow.register(project_name=project_name)
    
        state = batch_flow.run(
            executor=LocalExecutor(),
            parameters={
                "parameters": [{"dataset": d} for d in datasets],
            },
        )
        assert state.is_successful()
    
    
    if __name__ == "__main__":
        run_batch_flow()

    Kevin Kho

    8 months ago
    I think you might need to supply different idempotency keys, otherwise they use the default task_run_id and repeat the idempotency keys, causing all of them to just trigger the same flow run.
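    For reference, a minimal sketch of that fix, assuming Prefect 1.x where create_flow_run accepts an idempotency_key argument that can be mapped like any other input. make_idempotency_keys is a hypothetical helper, and deriving the key from the dataset name is just one possible scheme; any string that is unique per child run would do:
    from prefect import Flow, Parameter, task, unmapped
    from prefect.tasks.prefect import create_flow_run
    
    
    @task
    def make_idempotency_keys(parameters):
        # Hypothetical helper: one unique string per child run, here derived
        # from the dataset name.
        return [p["dataset"] for p in parameters]
    
    
    with Flow("batch_flow") as batch_flow:
        parameters = Parameter("parameters")
        keys = make_idempotency_keys(parameters)
        child_run_ids = create_flow_run.map(
            parameters=parameters,
            flow_name=unmapped("child_flow"),
            # Distinct keys mean distinct flow runs, even under map.
            idempotency_key=keys,
        )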

    Anna Geller

    8 months ago
    Can you share what you use as a value for “parameters”? You don’t have any default values, which prevents this flow from being scheduled.

    Andreas Eisenbarth

    8 months ago
    I filled in (very minimal) parameters as defaults in the function run_batch_flow that starts the flow.
    Thanks, I didn't know about idempotency keys. I'm wondering why that didn't cause a problem locally.
    That solved it, thanks!