# prefect-server
a
Hi all, I have a flow registered called `news_url_extraction`, which is run in the UI with the arguments `lang`, `region` and `root_dir` in the run configuration. I want to be able to run this flow inside another flow called `news_pipeline`, and I have created the following code:
```python
import os

from prefect import Flow
from prefect.client import Client
from prefect.run_configs import LocalRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Project-local helper modules (imports not shown): general, settings, prefect_utils

# Set the working directory
working_directory = general.home_dir() + settings.WORKING_DIR


# Functions
def execute_pipeline() -> Flow:
    with Flow(
        "news_pipeline_flow",
        state_handlers=[prefect_utils.set_run_name],
    ) as flow:

        # Perform the news url extraction flow.
        # Note: these os.environ.get() calls are evaluated when the flow is
        # built (i.e. at registration time), not when the flow run starts.
        news_url_extraction_flow = create_flow_run(
            flow_name="news_url_extraction_flow",
            project_name="scraper_pipeline",
            parameters={
                "root_dir": os.environ.get("root_dir"),
                "lang": os.environ.get("lang"),
                "region": os.environ.get("region"),
            },
        )
        # wait_for_flow_news_url_extraction_flow = wait_for_flow_run(
        #     news_url_extraction_flow,
        #     raise_final_state=True
        #     )

        # # Perform the news spider flow
        # news_spider_flow = create_flow_run(
        #     flow_name="news_spider_flow",
        #     project_name="scraper_pipeline"
        #     )
        # wait_for_flow_news_spider_flow = wait_for_flow_run(
        #     news_spider_flow,
        #     raise_final_state=True
        #     )

        news_url_extraction_flow

    return flow


if __name__ == "__main__":
    """
    The main process to get the Simple News pipeline registered
    in the Prefect UI.
    """
    print("Creating flow")
    flow = execute_pipeline()

    # Register news pipeline parent flow
    print("Creating labels")
    labels = [general.retrieve_hostname()]

    print("Setting the configuration")
    env = {
        "root_dir": "the path where the results of the scraped urls should go",
        "lang": "the language param",
        "region": "the region param",
    }
    flow.run_config = LocalRun(working_dir=working_directory, labels=labels, env=env)

    project_name = "scraper_pipeline"

    print(f"Checking project {project_name} exists")
    project_existance = prefect_utils.check_project_exists(project_name)

    if not project_existance:
        print(f"Project {project_name} doesn't exist, creating now")
        client = Client()
        client.create_project(project_name=project_name)

    print("Registering the flow")
    flow.register(project_name=project_name)

    print(f"Update the agent labels for agent: {settings.PREFECT_AGENT_NAME}")
    prefect_utils.update_agent_labels(agent_name=settings.PREFECT_AGENT_NAME, new_labels=labels)
```
When I run the child flow `news_url_extraction`, the process completes within a few seconds. But when I run the parent flow `news_pipeline`, the flow stays in the Running state indefinitely. Is there anything I am missing here? I should also say why I am doing this in the first place. I have two flows, `news_url_extraction` and `news_spider`, that I want to run one after the other. At the moment, I have to register both flows manually and then run them separately, but I want to run them sequentially in one flow instead. The interesting thing to note here is that `news_spider` cannot run on multiple threads, whereas `news_url_extraction` can. So the two flows have slightly different configurations: `news_url_extraction` uses `LocalDaskExecutor`, but `news_spider` does not. Thanks, and I hope this all makes sense.
a
Regarding the multithreading question: this is likely related to how and where you attach the executor to your flow. Executor information is not stored in the Prefect backend; it is retrieved from storage. That's why you should ideally attach it directly when creating your flow:
```python
with Flow("flow_name", executor=LocalDaskExecutor()) as flow:
    ...  # define your tasks here
```
rather than:
```python
flow.executor = LocalDaskExecutor()
```
Regarding:
> But when I run the parent flow `news_pipeline` the flow remains in a continuous running state

Something weird is going on within your parent flow. Why do you have this line there? I'm not sure it is the culprit, but it should definitely be removed:
```python
news_url_extraction_flow
```
If the current setup you shared is not working, I would encourage you to start with something simpler, e.g. try running this simple child and parent flow: https://github.com/anna-geller/orchestrator-pattern
a
Thanks Anna. My child flow takes some parameters that the user specifies as environment variables. How do I make the parent flow read the same variables from the environment so that the child flow is able to run? Thanks
a
Not sure what exactly you would like to change, but here is an example showing how you can override default parameter values for a child flow run https://discourse.prefect.io/t/how-to-dynamically-change-the-run-name-in-the-create-flow-run-task/97
a
Suppose I have a child flow called `news_url_extraction_flow` which takes one input, a `root_dir` parameter. Now I want to run this child flow within a parent flow called `news_pipeline_flow`. What would be the correct way to ensure that the `root_dir` parameter gets passed down from the parent to the child flow so it can run? I hope this makes more sense.
a
You could override it from the parent. You could even create the same `Parameter` task in the parent flow and pass it to the child flow run, if that's your preference:
```python
from prefect import Flow, Parameter
from prefect.tasks.prefect import create_flow_run

with Flow("news_pipeline_flow") as parent_flow:
    param = Parameter("root_dir", default="/data/artifacts/other_scrapers")
    flow_run_id = create_flow_run(
        flow_name="news_url_extraction_flow",
        project_name="scraper_pipeline",
        parameters=dict(root_dir=param),
    )
```
Is this clearer now with the example code above?