Aniruddha Sengupta
03/30/2022, 10:57 AM
I have a flow, news_url_extraction, which is run in the UI with the arguments lang, region and root_dir set in the run configuration. I want to be able to run this flow from another flow called news_pipeline, and I have written the following code:
import os

from prefect import Client, Flow
from prefect.run_configs import LocalRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# general, settings and prefect_utils are the author's own helper modules (imports not shown)

# Set the working directory
working_directory = general.home_dir() + settings.WORKING_DIR


# Functions
def execute_pipeline() -> Flow:
    with Flow("news_pipeline_flow",
              state_handlers=[prefect_utils.set_run_name]
              ) as flow:
        # Perform the news url extraction flow
        news_url_extraction_flow = create_flow_run(
            flow_name="news_url_extraction_flow",
            project_name="scraper_pipeline",
            parameters={
                "root_dir": os.environ.get("root_dir"),
                "lang": os.environ.get("lang"),
                "region": os.environ.get("region")
            }
        )
        # wait_for_flow_news_url_extraction_flow = wait_for_flow_run(
        #     news_url_extraction_flow,
        #     raise_final_state=True
        # )
        # # Perform the news spider flow
        # news_spider_flow = create_flow_run(
        #     flow_name="news_spider_flow",
        #     project_name="scraper_pipeline"
        # )
        # wait_for_flow_news_spider_flow = wait_for_flow_run(
        #     news_spider_flow,
        #     raise_final_state=True
        # )
        news_url_extraction_flow
    return flow
if __name__ == "__main__":
    """
    The main process to get the Simple News pipeline registered
    in the Prefect UI.
    """
    print("Creating flow")
    flow = execute_pipeline()

    # Register news pipeline parent flow
    print("Creating labels")
    labels = [general.retrieve_hostname()]

    print("Setting the configuration")
    env = {
        "root_dir": "the path where the results of the scraped urls should go",
        "lang": "the language param",
        "region": "the region param",
    }
    flow.run_config = LocalRun(working_dir=working_directory, labels=labels, env=env)

    project_name = "scraper_pipeline"
    print(f"Checking project {project_name} exists")
    project_existance = prefect_utils.check_project_exists(project_name)
    if not project_existance:
        print(f"Project {project_name} doesn't exist, creating now")
        client = Client()
        client.create_project(project_name=project_name)

    print("Registering the flow")
    flow.register(project_name=project_name)

    print(f"Update the agent labels for agent: {settings.PREFECT_AGENT_NAME}")
    prefect_utils.update_agent_labels(agent_name=settings.PREFECT_AGENT_NAME, new_labels=labels)
When I run the child flow news_url_extraction on its own, the process completes within a few seconds. But when I run the parent flow news_pipeline, the flow remains in a continuous running state. Is there anything I am missing here?
I should also say that the reason I am doing this in the first place is that I have two flows, news_url_extraction and news_spider, that I want to run one after the other. At the moment, I have to register both flows manually and then run them separately, but I want them to run sequentially in one flow instead. The interesting thing to note here is that news_spider cannot run on multiple threads, whereas news_url_extraction can. So the two flows have slightly different configurations: news_url_extraction uses LocalDaskExecutor but news_spider does not.
Thanks, and I hope this all makes sense.
Anna Geller
with Flow("flow_name", executor=LocalDaskExecutor())
rather than:
flow.executor = LocalDaskExecutor()
> But when I run the parent flow news_pipeline the flow remains in a continuous running state
Something weird is going on within your parent flow. Why do you have this line there? Not sure if this is the culprit, but it should be removed for sure:
news_url_extraction_flow
If the current setup you shared is not working, I would encourage you to start with something simpler, e.g. try running this simple child and parent flow: https://github.com/anna-geller/orchestrator-pattern
Aniruddha Sengupta
03/30/2022, 4:08 PM
Anna Geller
Aniruddha Sengupta
03/31/2022, 2:35 PM
I have a child flow called news_url_extraction_flow, which takes one input, a root_dir param:
Now, I want to be able to run this child flow within a parent flow called news_pipeline_flow. What would be the correct way to ensure that the param root_dir gets passed down from the parent to the child flow so that it can run?
Hope this makes more sense.
Anna Geller
from prefect import Flow, Parameter
from prefect.tasks.prefect import create_flow_run

with Flow("news_pipeline_flow") as parent_flow:
    param = Parameter("root_dir", default="/data/artifacts/other_scrapers")
    flow_run_id = create_flow_run(
        flow_name="news_url_extraction_flow",
        project_name="scraper_pipeline",
        parameters=dict(root_dir=param),
    )
Is this clearer now with the example code above?