Klemen Strojan
09/16/2020, 7:18 AM
import pendulum
from prefect import Flow
from prefect.engine.results import AzureResult
from prefect.environments import LocalEnvironment
from prefect.environments.storage import GitHub
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import FlowRunTask

# schedule: every day at 07:00
daily_schedule = CronSchedule("0 7 * * *", start_date=pendulum.now(tz="Europe/Vienna"))
# result
RESULT = AzureResult('prefect', connection_string_secret='my_azure_secret')
with Flow("master_flow", schedule=daily_schedule, result=RESULT) as flow:
    flow_1_param_1 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '1'})
    flow_1_param_2 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '2'})
    flow_1_param_3 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '3'})
    flow_2 = FlowRunTask(
        flow_name="flow_2",
        project_name="project_1",
        wait=True)
    flow_3 = FlowRunTask(
        flow_name="flow_3",
        project_name="project_1",
        wait=True)
    # run the child flows one after another
    flow_1_param_1.set_downstream(flow_1_param_2)
    flow_1_param_2.set_downstream(flow_1_param_3)
    flow_1_param_3.set_downstream(flow_2)
    flow_2.set_downstream(flow_3)
###############################################################################
# Settings
flow.storage = GitHub(
    repo="some_repo",
    path="some_path",
    secrets=["GITHUB_ACCESS_TOKEN"]
)
# Labels for identifying agents
flow.environment = LocalEnvironment(labels=["label_1", "label_2"])
We are using Cloud and have two agents with local Dask clusters. The labels on the master flow point to one agent, and the labels on all other flows point to the second agent. When we run the master flow, everything appears to run normally, with no errors or warnings in the log. In reality, however, only flow_1_param_1, flow_2, and flow_3 ran. What are we missing?
nicholas
This looks like it's related to the idempotency_key on the FlowRunTask. I believe it's populated with the flow run ID available in the context from which the task is called, which means the three tasks that create runs for flow_1 all pass the same idempotency_key. My suggestion would be to set it explicitly to something like the flow run ID with 1, 2, and 3 appended.
Klemen Strojan
09/16/2020, 9:05 AM
flow_1_param_1 = FlowRunTask(
    flow_name="flow_1",
    project_name="project_1",
    wait=True,
    parameters={'param': '1'})
flow_1_param_2 = FlowRunTask(
    flow_name="flow_1",
    project_name="project_1",
    wait=True,
    parameters={'param': '2'},
    new_flow_context={'flow_run_id': str(flow_1_param_1) + '1'})
But this behaves the same as the previous setup. If I read flow_run_id from the context, I get the one for my master flow, which doesn't matter in this case.
nicholas
import prefect
from prefect import task
from prefect.tasks.prefect import FlowRunTask

@task
def flow_1(index: int, parameters: dict):
    return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters,
        idempotency_key=f"{prefect.context.flow_run_id}__{index}"
    )

flow_1_param_1 = flow_1(
    index=1,
    parameters={'param': '1'}
)
flow_1_param_2 = flow_1(
    index=2,
    parameters={'param': '2'}
)
flow_1_param_3 = flow_1(
    index=3,
    parameters={'param': '3'}
)
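To see why the three flow_1 runs can collapse into one, here is a toy, pure-Python model of an idempotent "create flow run" call. FakeBackend and create_flow_run are hypothetical stand-ins, not Prefect or Cloud APIs. The model assumes, following the explanation above, that a repeated idempotency key returns the existing run instead of creating a new one, and (to match the observation that flow_2 and flow_3 still ran) that keys are scoped per flow.

```python
import itertools
from typing import Dict, List, Optional, Tuple


class FakeBackend:
    """Hypothetical stand-in for an orchestration backend (not the Prefect Cloud API).

    Assumption: a create request whose (flow_name, idempotency_key) pair was
    already seen returns the earlier run instead of creating a new one.
    """

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._runs_by_key: Dict[Tuple[str, str], str] = {}
        self.created: List[Tuple[str, str, dict]] = []  # runs actually created

    def create_flow_run(self, flow_name: str, parameters: dict,
                        idempotency_key: Optional[str] = None) -> str:
        key = (flow_name, idempotency_key) if idempotency_key is not None else None
        if key is not None and key in self._runs_by_key:
            # Duplicate key for this flow: hand back the existing run
            return self._runs_by_key[key]
        run_id = f"run-{next(self._ids)}"
        self.created.append((run_id, flow_name, parameters))
        if key is not None:
            self._runs_by_key[key] = run_id
        return run_id


parent_run_id = "master-run-abc"  # hypothetical ID of the master flow run

# Every flow_1 call reuses the master run's ID as its key: only one run is created
shared = FakeBackend()
shared_ids = [
    shared.create_flow_run("flow_1", {"param": p}, idempotency_key=parent_run_id)
    for p in ("1", "2", "3")
]

# flow_2 uses the same key, but it is a different flow, so it still gets a run
flow_2_id = shared.create_flow_run("flow_2", {}, idempotency_key=parent_run_id)

# One key per flow_1 call: three distinct runs
unique = FakeBackend()
unique_ids = [
    unique.create_flow_run("flow_1", {"param": p},
                           idempotency_key=f"{parent_run_id}__{i}")
    for i, p in enumerate(("1", "2", "3"), start=1)
]

print(shared_ids)  # ['run-1', 'run-1', 'run-1']
print(flow_2_id)   # run-2
print(unique_ids)  # ['run-1', 'run-2', 'run-3']
```

In this model only the first flow_1 call creates anything when the keys are shared, which mirrors the symptom reported in the thread.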
Klemen Strojan
09/17/2020, 7:16 AM
This fails straight away with TypeError: __init__() got an unexpected keyword argument 'idempotency_key'. I don't see any other obvious way to pass a value to idempotency_key.
@task
def flow_1(index: int, parameters: dict):
    return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters,
        idempotency_key=f"{prefect.context.flow_run_id}__{index}"
    )
flow_1_param_1 = flow_1(
    index=1,
    parameters={'param': '1'}
)
flow_1_param_2 = flow_1(
    index=2,
    parameters={'param': '2'}
)
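The TypeError comes from handing a run-time argument to the constructor. A common pattern in Prefect's task library is that __init__ stores configuration as attributes and run() uses those attributes as defaults, while some arguments are accepted only by run(). The class below is a hypothetical, plain-Python imitation of that pattern for illustration; SketchFlowRunTask is made up and is not Prefect's source.

```python
from typing import Optional


class SketchFlowRunTask:
    """Hypothetical imitation of a configurable task (not Prefect's FlowRunTask).

    __init__ stores configuration; run() accepts per-call arguments and falls
    back to the stored configuration when an argument is not given.
    """

    def __init__(self, flow_name: Optional[str] = None,
                 parameters: Optional[dict] = None) -> None:
        self.flow_name = flow_name
        self.parameters = parameters

    def run(self, flow_name: Optional[str] = None,
            parameters: Optional[dict] = None,
            idempotency_key: Optional[str] = None) -> dict:
        # Per-call values win; otherwise use what was set in __init__
        return {
            "flow_name": flow_name or self.flow_name,
            "parameters": parameters if parameters is not None else self.parameters,
            "idempotency_key": idempotency_key,
        }


task_obj = SketchFlowRunTask(flow_name="flow_1", parameters={"param": "1"})
result = task_obj.run(idempotency_key="master-run-abc__1")
print(result)

error_message = ""
try:
    # The constructor has no idempotency_key parameter, so this raises TypeError
    SketchFlowRunTask(flow_name="flow_1", idempotency_key="x")
except TypeError as exc:
    error_message = str(exc)
print(error_message)
```

The exact wording of the error depends on the Python version, but it names the unexpected keyword argument, just like the error above.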
nicholas
idempotency_key needs to go to the run method, not the class constructor:
@task
def flow_1(index: int, parameters: dict):
    return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters
    )(idempotency_key=f"{prefect.context.flow_run_id}__{index}")
Klemen Strojan
09/18/2020, 8:07 AM
@task
def flow_1(index: int, parameters: dict):
    # call run() directly so idempotency_key is passed as a run-time argument
    return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters
    ).run(idempotency_key=f"{prefect.context.flow_run_id}__{index}")
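One property of the f"{flow_run_id}__{index}" key used here is that it is deterministic: it differs between the three flow_1 calls, but it stays the same if the same master flow run repeats a call (for example on a task retry), so a backend that deduplicates on the key would not start a duplicate child run. The sketch below builds those keys from a list of parameter sets instead of three near-identical calls; make_idempotency_keys is a hypothetical helper, not a Prefect function.

```python
from typing import List, Sequence


def make_idempotency_keys(flow_run_id: str, parameter_sets: Sequence[dict]) -> List[str]:
    """Hypothetical helper: one key per child run, following the
    f"{flow_run_id}__{index}" pattern from the thread (1-based index)."""
    return [f"{flow_run_id}__{index}" for index, _ in enumerate(parameter_sets, start=1)]


parameter_sets = [{"param": "1"}, {"param": "2"}, {"param": "3"}]

# Same master run, computed twice (e.g. first attempt and a retry)
first_attempt = make_idempotency_keys("master-run-abc", parameter_sets)
retry_attempt = make_idempotency_keys("master-run-abc", parameter_sets)
# A later scheduled master run gets a new flow run ID, hence new keys
next_day = make_idempotency_keys("master-run-xyz", parameter_sets)

print(first_attempt)  # ['master-run-abc__1', 'master-run-abc__2', 'master-run-abc__3']
```

Keys are unique within one master run, stable across retries of that run, and fresh for every new scheduled run.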
nicholas