Hi Experts, I created a flow of flows with startFl...
# ask-community
l
Hi Experts, I created a flow of flows with StartFlowRun. Dependencies have been set in the control flow, and I have set wait=True in each StartFlowRun(wait=True, **kwargs). Based on my understanding, the flow should keep moving down to the dependent flows once the upstream flows have executed successfully. I set up the test so that flow3 depends only on flow2 and flow2 depends only on flow1. I was expecting flow2 to start as soon as flow1 succeeded, but it did not start until flow4 finished running. Can you see what I might have done wrong?
z
Hi! It'd be helpful if you shared your flow of flows.
👍 1
l
Copy code
import datetime
import hashlib
import json
import os
import time
from datetime import timedelta

import boto3
import prefect
from botocore.client import ClientError
from prefect import Client
from prefect import Flow, task, Parameter, context
from prefect.engine import signals
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.storage import Docker, S3
from prefect.tasks.prefect import StartFlowRun


@task
def get_parameters(date_to_process, flow_run_id) -> dict:
    """Resolve the processing date and bundle the parameters for child flows.

    Args:
        date_to_process: Date string in ``YYYY-MM-DD`` format, or ``None`` to
            fall back to ``prefect.context.get("yesterday")``.
        flow_run_id: Accepted but not used; the returned ``flow_run_id`` is
            read from ``prefect.context`` instead.
            NOTE(review): confirm whether the parameter value was intended.

    Returns:
        dict with keys ``date_to_process``, ``flow_run_id`` and ``other_keys``.

    Raises:
        signals.FAIL: if ``date_to_process`` is given but is not a valid
            ``YYYY-MM-DD`` date string.
    """
    logger = prefect.context.get("logger")

    if date_to_process is None:
        date_to_process = prefect.context.get("yesterday")
    else:
        # Validate the format only; the original string is passed on as-is.
        date_format = "%Y-%m-%d"
        try:
            datetime.datetime.strptime(date_to_process, date_format)
        except (TypeError, ValueError) as err:
            # TypeError covers non-string parameter values (e.g. a number).
            message = "Incorrect data format, should be YYYY-MM-DD"
            logger.error(message)
            raise signals.FAIL(message) from err

    logger.info("Processing data from %s", date_to_process)
    return {
        "date_to_process": date_to_process,
        "flow_run_id": prefect.context.get("flow_run_id"),
        "other_keys": "test",
    }

#date_to_process = Parameter("date_to_process", default='2021-01-01')
# assumes you have registered the following flows 
dim_ubi_v_trigger = StartFlowRun(flow_name="dim_ubi_v_trigger", project_name="TEL-DE-Dev", wait=True)
dim_ubi_v = StartFlowRun(flow_name="dim_ubi_v", project_name="TEL-DE-Dev", wait=True)  

dim_ubi_tel_src_trigger = StartFlowRun(flow_name="dim_ubi_tel_src_trigger", project_name="TEL-DE-Dev", wait=True)  
dim_ubi_tel_src = StartFlowRun(flow_name="dim_ubi_tel_src", project_name="TEL-DE-Dev", wait=True)  

dim_ubi_pt_source = StartFlowRun(flow_name="dim_ubi_tel_src", project_name="TEL-DE-Dev", wait=True) 
fact_ubi_initial_connection_event=StartFlowRun(flow_name="fact_ubi_initial_connection_event", project_name="TEL-DE-Dev", wait=True)

# Without an explicit executor, Prefect uses its sequential default executor.
# Only one task runs at a time, so an independent task (e.g.
# fact_ubi_initial_connection_event) can run before the next step of another
# chain. LocalDaskExecutor lets every task whose upstream tasks have finished
# start immediately, so the branches below run concurrently.
with Flow("parent-flow", executor=LocalDaskExecutor()) as flow:
    date_to_process = Parameter(name="date_to_process", default=None)
    flow_run_id = Parameter(name="flow_run_id", default="xxxxxx")
    vp = get_parameters(date_to_process, flow_run_id)

    # Branch 1: dim_ubi_v_trigger -> dim_ubi_v
    dim_ubi_v_trigger = dim_ubi_v_trigger(upstream_tasks=[vp], parameters=vp)
    dim_ubi_v = dim_ubi_v(upstream_tasks=[dim_ubi_v_trigger], parameters=vp)

    # Branch 2: dim_ubi_tel_src_trigger -> dim_ubi_tel_src
    dim_ubi_tel_src_trigger = dim_ubi_tel_src_trigger(
        upstream_tasks=[vp], parameters=vp
    )
    dim_ubi_tel_src = dim_ubi_tel_src(
        upstream_tasks=[dim_ubi_tel_src_trigger], parameters=vp
    )

    # Joins both branches.
    # NOTE(review): unlike every other child flow, no parameters=vp is passed
    # here; confirm that is intended.
    dim_ubi_pt_source = dim_ubi_pt_source(
        upstream_tasks=[dim_ubi_v, dim_ubi_tel_src]
    )
    # Depends only on branch 1.
    fact_ubi_initial_connection_event = fact_ubi_initial_connection_event(
        upstream_tasks=[dim_ubi_v], parameters=vp
    )

def main() -> None:
    """Render the parent flow's task graph instead of running the flow."""
    flow.visualize()


if __name__ == "__main__":
    main()
z
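It looks like you're not setting an executor on your flow, so tasks can't run in parallel. Since only one task can run at a time and flow4 does not depend on flow2/flow3, it could be executed first. Prefect's default executor runs tasks sequentially. Attach a parallel one, e.g. `Flow("parent-flow", executor=LocalDaskExecutor())`, so independent branches can run concurrently.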
l
That makes sense. I did not think of that. Thanks for pointing it out!