https://prefect.io logo
k

Klemen Strojan

09/16/2020, 7:18 AM
Hey all, We have a master flow that looks like this:
Copy code
# schedule
daily_schedule = CronSchedule("0 7 * * *", start_date=pendulum.now(tz="Europe/Vienna"))

# result
RESULT = AzureResult('prefect', connection_string_secret='my_azure_secret')

with Flow("master_flow", schedule=daily_schedule, result=RESULT) as flow:
    flow_1_param_1 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '1'})
    flow_1_param_2 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '2'})
    flow_1_param_3 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '3'})
    flow_2 = FlowRunTask(
        flow_name="flow_2",
        project_name="project_1",
        wait=True)
    flow_3 = FlowRunTask( 
        flow_name="flow_3",
        project_name="project_1",
        wait=True)
    flow_1_param_1.set_downstream(flow_1_param_2)
    flow_1_param_2.set_downstream(flow_1_param_3)
    flow_1_param_3.set_downstream(flow_2)
    flow_2.set_downstream(flow_3)

###############################################################################
# Settings

flow.storage = GitHub(
    repo="some_repo",
    path="some_path",
    secrets=["GITHUB_ACCESS_TOKEN"]
)

# Labels for identifying agents
flow.environment = LocalEnvironment(labels=["label_1", "label_2"])
We are using Cloud and have two agents with local Dask clusters. Labels from the master flow points towards one agent and labels from all other flows point towards the second agent. When we run the master flow, everything runs normally, no errors or warnings in the log. But in reality, only
flow_1_param_1
,
flow_2
, and
flow_3
ran. What are we missing?
n

nicholas

09/16/2020, 7:34 AM
Hi @Klemen Strojan - first off: really cool flow! Exciting to see the evolution of a meta-type flow like this. To answer your question, I think this has to do with the
idempotency_key
on the
FlowRunTask
. I believe that's populated with the flow run id available in the context from which it's called, meaning the 3 tasks that create runs for
flow_1
will be passing the same
idempotency_key
. My suggestion would be to try defining that explicitly to something like
1
,
2
, and
3
appended to the flow run id
k

Klemen Strojan

09/16/2020, 9:05 AM
Thanks @nicholas! I am checking the source code (https://github.com/PrefectHQ/prefect/blob/2554a489172fb3e81f9c04188221322582bba078/src/prefect/tasks/prefect/flow_run.py#L11) and I am not sure how to change the flow run id. I tried
Copy code
flow_1_param_1 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '1'})
flow_1_param_2 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '2'},
        new_flow_context={'flow_run_id': str(flow_1_param_1) + '1'})
but this has the same behaviour as the previous setting. If I call
flow_run_id
from
context
, I’ll get the one for my master flow, which doesn’t matter in this case.
n

nicholas

09/16/2020, 5:16 PM
Try creating a wrapper task that can pass in the flow run context like this:
Copy code
@task
flow_1(index: int, parameters: obj):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters,
        idempotency_key=f"{prefect.context.flow_run_id}__{index}"
  )
Which you can call from your flow context like this:
Copy code
flow_1_param_1 = flow_1(
        index=1
        parameters={'param': '1'}
)

flow_1_param_2 = flow_1(
        index=2
        parameters={'param': '2'}
)

flow_1_param_3 = flow_1(
        index=3
        parameters={'param': '3'}
)
k

Klemen Strojan

09/17/2020, 7:16 AM
Creating a wrapper didn’t do the trick, I got
TypeError: __init__() got an unexpected keyword argument 'idempotency_key'
straight away. I don’t see any other obvious way to pass a value to
idempotency_key
.
Copy code
@task
def flow_1(index: int, parameters: dict):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters,
        idempotency_key=f"{prefect.context.flow_run_id}__{index}"
  )

flow_1_param_1 = flow_1(
        index=1,
        parameters={'param': '1'}
    )

flow_1_param_2 = flow_1(
        index=2,
        parameters={'param': '2'}
n

nicholas

09/17/2020, 2:06 PM
Ah sorry @Klemen Strojan - I wasn't looking closely enough at the code, you'll want to pass
idempotency_key
to the run method, not the base class:
Copy code
def flow_1(index: int, parameters: dict):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters
  )(idempotency_key=f"{prefect.context.flow_run_id}__{index}")
👍 1
k

Klemen Strojan

09/18/2020, 8:07 AM
Thanks @nicholas, that did the trick (with a small modification pasted below). I appreciate your help! 🙏 More meta-type flows to come 🧐🙂
Copy code
def flow_1(index: int, parameters: dict):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters
  ).run(idempotency_key=f"{prefect.context.flow_run_id}__{index}")
n

nicholas

09/18/2020, 4:16 PM
Awesome!! Glad you got it working @Klemen Strojan 😄