# prefect-community
b
Hi! I'm running a flow that runs two subflows. One subflow executes a task that uses run_deployment to run two deployments that must complete before the next subflow starts. I'm having issues with the execution of the second subflow: the agent dies after waiting for an hour. How can I implement this logic successfully? I need to execute n deployments and, once all n of them have completed, execute another flow. Image 1: the deployments run properly. Image 2: the task that runs the deployments never ends.
I have this in my code.
Copy code
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.deployments import run_deployment
from prefect.orion.schemas.core import FlowRun
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion


@task
def trigger_EL_flow(deployment_name) -> FlowRun:
    # run_deployment blocks until the triggered "XtraIngestion" run finishes
    flow_run_model = run_deployment(name=f"XtraIngestion/{deployment_name}")

    if flow_run_model.state.name != "Completed":
        raise RuntimeError(f"{deployment_name} did not complete")  # or respond to failure however you want
    return flow_run_model

@flow(name="XtraIngestion")
def xtra_sensing_extract_load(s3_bucket="xtra-data-ingestion",s3_prefix="a071/",s3_filename="A071_20221001.csv",incremental=True,airbyte_connection_id="8ee2883c-1256-46df-bac7-4c90a789849e"):
    
    s3_sensor(
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        s3_filename=s3_filename,
        incremental=incremental
    )

    trigger_sync(
        poll_interval_s=45,
        status_updates=True,
        connection_id=airbyte_connection_id,
        timeout=300,
    )

@flow(name="XtraTransformation")
def xtra_dbt(dbt_job_id=111111):
    dbt_api_key = Secret.load("dbt-cloud")
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key=dbt_api_key.get(),
            account_id=1762
        ),
        max_wait_seconds=3000,
        job_id=dbt_job_id
    )

@flow(name="XtraELTflow")
def xtra_elt_flow(deployments_name=["EKPO","EKKO"],dbt_job_id:int = 11111):
    subflow_params = [x for x in deployments_name]

    EL_flow_results = trigger_EL_flow.map(subflow_params)

    if all(i.result() for i in EL_flow_results):
        # trigger dbt, whether in cloud or CLI
        xtra_dbt(dbt_job_id)
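For comparison, here is a minimal sketch (not from the thread) of the same fan-out/fan-in pattern with an explicit cap on how long run_deployment waits; the two-hour timeout, poll interval, deployment names, and RuntimeError handling are placeholder choices, not anything confirmed by the conversation.
from prefect import flow, task
from prefect.deployments import run_deployment


@task
def trigger_deployment_run(deployment_name: str):
    # run_deployment blocks until the triggered flow run reaches a final state,
    # or until `timeout` seconds have passed (timeout=None waits indefinitely,
    # timeout=0 returns immediately after the run is scheduled)
    flow_run = run_deployment(
        name=f"XtraIngestion/{deployment_name}",
        timeout=7200,       # placeholder: cap the wait at two hours
        poll_interval=30,   # check the run's state every 30 seconds
    )
    if not flow_run.state.is_completed():
        raise RuntimeError(f"{deployment_name} finished in state {flow_run.state.name}")
    return flow_run


@flow
def run_all_then_transform(deployment_names=["EKPO", "EKKO"]):
    # fan out: one task run per deployment name, executed concurrently
    futures = trigger_deployment_run.map(deployment_names)
    # fan in: .result() blocks until each task (and its deployment run) is done
    results = [future.result() for future in futures]
    # ...only now trigger the downstream transformation flow, e.g. xtra_dbt(...)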
k
I’m trying to reproduce the issue that you are experiencing. This code works for me:
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment


@task
def run_depl(depl1: str, depl2: str):
    run_deployment(depl1)
    run_deployment(depl2)


@flow
def subflow():
    ...


@flow
def main(depl1: str, depl2: str):
    run_depl(depl1, depl2)
    subflow()


if __name__ == "__main__":
    main(depl1="test-flow/test-flow", depl2="test-flow/test-flow")
Can you run this code successfully?
👀 1
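As a side note, run_deployment takes names in "<flow name>/<deployment name>" form, so "test-flow/test-flow" above assumes a flow named test-flow with a deployment that is also named test-flow. A hedged sketch of how such a deployment could be registered in Prefect 2.x follows; the flow body and work queue name are placeholders.
from prefect import flow
from prefect.deployments import Deployment


@flow(name="test-flow")
def test_flow():
    # placeholder body; the real flow would do the ingestion work
    ...


if __name__ == "__main__":
    # builds and registers a deployment addressable as "test-flow/test-flow"
    deployment = Deployment.build_from_flow(
        flow=test_flow,
        name="test-flow",
        work_queue_name="default",  # an agent must be polling this queue
    )
    deployment.apply()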
b
Are you using Prefect 2.7? @Khuyen Tran I upgraded yesterday; today it has been working, I haven't hit the error, it is still running.
k
I’m using Prefect 2.6.9. Maybe you can downgrade to Prefect 2.6.9 to see if the problem persists?
I just tested with 2.7 and it ran successfully
so you said you ran the same code as the one I posted and it hangs?
b
Yeah! Today it ran successfully! 🎉 @Khuyen Tran
🎉 1
k
That’s awesome to hear!