# introductions
s
Hello everyone, I'm Syméon from Switzerland. I'm currently discovering Prefect to replace our complex Celery pipelines that became impossible to observe and hard to maintain. The product looks great and full of deep features. Happy to join this community and learn how to use it properly ! Thanks 🙂
prefect rocket 6
💙 6
👋 7
catjam 4
n
hi @Syméon del Marmol - welcome! let us know if you have any questions! i'll drop a couple links you might be interested in since you mentioned Celery
• https://www.prefect.io/blog/background-tasks-why-they-matter-in-prefect
• https://docs.prefect.io/v3/advanced/background-tasks
but also you might want to check out events!
👀 1
b
Welcome @Syméon del Marmol! prefect spin
s
hi @Nate thanks a lot for the links, this is giving me ideas but new questions too! I'm trying to find the best patterns to migrate our Celery workflows, which were executing tasks in parallel on different worker pools. I see that background tasks fit the need well, but this means that we lose the benefits of flows, structuring tasks together... right? I tried deployments, but I didn't find a simple way to "just" call a flow from our main backend to execute it on a worker, and didn't find a way to "await" the end of the flow (only found polling?). Any thoughts about those points?
n
> this means that we lose the benefits of flows
does it help to know that it's perfectly valid to call flows from tasks?
```
» ipython

#[1]
from prefect import flow, task

#[2]
@flow
def compose_some_work(): ...

@task
def serve_this_someplace():
    compose_some_work()


#[3]
serve_this_someplace.delay()
14:16:58.593 | INFO    | prefect.tasks - Created task run 'serve_this_someplace'. View it in the UI at <URL>
Out[3]: <prefect.futures.PrefectDistributedFuture at 0x115713230>

#[4]
serve_this_someplace.serve()
14:17:04.113 | INFO    | prefect.task_worker - Starting task worker...
14:17:04.113 | INFO    | prefect.task_worker - Subscribing to runs of task(s): serve_this_someplace
14:17:04.480 | INFO    | prefect.task_worker - Received task run: 068acb6a-a807-716a-8000-98335c7b2833 - serve_this_someplace
14:17:04.482 | INFO    | prefect.task_worker - Submitting task run 'serve_this_someplace' to engine. View in the UI: <URL>
14:17:05.857 | INFO    | Flow run 'armored-wrasse' - Beginning flow run 'armored-wrasse' for flow 'compose-some-work'
14:17:05.862 | INFO    | Flow run 'armored-wrasse' - View at <URL>
14:17:06.040 | INFO    | Flow run 'armored-wrasse' - Finished in state Completed()
14:17:06.048 | INFO    | Task run 'serve_this_someplace' - Finished in state Completed()
^C14:17:11.921 | INFO    | prefect.task_worker - Task worker interrupted, stopping...
```
i'm delaying the task run and then serving it here just because i'm in `ipython`, but typically you'd `.delay` from your application or workflow foreground and keep `.serve()` going on some static infra (2 separate processes, at least). background tasks scale nicely horizontally, like redis stream consumer groups

so in summary:
• serve tasks to start a worker (websocket, does not poll) that will get pushed runs from the tasks it's serving (ie subscribed to)
• those tasks can call tasks or flows or any python, and flows you call will still show up in the UI as any other would

> I tried deployments, but I didn't find simple way to "just" call a flow from our main backend
yea deployments can be overkill if you don't need per-flow-run configuration of your infrastructure (e.g. kubernetes pod with X mem and Y cpu) and you're just trying to make something happen in the background
s
Oh great! I'll give it a try. Thanks, this wasn't clear to me: I always saw Flow -> sub-flows -> tasks and thought it was the only correct way. To select a worker, should I use task runners like Dask or Ray? Or can I simply use traditional Python containers (as Docker containers or kube pods)?
n
i think of the `serve` process you start as an entrypoint. here are some links to code backing the docs page i linked above. you can freely make more `replicas` of the entrypoint that runs the serve process, ie add more workers. you should not run into race conditions related to work distribution (which sometimes happen with the traditional workers that poll for scheduled deployment runs)
https://github.com/PrefectHQ/examples/blob/main/apps/background-tasks/Dockerfile#L40
https://github.com/PrefectHQ/examples/blob/main/apps/background-tasks/compose.yaml
task runners (naming is hard) are how you make task runs happen at the same time in a flow, using threads, dask or ray (probably not what you're looking for here). in this case, `replicas: n` means i want N separate websocket clients, each of which can execute the task runs it's pushed concurrently. so really just add replicas if memory becomes a problem in your workloads
s
Ok, thanks. So it means that the Prefect server acts as a sort of message broker (like RabbitMQ) to distribute the load? How does it work if I serve some tasks in one worker and some others in another? Will the call to "my_task.delay()" go to the appropriate one?