Rodrigo Neves
06/03/2020, 1:27 PM
def run_flow_step_0_map(self, df):
    cols = list(df.items())
    with Flow("pipeline_map") as flow:
        col = self.get_data.map(unmapped(self), cols)
        col = self.task1.map(unmapped(self), col)
        col = self.task2.map(unmapped(self), col)
        col = self.task3.map(unmapped(self), col)
        col = self.task4.map(unmapped(self), col)
        col = self.task5.map(unmapped(self), col)
        result = self.task5.map(unmapped(self), col)
    return flow
(the explanation may be confusing; if you need extra info, just say so)
nicholas
06/03/2020, 1:30 PM
Rodrigo Neves
06/03/2020, 1:33 PM
Jeremiah
06/03/2020, 1:37 PM
I noticed the unmapped(self) calls, and I'm curious if there's a use case you're trying to achieve that we could make better. Are your tasks defined as decorated methods on a parent class?
Rodrigo Neves
06/03/2020, 1:40 PM
@task
def clean_resample_prefect(self, col):
    return self.clean_resample(col)
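(For context, a minimal sketch of the class-based pattern under discussion, assuming Prefect 0.x; the Pipeline class name, the clean_resample body, and build_flow are illustrative:)

from prefect import task, Flow, unmapped

class Pipeline:
    def clean_resample(self, col):
        ...  # ordinary method holding the actual cleaning logic

    @task
    def clean_resample_prefect(self, col):
        # @task turns this method into a Task at class-definition time,
        # so `self` is just the first run() argument and has to be
        # passed explicitly whenever the task is called or mapped
        return self.clean_resample(col)

    def build_flow(self, cols):
        with Flow("pipeline_map") as flow:
            # unmapped(self) broadcasts the instance to every mapped child
            self.clean_resample_prefect.map(unmapped(self), cols)
        return flow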
Jeremiah
06/03/2020, 1:42 PM
nicholas
06/03/2020, 1:46 PM
from prefect.engine.executors import LocalDaskExecutor
from prefect import task, Flow

# some tasks and flow

executor = LocalDaskExecutor(address="tcp://some.ip:8786")
flow.run(executor=executor)
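(For connecting to an already-running scheduler at a known address, the working snippet later in this thread uses DaskExecutor; a minimal sketch with a placeholder address:)

from prefect.engine.executors import DaskExecutor

# connect to an existing dask scheduler rather than spawning a local one
executor = DaskExecutor(address="tcp://some.ip:8786")
flow.run(executor=executor)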
Rodrigo Neves
06/03/2020, 1:48 PM
nicholas
06/03/2020, 1:53 PM
Rodrigo Neves
06/03/2020, 2:00 PM
flow.run_agent(executor=dask_executor)
or through the CLI:
prefect run dask agent --dask-address localhost:32000
Idk if this makes any sense at all
Jeremiah
06/03/2020, 2:02 PM
Rodrigo Neves
06/03/2020, 2:06 PM
with Flow("process_csv") as flow:
    yyyymm = Parameter("yyyymm")
    sensors = get_sensors(yyyymm)
    results = process_csv_prefect.map(sensors, unmapped(yyyymm))
    process_results(results)

from prefect.engine.executors import DaskExecutor
executor = DaskExecutor(address="localhost:3000")
dask_state = flow.run(parameters={"yyyymm": "201812"}, executor=executor)

This works as expected, using the dask cluster to parallelize my workload.
Now my problem is doing that with an agent. Even if I do the following in my script:
flow.register()
flow.run_agent()
and then run my workflow through the UI, I can't make use of the dask cluster to parallelize the map step. Is it clearer now?
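(In Prefect 0.x the executor passed to flow.run() applies only to that local run and is not attached to the registered flow, which is the gap being described here. A minimal sketch of the two paths, reusing the names from the snippet above:)

from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(address="localhost:3000")

# local run: the executor is used, so mapped tasks hit the dask cluster
flow.run(parameters={"yyyymm": "201812"}, executor=executor)

# agent run: register() stores the flow without this executor, so runs
# triggered from the UI fall back to the default synchronous executor
flow.register()
flow.run_agent()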
nicholas
06/04/2020, 3:49 PM
from prefect.environments import RemoteDaskEnvironment

environment = RemoteDaskEnvironment(address="address of your dask scheduler")

with Flow("process_csv", environment=environment) as flow:
    # do stuff

flow.register()

Once that's configured, your agent should automatically send work to the cluster.
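(Putting that together with the earlier flow, the registered version would look roughly like this; the scheduler address is a placeholder, and get_sensors / process_csv_prefect / process_results are the tasks from the earlier snippet:)

from prefect import Flow, Parameter, unmapped
from prefect.environments import RemoteDaskEnvironment

# placeholder address -- point this at your dask scheduler
environment = RemoteDaskEnvironment(address="tcp://localhost:8786")

with Flow("process_csv", environment=environment) as flow:
    yyyymm = Parameter("yyyymm")
    sensors = get_sensors(yyyymm)
    results = process_csv_prefect.map(sensors, unmapped(yyyymm))
    process_results(results)

# the environment is stored with the registered flow, so agent-triggered
# runs execute against the remote dask cluster
flow.register()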
Rodrigo Neves
06/04/2020, 4:44 PM
nicholas
06/04/2020, 5:03 PM