#prefect-community

bral

08/17/2020, 10:36 PM
Good evening, I ran into a problem that I'm finding difficult to solve. My flow looks something like the example below. I am using a local agent, and this flow is executed sequentially for each file. What is the correct way to parallelize it? Use multiprocessing? I am more inclined to use the Dask executor. I looked at several examples, but in none of them was the flow registered; it was simply run. What is the best way to parallelize the calculations (ideally, I assume, each executor node would take a separate file for itself)?
from prefect import task, Flow, case

@task
def is_running():
    return False

def get_files():
    return ['1', '2', '3', '4']

@task
def read_file(file):
    print(file)
    return True

@task
def preprocess_file(file):
    print(file)
    return True

@task
def save_file(file):
    print(file)
    return True

with Flow("process") as flow:
    condition = is_running()
    with case(condition, False) as cond:
        files = get_files()
        for file in files[0:3]:
            r = read_file(file)
            p = preprocess_file(file)
            s = save_file(file)
            r.set_downstream(p)
            p.set_downstream(s)

flow.register("test")
Kyle Moon-Wright

08/17/2020, 10:48 PM
Hey @bral, I think the best way to parallelize your flow for your use case is to follow your inclination and use the DaskExecutor. For a flow you'd like to register, you can attach the DaskExecutor to a LocalEnvironment like this:
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor

environment = LocalEnvironment(executor=DaskExecutor())

flow = Flow(
    "Dask Executor Example",
    environment=environment
)

flow.register(project_name="project_name")
As far as parallelizing your tasks in a flow context, I'd recommend checking out the dependency structure in this idiom on parallelism within a Prefect Flow.
bral

08/17/2020, 11:01 PM
@Kyle Moon-Wright I got it, thank you!
👍 2