Chu
07/28/2022, 9:13 PM
    raise TypeError("`fn` must be callable")
TypeError: `fn` must be callable
James Sopkin
07/28/2022, 9:24 PM
create_flow_run() within a task?
Chu
07/28/2022, 9:28 PM
@task('call currency orchestrator')
def call_currency_orchestrator():
    return create_flow_run(
        flow_name = "currency_orchestrator",
        project_name = "xxx",
    )
@task('call data model orchestrator', trigger = all_finished)
def call_data_model_orchestrator(params_data_model_orchestrator):
    return create_flow_run.map(
        flow_name = unmapped("data_model_orchestrator"),
        project_name = unmapped("xxx"),
        parameters = params_data_model_orchestrator,
    )

with Flow(
    name="super_orchestrator",
    executor=LocalDaskExecutor(scheduler="threads", num_workers=2)
) as flow:
    currency_orchestrator_flow_id = call_currency_orchestrator()
    wait_for_currency_orchestrator_flow = wait_for_flow_run(
        currency_orchestrator_flow_id,
        max_duration=datetime.timedelta(seconds=43200),
        stream_states=True,
        stream_logs=True,
        raise_final_state=True,
    )
    data_model_flow_id = call_data_model_orchestrator(params_data_model_orchestrator)
    data_model_flow_id.set_upstream(wait_for_currency_orchestrator_flow)
    wait_for_flow_run.map(
        data_model_flow_id,
        max_duration=unmapped(datetime.timedelta(seconds=43200)),
        stream_states=unmapped(True),
        stream_logs=unmapped(True),
        raise_final_state=unmapped(True),
    )
def dbt_run(…):
    return ShellTask(…).run(command=command)
James Sopkin
07/29/2022, 1:47 PM
Chu
07/29/2022, 1:59 PM
task_args = unmapped(dict(trigger = all_finished))
inside create_flow_run.map(), it throws an error saying task_arg() can’t be copied… How can I set the trigger between one flow and another mapped flow?
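
A possible reading of the TypeError at the top of the thread: if the `task` decorator takes the function as its first positional parameter, then `@task('call currency orchestrator')` hands it a string where a function is expected. The sketch below reproduces that with a simplified stand-in written only for illustration. `FunctionTask`, `task`, and `decorate_positional` here are hypothetical and are not Prefect's source, and whether this is the actual cause in the flow above is an assumption.

```python
from typing import Any, Callable, Optional


class FunctionTask:
    """Hypothetical stand-in for a task object that wraps a plain function."""

    def __init__(self, fn: Callable, name: Optional[str] = None, **options: Any):
        # A string passed positionally ends up here as `fn` and is rejected.
        if not callable(fn):
            raise TypeError("`fn` must be callable")
        self.fn = fn
        self.name = name or fn.__name__
        self.options = options

    def run(self, *args: Any, **kwargs: Any) -> Any:
        # Call the wrapped function directly.
        return self.fn(*args, **kwargs)


def task(fn: Optional[Callable] = None, **options: Any):
    """Stand-in decorator: usable bare (@task) or with keyword options."""
    if fn is None:
        # Called with keywords only: return a decorator that wraps the function.
        return lambda wrapped: FunctionTask(wrapped, **options)
    return FunctionTask(fn, **options)


def decorate_positional() -> str:
    """Apply the decorator with a positional string and report what happens."""
    try:
        @task("call currency orchestrator")  # the string is bound to `fn`
        def call_currency_orchestrator():
            return "flow-run-id"
    except TypeError as exc:
        return str(exc)
    return "no error"


# Passing the name as a keyword leaves `fn` unset, so the function is wrapped.
@task(name="call currency orchestrator")
def call_currency_orchestrator():
    return "flow-run-id"
```

Under that assumption, writing `name=` explicitly in the decorator would avoid the error, while the positional string would trigger it.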