Boggdan Barrientos
11/30/2022, 10:07 PM
I'm using `run_deployment` to execute two deployments that have to complete before the next subflow runs.
I'm having issues with the execution of the second subflow: the agent dies after one hour of waiting.
How can I apply this logic successfully? I need to execute n deployments and, once all n deployments have completed, execute another flow.
Image 1: The deployments run properly.
Image 2: The tasks that run the deployments never end.
@task
def trigger_EL_flow(deployment_name) -> FlowRun:
    # Run the deployment "XtraIngestion/<deployment_name>" and return the
    # flow run object handed back by run_deployment.
    # Per the original author's note, run_deployment waits for that run to
    # complete before returning.
    deployment_run = run_deployment(name=f"XtraIngestion/{deployment_name}")

    # Any final state other than "Completed" is treated as a failure.
    if deployment_run.state.name != "Completed":
        raise Failed()  # or respond to failure however you want
    return deployment_run
@flow(name="XtraIngestion")
def xtra_sensing_extract_load(
    s3_bucket="xtra-data-ingestion",
    s3_prefix="a071/",
    s3_filename="A071_20221001.csv",
    incremental=True,
    airbyte_connection_id="8ee2883c-1256-46df-bac7-4c90a789849e",
):
    # Two steps, in order: call s3_sensor with the S3 location arguments,
    # then call trigger_sync for the given Airbyte connection id.
    sensor_kwargs = {
        "s3_bucket": s3_bucket,
        "s3_prefix": s3_prefix,
        "s3_filename": s3_filename,
        "incremental": incremental,
    }
    s3_sensor(**sensor_kwargs)

    # Fixed polling/timeout settings; only the connection id is configurable.
    sync_kwargs = {
        "poll_interval_s": 45,
        "status_updates": True,
        "connection_id": airbyte_connection_id,
        "timeout": 300,
    }
    trigger_sync(**sync_kwargs)
@flow(name="XtraTransformation")
def xtra_dbt(dbt_job_id=111111):
    # Load the "dbt-cloud" secret, build dbt Cloud credentials from it, and
    # call trigger_dbt_cloud_job_run_and_wait_for_completion for dbt_job_id.
    # The result of that call is not used by this flow.
    api_key_secret = Secret.load("dbt-cloud")
    credentials = DbtCloudCredentials(
        api_key=api_key_secret.get(),
        account_id=1762,
    )
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=credentials,
        max_wait_seconds=3000,
        job_id=dbt_job_id,
    )
@flow(name="XtraELTflow")
def xtra_elt_flow(deployments_name=["EKPO", "EKKO"], dbt_job_id: int = 11111):
    # Map trigger_EL_flow over every deployment name, collect each mapped
    # result, and call the xtra_dbt flow only when all results are truthy.
    # NOTE(review): the list default is a shared object; it is only read
    # (copied) here, never mutated.
    ingestion_futures = trigger_EL_flow.map(list(deployments_name))
    ingestion_results = (future.result() for future in ingestion_futures)
    if not all(ingestion_results):
        return

    # trigger dbt, whether in cloud or CLI
    xtra_dbt(dbt_job_id)
Khuyen Tran
12/02/2022, 4:38 PM
from prefect import flow, task
from prefect.deployments import run_deployment
@task
def run_depl(depl1: str, depl2):
    # Call run_deployment for each of the two deployment names in turn,
    # depl1 first and depl2 second.
    for deployment in (depl1, depl2):
        run_deployment(deployment)
@flow
def subflow():
    # Placeholder flow with no body of its own.
    pass
@flow
def main(depl1: str, depl2: str):
    # First call the run_depl task with both deployment names, then call
    # subflow.
    run_depl(depl1=depl1, depl2=depl2)
    subflow()
if __name__ == "__main__":
    # Script entry point: both parameters name the same deployment.
    main(
        depl1="test-flow/test-flow",
        depl2="test-flow/test-flow",
    )
Can you run this code successfully?
Boggdan Barrientos
12/02/2022, 4:40 PM
Khuyen Tran
12/02/2022, 4:54 PM
Boggdan Barrientos
12/02/2022, 6:05 PM
Khuyen Tran
12/05/2022, 4:25 PM