# prefect-community
c
When I return create_flow_run(…) inside a task, why does it give me this:
    raise TypeError("`fn` must be callable")
TypeError: `fn` must be callable
j
Could I get more context? Are you running create_flow_run() within a task?
c
Sure! We have something like:
@task('call currency orchestrator')
def call_currency_orchestrator():

    return create_flow_run(
        flow_name = "currency_orchestrator",
        project_name = "xxx",
    )
I can provide you with the whole picture:
@task('call currency orchestrator')
def call_currency_orchestrator():

    return create_flow_run(
        flow_name = "currency_orchestrator",
        project_name = "xxx",
    )

@task('call data model orchestrator', trigger = all_finished)
def call_data_model_orchestrator(params_data_model_orchestrator):

    return create_flow_run.map(
        flow_name = unmapped("data_model_orchestrator"),
        project_name = unmapped("xxx"),
        parameters = params_data_model_orchestrator,
    )

with Flow(
    name="super_orchestrator",
    executor=LocalDaskExecutor(scheduler="threads", num_workers=2)
) as flow:
    
    currency_orchestrator_flow_id = call_currency_orchestrator()

    wait_for_currency_orchestrator_flow = wait_for_flow_run(
        currency_orchestrator_flow_id,
        max_duration=datetime.timedelta(seconds=43200),
        stream_states=True,
        stream_logs=True,
        raise_final_state=True,
    )

    data_model_flow_id = call_data_model_orchestrator(params_data_model_orchestrator)

    data_model_flow_id.set_upstream(wait_for_currency_orchestrator_flow)

    wait_for_flow_run.map(
        data_model_flow_id,
        max_duration=unmapped(datetime.timedelta(seconds=43200)),
        stream_states=unmapped(True),
        stream_logs=unmapped(True),
        raise_final_state=unmapped(True),
    )
We are using a flow of flows of flows (yes, we have a grandparent flow). My question is how I can fix the error. I know a task should not be called from within another task, but since we only return its result, I thought that would be OK, because this works:
def dbt_run(…):
  return ShellTask(…).run(command=command)
j
Hi @Chu, as you mentioned, the flow will typically fail when you run a task within another task. To accomplish something similar with the create_flow_run task, you can register those dependent flows within the same project and then call that task within the parent flow context. https://docs-v1.prefect.io/core/idioms/flow-to-flow.html
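A possible explanation for the TypeError itself, separate from the task-in-task concern: in the snippets above the task name is passed positionally, as in @task('call currency orchestrator'). If the task decorator takes the function as its first positional argument, which the `fn` in the error message suggests, the string ends up in the slot where the function should be. Below is a minimal pure-Python sketch of that behavior. FunctionTaskStandIn and this task function are hypothetical stand-ins written for illustration; they are not Prefect's code.

```python
# Simplified stand-in for a decorator with the signature task(fn=None, **kwargs).
# This is NOT Prefect's source; it only mimics the assumed argument handling.


class FunctionTaskStandIn:
    """Hypothetical wrapper that rejects anything that is not callable."""

    def __init__(self, fn, name=None):
        if not callable(fn):
            raise TypeError("`fn` must be callable")
        self.fn = fn
        self.name = name or fn.__name__


def task(fn=None, **task_kwargs):
    if fn is None:
        # Used as @task(name=...): return the actual decorator.
        return lambda f: FunctionTaskStandIn(f, **task_kwargs)
    # Used as bare @task or task(some_fn): wrap immediately.
    return FunctionTaskStandIn(fn, **task_kwargs)


def define_with_positional_name():
    # The string lands in the `fn` slot, so wrapping fails at definition time.
    @task("call currency orchestrator")
    def call_currency_orchestrator():
        return "flow-run-id"

    return call_currency_orchestrator


def define_with_keyword_name():
    # Passing the name as a keyword leaves `fn` free for the function itself.
    @task(name="call currency orchestrator")
    def call_currency_orchestrator():
        return "flow-run-id"

    return call_currency_orchestrator
```

If that is the cause here, writing @task(name='call currency orchestrator') and @task(name='call data model orchestrator', trigger=all_finished) should get past this particular error. The task-inside-a-task issue described above would still apply.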
c
So my follow-up question: if we register them in Prefect Cloud, currency_orchestrator is an upstream task for the data_model_orchestrator flow, but data_model_orchestrator uses mapping. I can't specify task_args = unmapped(dict(trigger = all_finished)) inside create_flow_run.map(); it throws an error saying task_arg() can't be copied. How can I set the trigger between one flow and another mapped flow?