# ask-community
Kien Nguyen
Hi, anyone have an example using prefect to extract batches of data without knowing the size of the data? Kind of recursive fetching until no more?
Kevin Kho
Hi @Kien Nguyen, I kinda have something. See this. The logic is all packaged into a Flow that handles one piece of data. There is then a main flow that handles the logic of looping over the N pieces of data it receives and passing each one to the `StartFlowRun` call. Actually, let me fix it a bit, one sec. Finished cleaning it up.
You can read more about task looping here.
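Roughly something like this, a minimal sketch assuming Prefect 1.x (the flow name, project name, and the `fetch_batch` helper are made up for illustration):
```python
from prefect import task, Flow, context
from prefect.engine.signals import LOOP
from prefect.tasks.prefect import StartFlowRun

# Subflow that processes a single batch; registered separately.
# "process-one-batch" / "my-project" are placeholder names.
start_subflow = StartFlowRun(
    flow_name="process-one-batch", project_name="my-project", wait=True
)

@task
def dispatch_batches():
    # LOOP re-runs this task until the API returns no more data.
    loop_payload = context.get("task_loop_result", {"offset": 0})
    offset = loop_payload["offset"]

    batch = fetch_batch(offset)  # hypothetical helper that calls your API
    if not batch:
        return offset  # nothing left -> stop looping

    start_subflow.run(parameters={"offset": offset, "size": len(batch)})
    raise LOOP(
        message=f"Dispatched batch at offset {offset}",
        result={"offset": offset + len(batch)},
    )

with Flow("main-dispatcher") as flow:
    dispatch_batches()
```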
Kien Nguyen
Hmm, I'm using `LOOP` already, but one of the problems is the size of the data. Maybe the `sub_flow` concept will solve this problem.
Hey @Kevin Kho, is it possible to have `sub_flow`s run in parallel? Let's say I have 2 sub_flows, sub_flow1 and sub_flow2, each consisting of extract, transform and load tasks. sub_flow1 finishes its extract task and starts its transform task while, at the same time, sub_flow2 starts its extract task; once sub_flow1 starts its load task and sub_flow2 finishes its extract task, sub_flow2 can start its transform task, and so on?
Kevin Kho
Yes, you should be able to. Take out `wait=True` from the `StartFlowRun` call. You just need to be aware of how many resources these subflows are using because you may overload your compute. The default `LocalExecutor` is sequential, but you can use the `LocalDaskExecutor` for parallel execution.
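For example, something like this (a sketch assuming Prefect 1.x; flow and project names are placeholders):
```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun

# wait defaults to False, so the parent just kicks off each run and moves on
start_sub_flow = StartFlowRun(flow_name="sub_flow", project_name="my-project")

with Flow("parent", executor=LocalDaskExecutor()) as flow:
    run1 = start_sub_flow(parameters={"batch": 1})
    run2 = start_sub_flow(parameters={"batch": 2})
```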
Kien Nguyen
Cool, can I also set the first task of the second sub_flow to be dependent on the first task of the first sub_flow?
Kevin Kho
Yep you can use a syntax like this:
```python
from prefect import task, Flow

@task()
def abc(a):
    return a

# dependency set when calling the task...
with Flow("ecs_testing") as flow:
    x = abc(1)
    y = abc(1, upstream_tasks=[x])

# ...or imperatively with set_upstream
with Flow("ecs_tes") as flow2:
    x = abc(1)
    y = abc(1)
    y.set_upstream(x)
```
Kien Nguyen
Hmm, but this is setting dependencies at compile time, right? With a dynamic DAG and the `sub_flow` concept, how would one do that?
Kevin Kho
I see what you are saying. You don't know ahead of time how many subflows there are with task looping. Yes, I think you are right that you can't set this dependency during run time. But I think you might be able to do this with the `wait_for_flow_run` task we have. It might be tricky though. I am thinking you can create this `wait_for_flow_run` task and point it to the first task of the first sub_flow; then the first task of the second sub_flow could wait for it. But maybe this task has to be created during build time, so it defeats the point, and I think something might be off there. Are your subflows dependent on each other? That becomes really tricky when combined with task looping.
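A rough sketch of that idea, assuming Prefect 1.x. Note that `wait_for_flow_run` waits on the whole subflow run rather than just its first task, so this is only an approximation; the flow and project names are placeholders:
```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent") as flow:
    run_id_1 = create_flow_run(flow_name="sub_flow_1", project_name="my-project")
    done_1 = wait_for_flow_run(run_id_1)

    # sub_flow_2 is only kicked off once sub_flow_1 has finished
    run_id_2 = create_flow_run(
        flow_name="sub_flow_2",
        project_name="my-project",
        upstream_tasks=[done_1],
    )
```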
Kien Nguyen
My requirement is kinda tricky:
1. Data is huge. Imagine 1 month or even a year of data fetched from an API. It definitely needs to be processed in batches.
2. It is time series data, so the batches need to be processed sequentially.
3. However, I want to enable some level of parallel processing, for example all sub_flows can run in parallel, as long as the last step (the Load step of the ETL pipeline) is done sequentially (i.e. the Load of sub_flow1 starts and finishes first, then sub_flow2, then sub_flow3, and so on).
Kevin Kho
Yeah, I think you can't do the third. Prefect + Dask gives you parallel execution in a depth-first manner, but it assumes independent processes, and that independence matters because with Dask the tasks are sent to other workers in the cluster. These workers don't know what is going on in the other workers, so Prefect can't natively coordinate between them. I think the best you can do is use parallel processing inside the task itself (sub_flow1_extract, sub_flow1_transform); it's these steps that should take advantage of parallelism if possible. There is one option here though: run the extract and transform steps of all subflows in parallel and write out intermediate files, then run all of the `load` steps sequentially in a different Flow.
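A sketch of that last option, assuming Prefect 1.x (paths and batch ids are made up, and the extract/transform and load bodies are left as placeholders):
```python
import glob
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def extract_transform(batch_id):
    path = f"/tmp/batches/{batch_id:04d}.parquet"
    # ... fetch the batch from the API, transform it, write it to `path` ...
    return path

# Flow 1: extract/transform all batches in parallel, one file per batch
with Flow("extract-transform", executor=LocalDaskExecutor()) as et_flow:
    extract_transform.map(batch_id=list(range(10)))

@task
def load_sequentially(pattern):
    # sorting by file name (e.g. a timestamp prefix) preserves time-series order
    for path in sorted(glob.glob(pattern)):
        pass  # ... load one file at a time ...

# Flow 2: load the intermediate files strictly in order
with Flow("load") as load_flow:
    load_sequentially("/tmp/batches/*.parquet")
```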
Kien Nguyen
Yep, I thought of writing to files also: put the timestamp in the file name, sort by filenames and then process in sequence. Thanks a lot @Kevin Kho