https://prefect.io logo
Title
r

Rodrigo Neves

06/03/2020, 1:27 PM
Hello guys, first time here and recent user of prefect. First of all I need to say it that so far I’m loving it, keep with the awesome work!! I need help to compose a flow with several task with map. Currently I can do it, but each task is only processed after all task from last step are done. This is sub-optimal since each branch of the pipeline(each .map()) are independent of each other. There is any way of achieving that, out-of-the-box??
def run_flow_step_0_map(self, df):        
        cols = list(df.items())
        with Flow("pipeline_map") as flow:
            col = self.get_data.map(unmapped(self), cols)
            col = self.task1.map(unmapped(self), col)
            col = self.task2.map(unmapped(self), col)
            col = self.task3.map(unmapped(self), col)
            col = self.task4.map(unmapped(self), col)
            col = self.task5.map(unmapped(self), col)
            result = self.task5.map(unmapped(self), col)

        return flow
(is confusing the explanation, if you need extra info just say it)
👍 1
n

nicholas

06/03/2020, 1:30 PM
Hi @Rodrigo Neves and welcome! What you're looking for is Depth-first execution, something we're actively working on and that we think will be released very soon!
r

Rodrigo Neves

06/03/2020, 1:33 PM
Really nice, excited to start using that. Thank you for the feedback
😄 1
since we are here. I have other question, how can I start a prefect agent with a dask executor?
did not find anything about it. most likely I’m looking on the wrong place
j

Jeremiah

06/03/2020, 1:37 PM
@Rodrigo Neves I have a different question for you — the way you set up your mapped pipeline is unusual (not bad, just unusual!), in particular the
unmapped(self)
and I’m curious if there’s a use case you’re trying to achieve that we could make better. Are your tasks defined as decorated methods on a parent class?
r

Rodrigo Neves

06/03/2020, 1:40 PM
Indeed is unusual, I’m still wondering if makes any sense at all, but it currently works 🤣 . Yeah I’m adapting an old pipeline-based Class where I had to encapsulate class method with @task decorator. little example
@task
    def clean_resample_prefect(self, col):
        return self.clean_resample(col)
j

Jeremiah

06/03/2020, 1:42 PM
Neat, I don’t think I’ve seen this pattern before
n

nicholas

06/03/2020, 1:46 PM
@Rodrigo Neves and to your dask executor question, you'll want to define your executor with your flow, rather than with your agent. It'll look something like this:
from prefect.engine.executors import LocalDaskExecutor
from prefect import task, Flow

# some tasks and flow

executor = LocalDaskExecutor(address="<tcp://some.ip:8786>")
flow.run(executor=executor)
r

Rodrigo Neves

06/03/2020, 1:48 PM
Thanks. That I’m currently doing that and it works lika a charm. My question is if it’s not possible to start a prefect agent with a dask executor? If not, I won’t be able to orchestrate my flows using my local dask cluster.
n

nicholas

06/03/2020, 1:53 PM
Ah ok, think you'll want to take a look at the dask-kubernetes deployment recipe as well as the Kubernetes agent
r

Rodrigo Neves

06/03/2020, 2:00 PM
That’s overkill. I actually have the hypothesis to start a dask-kubernetes cluster locally where I could register my agent. But since I already have a local dask-scheduler up and running I would like to be able to use it. something like
flow.run_agent(executor=dask_executor)

or trough cli

prefect run dask agent --dask-adress localhost:32000
Idk if this makes any sense at all
j

Jeremiah

06/03/2020, 2:02 PM
@Rodrigo Neves typically your agent will run “next” to your Dask Cluster, and when it spins up a flow, the flow is the thing that knows about the cluster address and submits to it. So the dask cluster information gets attached to the flow environment, not the agent. The agent is just an agnostic tool for submitting work.
r

Rodrigo Neves

06/03/2020, 2:06 PM
Ok maybe I did something wrong. I need to retry it again. Will update you after. But, from my understanding so far, if I spin an agent trough the CLI, for example, the process doesn’t know how to use the dask cluster, because I have no way to connect that python process to the clsuter adress. So when I run the flow (trough the UI) it won’t make use of of dask executor to control the flow run
with Flow("process_csv") as flow:
    yyyymm = Parameter("yyyymm")
    sensors = get_sensors(yyyymm)
    results = process_csv_prefect.map(sensors, unmapped(yyyymm))
    process_reults(results)

from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(address="localhost:3000")
dask_state = flow.run(parameters={"yyyymm": "201812"}, executor=executor)
This works as expected using dask cluster to parallelize my workload. Now my problem is do that with an agent. Even if i do the follwing in my script:
flow.register()
flow.run_agent()
and then trough the UI I run my workflow I can’t make use of dask cluster to parallelize the map step. It was more clear now?
Sorry for coming back to this, but I did not find any way (instead of spinning a dask-kubernetes cluster) to make use of dask-executor on my prefect orchestrated flows, that need a prefect agent to run. Am I missing something?
n

nicholas

06/04/2020, 3:49 PM
Hey sorry for the slow response @Rodrigo Neves! Had too many thread notifications and missed this one. Try adding this to your flow:
from prefect.environments import RemoteDaskEnvironment
environment = RemoteDaskEnvironment(address="address of your dask scheduler")

with Flow("process_csv", environment=environment) as flow:
  # do stuff

flow.register()
Once that's configured, your agent should automatically send work to the cluster.
r

Rodrigo Neves

06/04/2020, 4:44 PM
It’s fine no need for sorry here 😄. Thank you for the feedback. Tested and works like a charm. 😛refect: 😛refect: 😛refect: 😛refect:
n

nicholas

06/04/2020, 5:03 PM
😄