# prefect-community
r
Hello guys, first time here and a recent user of Prefect. First of all, I have to say that so far I'm loving it; keep up the awesome work!! I need help composing a flow with several tasks using map. Currently I can do it, but each task only starts after all tasks from the previous step are done. This is sub-optimal, since each branch of the pipeline (each .map()) is independent of the others. Is there any way to have each branch proceed on its own, out of the box?
```python
from prefect import Flow, unmapped

# Method of the pipeline class; get_data and task1..task5 are @task-decorated methods
def run_flow_step_0_map(self, df):
    cols = list(df.items())
    with Flow("pipeline_map") as flow:
        col = self.get_data.map(unmapped(self), cols)
        col = self.task1.map(unmapped(self), col)
        col = self.task2.map(unmapped(self), col)
        col = self.task3.map(unmapped(self), col)
        col = self.task4.map(unmapped(self), col)
        col = self.task5.map(unmapped(self), col)
        result = self.task5.map(unmapped(self), col)

    return flow
```
(The explanation may be confusing; if you need extra info, just say so.)
n
Hi @Rodrigo Neves, and welcome! What you're looking for is depth-first execution, something we're actively working on and that we think will be released very soon!
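For readers new to the term, here is a toy sketch in plain Python (not Prefect's scheduler or API; the stage names and functions are placeholders) of the difference between the breadth-first order the flow in the question runs in and the depth-first order described here. It only shows ordering; with a parallel executor the independent branches would also run concurrently.

```python
# Toy illustration (plain Python, not Prefect's scheduler) of the two
# orderings for a mapped pipeline. Stage names are placeholders.

def breadth_first(items, stages):
    """Finish a stage for every item before starting the next stage."""
    log, values = [], list(items)
    for name, fn in stages:
        next_values = []
        for i, value in enumerate(values):
            log.append((name, i))
            next_values.append(fn(value))
        values = next_values
    return values, log


def depth_first(items, stages):
    """Push each item through every stage before touching the next item."""
    log, results = [], []
    for i, value in enumerate(items):
        for name, fn in stages:
            log.append((name, i))
            value = fn(value)
        results.append(value)
    return results, log


stages = [
    ("get_data", lambda x: x),
    ("task1", lambda x: x + 1),
    ("task2", lambda x: x * 2),
]
breadth_results, breadth_log = breadth_first([1, 2], stages)
depth_results, depth_log = depth_first([1, 2], stages)
print(breadth_log)  # every item finishes get_data before any item starts task1
print(depth_log)    # item 0 reaches task2 before item 1 starts get_data
```

Both orders produce the same results; only the point at which each branch can move on differs.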
r
Really nice, I'm excited to start using it. Thank you for the feedback!
Since we're here, I have another question: how can I start a Prefect agent with a Dask executor?
I didn't find anything about it; most likely I'm looking in the wrong place.
j
@Rodrigo Neves I have a different question for you: the way you set up your mapped pipeline is unusual (not bad, just unusual!), in particular the `unmapped(self)`, and I'm curious if there's a use case you're trying to achieve that we could make better. Are your tasks defined as decorated methods on a parent class?
r
Indeed, it's unusual; I'm still wondering whether it makes any sense at all, but it currently works 🤣. Yeah, I'm adapting an old pipeline-based class where I had to wrap class methods with the @task decorator. A little example:
```python
from prefect import task

class Pipeline:  # hypothetical class name
    @task
    def clean_resample_prefect(self, col):
        return self.clean_resample(col)
```
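A guess at why `unmapped(self)` is needed in this pattern, shown with a toy stand-in rather than Prefect's real `@task`: the working calls in the question suggest that a task stored on a class is not bound to the instance the way a plain method is, so `self` has to be passed as an explicit argument that stays fixed across the map. `ToyTask`, `toy_task`, `Pipeline` and the body of `clean_resample` are all hypothetical.

```python
# Toy stand-in for a task decorator, NOT Prefect's implementation: it only
# shows why `self` has to be passed explicitly (what unmapped(self) does).

class ToyTask:
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args):
        return self.fn(*args)

    def map(self, fixed, items):
        # `fixed` is reused for every call, like an unmapped() argument;
        # `items` is iterated over, like a mapped argument.
        return [self.fn(fixed, item) for item in items]


def toy_task(fn):
    return ToyTask(fn)


class Pipeline:  # hypothetical class
    @toy_task
    def clean_resample_prefect(self, col):
        return self.clean_resample(col)

    def clean_resample(self, col):  # hypothetical cleaning step
        return [v for v in col if v is not None]


p = Pipeline()
# ToyTask defines no __get__, so p.clean_resample_prefect is the ToyTask
# itself: Python does not bind `self`, and we must pass `p` ourselves.
cleaned = p.clean_resample_prefect.map(p, [[1, None], [None, 2]])
print(cleaned)  # [[1], [2]]
```

Under that assumption, `unmapped(self)` is simply the way to hand the same instance to every mapped call.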
j
Neat, I don't think I've seen this pattern before.
n
@Rodrigo Neves and to your dask executor question, you'll want to define your executor with your flow, rather than with your agent. It'll look something like this:
```python
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

# some tasks and flow

# DaskExecutor(address=...) submits work to an already-running Dask scheduler
executor = DaskExecutor(address="tcp://some.ip:8786")
flow.run(executor=executor)
```
r
Thanks. I'm already doing that, and it works like a charm. My question is whether it's possible to start a Prefect agent with a Dask executor. If not, I won't be able to orchestrate my flows using my local Dask cluster.
n
Ah, OK. I think you'll want to take a look at the dask-kubernetes deployment recipe as well as the Kubernetes agent.
r
That's overkill. I did consider starting a dask-kubernetes cluster locally where I could register my agent. But since I already have a local dask-scheduler up and running, I would like to be able to use it. Something like:
```python
flow.run_agent(executor=dask_executor)
```
or through the CLI:
```shell
prefect run dask agent --dask-address localhost:32000
```
I don't know if this makes any sense at all.
j
@Rodrigo Neves typically your agent will run “next” to your Dask Cluster, and when it spins up a flow, the flow is the thing that knows about the cluster address and submits to it. So the dask cluster information gets attached to the flow environment, not the agent. The agent is just an agnostic tool for submitting work.
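A toy model of the split described above, assuming nothing about Prefect's actual classes (`ToyFlow`, `ToyAgent` and `square` are hypothetical): the flow carries its own executor settings and builds the executor when it runs, while the agent just launches whatever flow it is given. A local `ThreadPoolExecutor` stands in for the Dask cluster, and `max_workers` stands in for the scheduler address.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List

# Toy model, not Prefect's classes: the flow carries its own executor settings,
# and the agent only launches runs. A local thread pool stands in for the
# Dask cluster, and max_workers stands in for the scheduler address.


@dataclass
class ToyFlow:
    name: str
    task: Callable[[int], int]
    max_workers: int

    def run(self, items: List[int]) -> List[int]:
        # The flow builds its executor from its own configuration.
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            return list(pool.map(self.task, items))


class ToyAgent:
    """Knows nothing about executors; it just starts the flows it is handed."""

    def __init__(self) -> None:
        self.launched: List[str] = []

    def submit(self, flow: ToyFlow, items: List[int]) -> List[int]:
        self.launched.append(flow.name)
        return flow.run(items)


def square(x: int) -> int:
    return x * x


agent = ToyAgent()
flow = ToyFlow(name="process_csv", task=square, max_workers=4)
results = agent.submit(flow, [1, 2, 3])
print(results)  # [1, 4, 9]
```

In this model, pointing runs at a different cluster means changing the flow's settings; the agent code never changes.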
r
OK, maybe I did something wrong. I need to try it again; I'll update you afterwards. But from my understanding so far, if I spin up an agent through the CLI, for example, that process doesn't know how to use the Dask cluster, because I have no way to connect that Python process to the cluster address. So when I run the flow (through the UI), it won't use the Dask executor for the flow run.
```python
from prefect import Flow, Parameter, unmapped
from prefect.engine.executors import DaskExecutor

# get_sensors, process_csv_prefect and process_results are @task functions defined elsewhere
with Flow("process_csv") as flow:
    yyyymm = Parameter("yyyymm")
    sensors = get_sensors(yyyymm)
    results = process_csv_prefect.map(sensors, unmapped(yyyymm))
    process_results(results)

executor = DaskExecutor(address="localhost:3000")
dask_state = flow.run(parameters={"yyyymm": "201812"}, executor=executor)
```
This works as expected, using the Dask cluster to parallelize my workload. Now my problem is doing the same with an agent. Even if I do the following in my script:
```python
flow.register()
flow.run_agent()
```
and then run my workflow through the UI, I can't make use of the Dask cluster to parallelize the map step. Is it clearer now?
Sorry for coming back to this, but I haven't found any way (other than spinning up a dask-kubernetes cluster) to use the Dask executor on my Prefect-orchestrated flows, which need a Prefect agent to run. Am I missing something?
n
Hey, sorry for the slow response, @Rodrigo Neves! I had too many thread notifications and missed this one. Try adding this to your flow:
```python
from prefect import Flow
from prefect.environments import RemoteDaskEnvironment

environment = RemoteDaskEnvironment(address="address of your dask scheduler")

with Flow("process_csv", environment=environment) as flow:
    # do stuff
    ...

flow.register()
```
Once that's configured, your agent should automatically send work to the cluster.
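A toy model of why this works, under the assumption (loosely mirroring the idea here, not Prefect's actual code) that the environment is stored together with the registered flow: the agent never holds a scheduler address itself; it reads one back from the flow it is asked to run. `ToyEnvironment`, `ToyFlow`, `STORAGE`, `register` and `agent_run` are hypothetical names, and JSON stands in for whatever serialization Prefect really uses.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict

# Toy model of "register, then let an agent run it"; not Prefect's internals.
# STORAGE, register and agent_run are hypothetical names.


@dataclass
class ToyEnvironment:
    scheduler_address: str  # plays the role of RemoteDaskEnvironment(address=...)


@dataclass
class ToyFlow:
    name: str
    environment: ToyEnvironment


STORAGE: Dict[str, str] = {}


def register(flow: ToyFlow) -> None:
    # The environment is serialized together with the flow.
    STORAGE[flow.name] = json.dumps(asdict(flow))


def agent_run(flow_name: str) -> str:
    # The agent has no Dask settings of its own; it reads them from the stored flow.
    data = json.loads(STORAGE[flow_name])
    flow = ToyFlow(data["name"], ToyEnvironment(**data["environment"]))
    return f"submitting {flow.name} to {flow.environment.scheduler_address}"


register(ToyFlow("process_csv", ToyEnvironment("tcp://127.0.0.1:8786")))
message = agent_run("process_csv")
print(message)  # submitting process_csv to tcp://127.0.0.1:8786
```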
r
It's fine, no need to apologize 😄. Thank you for the feedback. I tested it and it works like a charm.
n
😄