Is it possible to run a task B on a schedule, but ...
# ask-community
d
Is it possible to run a task B on a schedule, but also iterate through a list returned from an upstream task A? So for each rerun of task B, it will iterate through the list from task A one item at a time
k
There are two things here. First, if they are independent iterations, then you can just use map and that will loop through them. If they need to be done sequentially, you can use task looping. For the scheduler part, you need to create another Flow and put that on a schedule. Then, from the flow with task A, use StartFlowRun or create_flow_run to kick off the flow with task B. You can pass in Parameters for the sub-flow run.
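Roughly like this (an untested sketch; assumes Prefect 1.x with a backend, that flow_B is registered under a project, and placeholder names "my-project" and "item"):
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("flow_A") as flow_a:
    # launch the registered flow_B as a separate flow run, passing a parameter
    create_flow_run(
        flow_name="flow_B",
        project_name="my-project",  # placeholder
        parameters={"item": 1},
    )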
d
So would it look something like this?
Copy code
import prefect
from prefect import task
from prefect.tasks.prefect import StartFlowRun

@task
def task_B(a_list):
    print(f"Running web scraper with iterable {a_list[x]}")
    # Would loop be here?

with prefect.Flow("task_B") as run_task_B:
    # Flow code here
    ...

@task
def task_A() -> list:
    # Perform SQL query here
    a_list = result.fetchall()
    # Or would LOOP be here?
    return a_list

flow_run = StartFlowRun(flow_name="task_B")

with prefect.Flow("task_A") as run_task_A:
    a_list = task_A()
    run = flow_run(run_name=f"iterator {a_list[x]}", parameters={'a_list': a_list[x]})
    run.set_upstream(a_list)
k
Looping in A is a lot easier. That will all be one big task, though. You can call a task inside another task by doing task_B.run(input). This looks good; the only thing to add is that the flow_B code would need a Parameter to take in the a_list.
d
The idea is that I want to iterate through each item in the list that task_A returns, and each item will be one input for one run of B, sequentially. So if the list returned is [1, 2, 3], then the runs of task B will look like task_B(1), task_B(2), task_B(3)
k
So you definitely need task B to be registered as a second flow in order to put it on a schedule as well. The running of task B1, B2, B3 in sequence can either go in task_A where your LOOP comment is, using StartFlowRun(...).run(item) to kick off the subflow, or you can create a new task task_B_looper to contain the LOOP logic and then kick off the new flow run there with the same StartFlowRun(...).run(item)
d
Ah okay, I think I get it. So the general flow of things would look like:
Copy code
flow_A 
--- task_A -> list_A #Only run once per run of flow A
--- task_loop(list_A) # task_loop & flow_B will run as many times as len(list_A)
------ start = list_A[x]
------ LOOP(start = list_A[x+1])
------ StartFlowRun(flow_name = flow_B).run(parameters = start)
---------flow_B(schedule = schedule)
------------task_B(parameter = start)
k
Yes, but I'm confused why there is a schedule there? The schedule will be attached to the registration of flow_B
d
Oh my mistake, I didn't mean to put it there.
k
Ah ok I think we are on the same page
d
Ok I think this makes sense to me now. I will just have to test it out and see. One last question though: how would interval scheduling work with LOOP? i.e. run task B1, wait an interval of 10 minutes, then run task B2, and so on
k
Oh sorry, I thought you wanted a different, independent schedule. In your case what you want is:
Copy code
# LOOP code
StartFlowRun(..., wait=True).run(...)
time.sleep(10 * 60)  # 10 minutes
the time.sleep() is just the native Python one
d
Ahh I see, so a proper interval with a clock is not necessary?
k
No, it should not be. You can just use time.sleep(), or you can not wait for it, StartFlowRun(wait=False).run(scheduled_start_time=...), so you can provide a start time for it as you loop through.
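For example, something like this (a sketch; assumes Prefect 1.x and pendulum, which Prefect 1.x depends on; "my-project" is a placeholder, and start / n come from the LOOP payload):
Copy code
import pendulum
from prefect.tasks.prefect import StartFlowRun

# schedule each child run 10 minutes after the previous one instead of sleeping
StartFlowRun(flow_name="flow_B", project_name="my-project", wait=False).run(
    parameters={"item_B": start},
    scheduled_start_time=pendulum.now("UTC").add(minutes=10 * n),
)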
d
Okay got it. Thank you so much. I will most likely have more questions later on but this is a great start.
Hi, here's how far I've gotten with this:
Copy code
import time

import sqlalchemy
from sqlalchemy import create_engine

import prefect
from prefect import task
from prefect.engine.signals import LOOP
from prefect.tasks.prefect import StartFlowRun

@task
def task_A() -> list:
    engine = create_engine(x, echo=True)  # x: database URL
    s = (
        '''
            QUERY HERE
        '''
    )
    query = sqlalchemy.text(s)
    with engine.connect() as conn:
        result = conn.execute(query)
        job_list = result.fetchall()
        job_list = [i[0] for i in job_list]
    return job_list[0:7]

@task
def task_B(param):
    #scraper_class = Scraper()
    #scraper_class.instantiate_web_scraper(param)
    print(param)  # test to see if the parameter is being passed to task_B properly

with prefect.Flow('flow_B') as run_flow_B:
    param = prefect.Parameter("item_B")
    task_B(param)

flow_run_B = StartFlowRun(flow_name='flow_B')

@task
def task_loop(title_list):
    loop_payload = prefect.context.get("task_loop_result", {})
    n = loop_payload.get("n", 0)
    start = title_list[n]
    print(f"Iterating task_loop using {start}")
    print(loop_payload)
    print(len(title_list))
    if n >= len(title_list):
        run = flow_run_B(run_name=f"Iterator_{start}").run(parameters={'item_B': start})
        return n

    time.sleep(1 * 10)  # wait 10 seconds between iterations for testing

    raise LOOP(f'Iteration {n}', result=dict(n=n + 1))

with prefect.Flow('flow_A') as run_flow_A:
    return_list = task_A()  # upstream task
    #reactor.run()
    task_loop(return_list)

if __name__ == "__main__":
    run_flow_A.run()
There are two issues I'm running into when running this script. 1. I'm encountering the following error:
Copy code
ValueError: Could not infer an active Flow context while creating edge to <Task: Flow flow_B>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `StartFlowRun(...).run(...)`
2. What is the logic behind if n >= len(title_list): return n when it comes to looping? I don't quite understand what is going on when we return n.
k
The first error is easier. You can remove
Copy code
flow_run_B = StartFlowRun(flow_name='flow_B')
and then do
Copy code
run = StartFlowRun(run_name=f"Iterator_{start}").run(parameters={'item_B': start})
For the second question, think of it this way: there are two paths your LOOP can take. One is to LOOP again, and the other is to stop looping. To loop again, you raise LOOP with the payload for the next loop iteration. To stop looping, just return anything; you can even return None. The if n >= len(title_list): is the condition that decides whether to continue looping or end the loop.
So I think you want StartFlowRun out of the loop
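The bare pattern is something like this (a minimal sketch of Prefect 1.x task looping, separate from your flow):
Copy code
import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def looper(items):
    n = prefect.context.get("task_loop_result", {}).get("n", 0)
    if n >= len(items):
        return None  # path 1: returning anything ends the loop
    print(f"working on {items[n]}")  # per-item work goes here
    # path 2: raise LOOP to run the task again with the next payload
    raise LOOP(f"Iteration {n}", result=dict(n=n + 1))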
d
So outside of the task: task_loop?
k
Sorry, outside of the if, not the loop
d
Okay got it. So I moved it outside like so:
Copy code
@task
def task_loop(title_list):
    loop_payload = prefect.context.get("task_loop_result", {})
    n = loop_payload.get("n", 0)
    if n == len(title_list):  # check bounds before indexing so the final iteration doesn't raise IndexError
        return None
    start = title_list[n]
    print(f"Iterating task_loop using {start}")
    print(loop_payload)
    run = StartFlowRun(run_name=f"Iterator_{start}").run(flow_name='flow_B', project_name='test_run_1', parameters={'item_B': start})
    time.sleep(1 * 10)  # wait 10 seconds between iterations for testing
    raise LOOP(f'Iteration {n}', result=dict(n=n + 1))
Doing this prompted me to add a project name, which forced me to link to Prefect Cloud. However, now it is telling me that flow_B is not found. Is it necessary to deploy flow_B to Cloud, or is there a different way to run this using only Prefect Core?
k
If you don't want a new flow run, just do task_B.run(), and this just runs the Python code underneath, but it is not treated as a task so there is no observability. If you had a loop of multiple tasks, that would break the DAG.
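i.e. roughly this shape (a sketch of your task_loop with StartFlowRun swapped for a direct call; task_B.run(start) just executes the task's function):
Copy code
@task
def task_loop(title_list):
    n = prefect.context.get("task_loop_result", {}).get("n", 0)
    if n == len(title_list):
        return None
    start = title_list[n]
    task_B.run(start)  # runs task_B's Python directly: no new flow run, no observability
    time.sleep(1 * 10)
    raise LOOP(f"Iteration {n}", result=dict(n=n + 1))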
d
Hey Kevin, just an update: I got this to work! The main issue was not on the Prefect side; it was on the Scrapy end, where each instance of the spider class did not detect a twisted reactor instance. Importing crochet and having it spin up a reactor instance independent of Scrapy works here.
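For reference, the fix is roughly this shape (a sketch; the function name and timeout are placeholders):
Copy code
import crochet
crochet.setup()  # start a crochet-managed Twisted reactor before Scrapy looks for one

from scrapy.crawler import CrawlerRunner

@crochet.wait_for(timeout=600)  # block until the crawl's Deferred resolves; timeout is arbitrary
def run_spider(spider_cls, **kwargs):
    runner = CrawlerRunner()
    return runner.crawl(spider_cls, **kwargs)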
I do have another question involving incrementing through a task's list export and remembering list position between flow runs, but I will make another thread on that soon.
k
I do not understand what you said, but nice work figuring it out!
d
Haha, sorry. Unfortunately there isn't strong community support for Scrapy, so I've been left to sort of fend for myself