# prefect-community
d
Hi! I am very curious and eager to use the new open-source UI. I tried the example on the blog post and now I am wondering how our code base can adapt to use it. My main confusion today is where a DaskScheduler and its related workers fit into this. We have a kubernetes cluster with one dask scheduler, a horizontal pod autoscaler for the workers, and several cronjobs to trigger jobs. The last time I went deep into prefect, there were no agents (or at least I don’t remember them), so I guess that agents and flow configuration can replace the cronjobs on my side, but I don’t see where the workers fit in now.
a
To catch you up on the agent and flow configuration portion, here’s a high-level overview. You can add one or more schedules onto a flow. These schedules determine how often the open source server (I’m going to call this the prefect scheduler) will mark that flow as “ready to run”. Agents are small, polling processes that have two responsibilities. First, they poll against the prefect scheduler, checking to see if anything needs to run. Second, if they find something that needs to run, they submit it to be run, but don’t do any of the work themselves. The DaskScheduler & workers likely will have moved very little since the last time you looked at Prefect. Work still occurs on the dask workers, and those dask workers communicate with the dask scheduler. My guess is that you’re currently using a `FlowRunner` to actually run your flows (either that, or calling `flow.run()`).
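For illustration, here’s a minimal sketch of attaching a schedule to a flow (the flow name, task, and 10-minute interval are made up for this example, not from your setup):

```python
from datetime import timedelta

from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task
def say_hello():
    print("hello")


# Once the flow is registered, the prefect scheduler marks it "ready to run"
# every 10 minutes; an agent then picks it up and submits it for execution.
schedule = IntervalSchedule(interval=timedelta(minutes=10))

with Flow("scheduled-example", schedule=schedule) as flow:
    say_hello()
```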
d
Hi @David Ojeda, I’m glad you’re excited to try out the UI! Adding to Alex’s answer: If you’re using Kubernetes and have a continuously running Dask cluster, I’d suggest installing a [Prefect Kubernetes Agent](https://docs.prefect.io/orchestration/agents/kubernetes.html) into your cluster and using the [Dask Executor](https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#the-dask-executor) with your flows. If you provide a scheduler address to the executor, the flow will submit work to your dask scheduler. When you register a flow with your prefect server instance and add a schedule, the Prefect Kubernetes agent will poll for work, then submit jobs to your Kubernetes cluster that in turn provide work to your Dask scheduler.
Let me know if I can help further!
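If it helps, here’s a minimal sketch of pointing a flow at an existing dask scheduler with the Dask Executor; the scheduler address below is a placeholder you’d swap for the one in your cluster:

```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task
def add_one(x):
    return x + 1


with Flow("dask-executor-example") as flow:
    add_one(1)

# Placeholder address; point this at the dask scheduler Service in your cluster.
executor = DaskExecutor(address="tcp://dask-scheduler:8786")

# Tasks are submitted to the dask scheduler, which farms them out to the dask workers.
flow.run(executor=executor)
```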
d
Ok, let me read up on all that and I will respond back to this thread... Thanks for the quick reply!
d
One additional note for this setup: you’d have to expose the Apollo service in prefect server over a network that is accessible from your workers; one way would be to run them all in k8s (which would require recreating our docker compose as a k8s manifest) or routing localhost:4200 to the other machines. You could also update the Prefect server config to point to an accessible IP that routes to wherever Apollo is running. This is so that running task runs can send state updates to prefect server properly.
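As a quick sanity check, here’s a sketch of overriding the API endpoint a flow run will report to, assuming Prefect’s `PREFECT__CLOUD__API` environment-variable override and a made-up in-cluster hostname for Apollo:

```python
import os

# Hypothetical in-cluster address for the Apollo service; replace it with whatever
# Service or Ingress actually exposes Apollo. This must be set before prefect is
# imported (or exported in the flow run's environment), since the config is read
# at import time.
os.environ["PREFECT__CLOUD__API"] = "http://prefect-apollo:4200"

import prefect

# Flow runs send their state updates to this endpoint.
print(prefect.config.cloud.api)
```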
d
Ok, I think I got it, although it was not easy to understand at first… From your helpful comments and links, I understood what the agent is and what its responsibilities are in this context: to poll for and submit scheduled flows. As I mentioned in my first message, we currently have a bunch of CronJobs created on kubernetes for this particular use-case. That setup seems a bit complicated at first, but since it is managed by helm, it is relatively simple. From what I see, using the prefect UI is even simpler, which is great. The key element that I was missing was how to use dask. At first, I thought this was something the agent should declare somewhere, like an environment variable. The correct answer is the environment attached to the flow, so no magic is needed on the agent. I ended up with a file `etl.py` as follows:
import time

import prefect
from prefect import Flow, task
from prefect.environments import RemoteEnvironment


@task
def extract():
    logger = prefect.context['logger']
    logger.info('extract')
    return [1,2,3]


@task
def transform(data):
    logger = prefect.context['logger']
    logger.info('transform on %s', data)
    time.sleep(10)
    return data * 10


@task
def load(data):
    logger = prefect.context['logger']
    logger.info('load version on %s', data)


def main():
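    # The DaskExecutor is attached via the flow's environment, so the agent itself
    # needs no dask-specific configuration.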
    environment = RemoteEnvironment(
        executor='prefect.engine.executors.DaskExecutor',
        executor_kwargs={
            'address': 'tcp://localhost:8786',
        }
    )

    with Flow('etl-example', environment=environment) as flow:
        e = extract()
        t = transform.map(e)
        l = load.map(t)

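    # Registering sends the flow's metadata to the prefect server so it can be
    # scheduled or triggered manually from the UI.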
    flow.register()


if __name__ == '__main__':
    main()
So I need to register the flow by executing this script, and then I can manually trigger it from the UI. When I ran an agent with `prefect agent start`, I saw how the dask scheduler and dask worker (which I had started beforehand) received and processed the units of work. All of this was done locally, so now that I understand the structure a bit better, I will try to replicate it on my cluster. I think I will start simple: instead of using the KubernetesAgent (which would require an RBAC installation), I will just create a pod that runs the agent.
d
Best of luck! Keep in mind that your flows are going to try to send state information to `localhost` by default, so you will have to dig into the configuration a bit to make sure they send state information to the Prefect Scheduler.
Let us know how it goes!
d
Wait, so when you say flows are going to do xxx, to what “actor” are you referring? I have an agent, a dask scheduler, a dask worker, and several other programs that I am currently considering as a black box behind a docker-compose. To me, a flow is just an object that is handled by the agent and the UI black box...
d
The running flow run (the running flow itself) sends HTTP requests to either Prefect Cloud’s API or the Prefect Scheduler to convey state updates (task A has failed, task A has succeeded, etc.)
When you run flows in the cloud, with k8s for example, flows have a concept of storage:
When running with k8s, you should use DockerStorage
When the agent submits the flow for execution, it will pull the flow from storage (in this case, the docker registry) and run the flow (the docker container)
So that docker container (the “flow” itself, or in Prefect terms the “flow run”) becomes its own actor and sends state updates to the central Prefect scheduler. Since `prefect server start` is configured to run locally at the moment, you’ll need to update the configuration with the address of the Prefect Scheduler in your k8s cluster so that the flow run knows where to send the state updates.
Does that answer your question? Sorry if I went into too much detail
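To make the storage part concrete, here’s a rough sketch of attaching Docker storage to a flow, assuming the `Docker` storage class from `prefect.environments.storage`; the registry URL and image name are placeholders, not something from your setup:

```python
from prefect import Flow
from prefect.environments.storage import Docker

with Flow("etl-example") as flow:
    ...  # tasks go here

# Placeholder registry and image name; registration builds and pushes this image,
# the agent-submitted job pulls it, and the running container reports state
# updates back to the central Prefect scheduler.
flow.storage = Docker(
    registry_url="registry.example.com/my-team",
    image_name="etl-example",
)

flow.register()
```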
d
Ok I think I understand your comment concerning the state updates. Thanks a lot! I think I have enough info to continue forward and I will come back to the channel if I have any follow up questions. 👍
d
Great!