Dominic Pham
11/08/2021, 10:07 PM
Kevin Kho
map and that will loop through them. If they need to run sequentially, you can use task looping.
For the scheduler part, you need to create another Flow for that and put it on a schedule. Then, from the flow with task A, use StartFlowRun or create_flow_run to run task B. You can pass in Parameters for the subflow run.
Dominic Pham
11/08/2021, 11:22 PM
@task
def task_B(a_list):
    print(f"Running web scraper with iterable {a_list[x]}")
    #Would loop be here?

with prefect.Flow("task_B") as run_task_B:
    #Flow code here

@task
def task_A() -> list:
    #Perform SQL query here
    a_list = result.fetchall()
    #Or would LOOP be here?
    return a_list

flow_run = StartFlowRun(flow_name = "task_B")

with prefect.Flow("task_A") as run_task_A:
    a_list = task_A()
    run = flow_run(run_name = f"iterator {a_list[x]}", parameters = {'a_list': a_list[x]})
    run.set_upstream(a_list)
Kevin Kho
task_B.run(input). This looks good, but the only thing to add is that the flow B code would need a parameter to take in the a_list.
Dominic Pham
11/08/2021, 11:26 PM
Kevin Kho
StartFlowRun(...).run(item) to kick off the subflow, or you can create a new task, task_B_looper, to contain the LOOP logic and then kick off the new flow run there with the same StartFlowRun(...).run(item).
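To make the looper-task idea concrete, here is a minimal plain-Python sketch of the control flow. It is a simulation, not Prefect code: `LoopSignal`, `run_with_looping`, and `start_subflow` are hypothetical stand-ins for the LOOP signal, the task runner, and the `StartFlowRun(...).run(item)` call. It assumes the looping behaves as discussed in this thread, where raising the signal re-runs the task with a payload and returning ends the loop.

```python
class LoopSignal(Exception):
    """Hypothetical stand-in for the LOOP signal: carries the next payload."""

    def __init__(self, message, result):
        super().__init__(message)
        self.result = result


def run_with_looping(task_fn, *args):
    """Hypothetical runner: re-invoke task_fn until it returns normally."""
    payload = {}
    while True:
        try:
            return task_fn(*args, loop_payload=payload)
        except LoopSignal as signal:
            # The payload raised in one pass becomes the input of the next.
            payload = signal.result


def task_b_looper(items, start_subflow, loop_payload):
    n = loop_payload.get("n", 0)
    if n >= len(items):
        # Check the bound before indexing; returning anything ends the loop.
        return n
    # Stand-in for kicking off one subflow run for this item.
    start_subflow(items[n])
    raise LoopSignal(f"Iteration {n}", result={"n": n + 1})


started = []
total = run_with_looping(task_b_looper, ["a", "b", "c"], started.append)
print(total, started)
```

Each pass handles exactly one item, so the subflow is started once per list element and the loop stops on the pass where the index reaches the list length.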
Dominic Pham
11/09/2021, 12:10 AM
flow_A
--- task_A -> list_A #Only run once per run of flow A
--- task_loop(list_A) # task_loop & flow_B will run as many times as len(list_A)
------ start = list_A[x]
------ LOOP(start = list_A[x+1])
------ StartFlowRun(flow_name = flow_B).run(parameters = start)
---------flow_B(schedule = schedule)
------------task_B(parameter = start)
Kevin Kho
flow_B
Dominic Pham
11/09/2021, 12:21 AM
Kevin Kho
Dominic Pham
11/09/2021, 12:31 AM
Kevin Kho
# LOOP code
StartFlowRun(...).run(..., wait=True)
time.sleep(10*60)
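The pacing in the snippet above (wait for each subflow run to finish, then pause) can be sketched in plain Python. This is only an illustration under assumptions: `run_sequentially` and `run_blocking` are hypothetical names, with `run_blocking` standing in for a blocking subflow call, and the `sleep` argument is injectable so the pause can be observed without actually waiting. Unlike the snippet, this sketch skips the pause after the last item.

```python
import time


def run_sequentially(items, run_blocking, pause_seconds, sleep=time.sleep):
    """Run each item to completion, pausing between consecutive runs."""
    results = []
    for index, item in enumerate(items):
        # Blocks until this item's run has finished.
        results.append(run_blocking(item))
        if index < len(items) - 1:
            # Pause only between runs, not after the final one.
            sleep(pause_seconds)
    return results


pauses = []
results = run_sequentially(["a", "b", "c"], str.upper, 10 * 60, sleep=pauses.append)
print(results, pauses)
```

Because each run blocks before the pause starts, the gap between the start of two runs is the run's duration plus the pause, not the pause alone.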
Kevin Kho
time.sleep() is just the native Python one.
Dominic Pham
11/09/2021, 12:34 AM
Kevin Kho
time.sleep(), or you can choose not to wait for it: StartFlowRun().run(wait=False, scheduled_start_time=...), so you can also provide a start time for it as you loop through it.
Dominic Pham
11/09/2021, 12:44 AM
Dominic Pham
11/09/2021, 10:13 PM
@task
def task_A() -> list:
    engine = create_engine(x, echo = True)
    conn = engine.connect()
    s = (
        '''
        QUERY HERE
        '''
    )
    query = sqlalchemy.text(s)
    with engine.connect() as conn:
        result = conn.execute(s)
        job_list = result.fetchall()
        job_list = [i[0] for i in job_list]
    return job_list[0:7]

@task
def task_B(param):
    #scraper_class = Scraper()
    #scraper_class.instantiate_web_scraper(param)
    print(param) #test to see if parameter is being passed to task_B properly

with prefect.Flow('flow_B') as run_flow_B:
    param = prefect.Parameter("item_B")
    task_B(param)

flow_run_B = StartFlowRun(flow_name = 'flow_B')

@task
def task_loop(title_list):
    loop_payload = prefect.context.get("task_loop_result", {})
    n = loop_payload.get("n", 0)
    start = title_list[n]
    print(f"Iterating task_loop using {start}")
    print(loop_payload)
    print(len(title_list))
    if n >= len(title_list):
        run = flow_run_B(run_name = f"Iterator_{start}").run(parameters = {'item_B': start})
        return n
    time.sleep(1*10) #wait every 10 seconds for testing
    raise LOOP(f'Iteration {n}', result = dict(n=n+1))

with prefect.Flow('flow_A') as run_flow_A:
    return_list = task_A() #upstream task
    #reactor.run()
    task_loop(return_list)

if __name__ == "__main__":
    run_flow_A.run()
There are two issues I'm running into when running this script.
1. I'm encountering the following error:
ValueError: Could not infer an active Flow context while creating edge to <Task: Flow flow_B>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `StartFlowRun(...).run(...)`
2. What is the logic behind if n >= len(title_list): return n when it comes to looping? I don't quite understand what is going on when we return n.
Kevin Kho
flow_run_B = StartFlowRun(flow_name = 'flow_B')
and then do
run = StartFlowRun(run_name = f"Iterator_{start}").run(parameters = {'item_B': start})
Kevin Kho
raise LOOP with the payload for the next loop iteration. To stop looping, just return anything; you can even return None. The if n >= len(title_list): check is the condition that decides whether to continue looping or end the loop.
Kevin Kho
Dominic Pham
11/09/2021, 10:39 PM
Kevin Kho
if, not the loop
Dominic Pham
11/09/2021, 11:06 PM
@task
def task_loop(title_list):
    loop_payload = prefect.context.get("task_loop_result", {})
    n = loop_payload.get("n", 0)
    start = title_list[n]
    print(f"Iterating task_loop using {start}")
    print(loop_payload)
    if n == len(title_list):
        return None
    run = StartFlowRun(run_name = f"Iterator_{start}").run(flow_name = 'flow_B', project_name = 'test_run_1' ,parameters = {'item_B': start})
    time.sleep(1*10) #wait every 10 seconds for testing
    raise LOOP(f'Iteration {n}', result = dict(n=n+1))
With this, I was prompted to add a project name, which forced me to link to Prefect Cloud. However, it is now telling me that flow_B is not found. Is it necessary to deploy flow_B to Cloud? Or is there a different way to run this using only Prefect Core?
Kevin Kho
task_B.run() and this just runs the Python code underneath, but it is not treated as a task, so there is no observability. If you had a loop of multiple tasks, that would break the DAG.
Dominic Pham
11/18/2021, 11:28 PM
Dominic Pham
11/18/2021, 11:30 PM
Kevin Kho
Dominic Pham
11/18/2021, 11:31 PM