Samuel Schlesinger
02/28/2025, 11:28 PM
With Celery, queueing a job was just `my_decorated_func.apply_async([args])`, i.e. `say_hello_world.apply_async(["Marvin"])`; and the workers would pick up the jobs, set up app internals (environment config et al), and then run the decorated function automatically.
I'm not seeing an obvious way to do this with Prefect. I can call my `say_hello_world` flow directly, and it'll run locally, but I need it to run in the worker pool. Calling `.deploy()` tries to register it with the default worker pool, which is great, but it complains about needing an entrypoint or image. I saw some comments online about using 'local storage' to point to the specific file the flow is in, i.e. `/path/to/file/flow.py:say_hello_world`, but... there's no way that's the "right" way to queue a job, right?
I get that the Prefect control plane allows for total independence between the place that's queueing jobs and the place that's executing them, but in my case, they're both the same docker image, just with different entrypoints (starting the API vs starting the prefect workers). What's a clean way to just say "look for this exact same decorated function in the worker", essentially as if it were running locally but in a different container?
CC @Marvin
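For reference, the Celery pattern described above, as a minimal sketch (the app name and broker URL are placeholders):

```python
# sketch of the Celery workflow being described; app name and broker
# URL are placeholders
from celery import Celery

app = Celery("my_app", broker="redis://localhost:6379/0")

@app.task
def say_hello_world(name: str) -> str:
    return f"hello, {name}"

# from the API process: enqueue and return immediately; any worker
# consuming the queue picks the job up and runs this same function
say_hello_world.apply_async(["Marvin"])
```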
Marvin
02/28/2025, 11:28 PM
3. … a `prefect.yaml` file. More on this can be found here.
4. Local Storage: While local storage might seem like a hack, it can be a legitimate option if your workers and deployment environment are tightly coupled and local execution is viable.
In summary, to mimic Celery's behavior, ensure your flows are properly deployed with the necessary entrypoints and images, and configure your workers to poll the appropriate work pools. This way, your flows can be executed in the worker pool environment, similar to how Celery workers process tasks from a queue.
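The "local storage" option mentioned above would look roughly like this, assuming Prefect 3.x (a sketch; the source path and pool name are placeholders):

```python
# sketch: point a deployment at flow code that already exists on disk
# inside the worker's container; path and pool name are placeholders
from prefect import flow

flow.from_source(
    source="/path/to/file",                # directory the worker can read
    entrypoint="flow.py:say_hello_world",  # file:function, per the question
).deploy(
    name="say-hello-world",
    work_pool_name="default-pool",
)
```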
Samuel Schlesinger
02/28/2025, 11:34 PM

Marvin
02/28/2025, 11:34 PM
1. … a `prefect.yaml` file without managing the infrastructure. Since your workers are already running and joined to a work pool, you can focus on targeting these existing resources.
2. Use of Work Pools: Ensure that your workers are polling the correct work pool. Each worker type corresponds to a specific execution environment, and they will execute flows assigned to their respective work pool. This setup ensures that the flow will be executed in the intended environment.
3. Deploying the Flow: You can deploy your flow using the `flow.deploy()` method or by defining deployment settings in a `prefect.yaml`. This deployment will target your specific work pool without attempting to spin up new infrastructure. The deployment process involves specifying the flow's execution parameters and ensuring that it is registered with the Prefect server.
4. Entrypoint: If your flow code is already present in the Docker image used by your workers, you don't need to specify additional infrastructure configurations. Instead, focus on ensuring the entrypoint is correctly defined so that the flow can be executed by the worker.
For an example of deploying flows with Python, you can refer to this documentation.
By following these steps, you can maintain the separation between flow logic and execution infrastructure, allowing your pre-existing workers to execute flows without additional infrastructure management.
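A sketch of the deployment Marvin describes, assuming Prefect 3.x and that the flow code is already baked into the image the workers run (the pool name and image tag are placeholders):

```python
# sketch: register a deployment against an existing work pool without
# building or pushing anything, since the workers already run this image
from prefect import flow

@flow
def say_hello_world(name: str):
    print(f"hello, {name}")

if __name__ == "__main__":
    say_hello_world.deploy(
        name="say-hello-world",
        work_pool_name="default-pool",     # placeholder pool name
        image="myregistry/my-app:latest",  # the image the workers already use
        build=False,                       # don't rebuild the image
        push=False,                        # don't push it anywhere
    )
```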
Nate
02/28/2025, 11:42 PM
• `.serve` a flow in a static container
• background tasks: closest ideologically to celery. you `serve(*some_tasks)` on some static infra and then `.delay(*inputs)` from somewhere else (like an endpoint where you don't wanna block), and the serve process is essentially akin to a redis consumer group, so you can scale these horizontally as much as you want
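A minimal sketch of that second option, assuming Prefect 3.x, where the bulk task-serving utility lives at `prefect.task_worker.serve` (task names are placeholders):

```python
# tasks.py — shared task definitions plus a serving entrypoint (sketch)
from prefect import task
from prefect.task_worker import serve

@task
def say_hello_world(name: str) -> str:
    return f"hello, {name}"

@task
def send_welcome_email(address: str) -> None:
    print(f"emailing {address}")

if __name__ == "__main__":
    # blocks, subscribing for .delay()ed runs of both tasks
    serve(say_hello_world, send_welcome_email)
```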
Nate
02/28/2025, 11:43 PM

Samuel Schlesinger
02/28/2025, 11:46 PM
`.serve` is on a per-task basis, right – so we'd need at least one consumer explicitly started against each task to listen for incoming jobs? Our workers are generally fielding a wide variety of tasks, so this wouldn't scale especially well on our end. Background tasks look interesting, though – doing some reading there.
Samuel Schlesinger
02/28/2025, 11:48 PM

Nate
02/28/2025, 11:59 PM
> `.serve` is on a per-task basis, right
hmm not sure what you mean
to disambiguate (erring on verbose side):
• `.serve` is a method on a flow that (when called) will make it a deployment polling for scheduled runs of itself; you can call any other python from inside your flow (including `run_deployment` to trigger other remote work)
• there's a `serve` utility for both flows and tasks; both are similar in that they take many functions and then listen for / execute scheduled runs of those functions
  ◦ for flows: `serve` is literally just a bulk version of the `.serve` method above
  ◦ for tasks: `serve` is a utility that takes many tasks (making a static task registry type of thing) that a task worker will serve all together so it can subscribe to `.delay()`ed work from elsewhere. in contrast to work pool / worker polling, background tasks are websocket based. we specifically made these with horizontal scaling in mind
> We're operating in something of a monolith environment, where we scale our worker pools in aggregate, rather than with serverless processing like lambdas or ephemeral, event-based containers.
right, so it sounds like you don't want workers or work pools, because those are more for when you want prefect to be dispatching ephemeral containers per flow run and stuff like that
so I would guess you'd want either
• flow(s) served in one or many static containers that can call whatever python you need to do
  ◦ advantage: you can schedule runs of these with event triggers
  ◦ disadvantage: uses polling, and a little more annoying to scale horizontally bc each serve process wants a port
• background tasks that you can make N replicas of (and you're free to call flows / tasks inside of these too)
  ◦ advantage: horizontal scaling is just bumping `replicas: n`, and lower latency bc websockets
  ◦ disadvantage: can't use events to trigger runs (yet)
here's a pretty simple example of the background tasks stuff
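The dispatch side of that background-task option, sketched against the `tasks.py` module from the earlier sketch (assuming `.delay()` returns a future you can optionally block on):

```python
# e.g. inside an API endpoint where you don't want to block (sketch)
from tasks import say_hello_world  # the very same task definition

future = say_hello_world.delay("Marvin")  # enqueue; returns immediately
# a served task worker picks this up over its websocket subscription;
# if the outcome is ever needed, block on it:
# print(future.result())
```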
Samuel Schlesinger
03/01/2025, 12:09 AM
`.serve` starts an independent listener that polls for events for that task alone, right? We could either run one docker container per task definition, which would be a nightmare to scale on our end in Azure Container Apps on dedicated / reserved infrastructure, where we need to partition CPU/RAM to containers; or run multiple `.serve` processes in each container, which means we'd need some degree of monitoring and crash handling et al.
This makes sense in a true serverless environment, but it's a good margin more complicated than running a single worker that knows how to poll for all tasks through a shared entrypoint.
03/01/2025, 12:12 AMfor tasks:although maybe I'm misinterpreting, it sounds like theis a utility that takes many tasks (making a static task registry type of thing) that a task worker will serve all together so it can subscribe to `.delay()`ed work from elsewhere. in contrast to work pool / worker polling, background tasks are websocket based. we specifically made these with horizontal scaling in mindserve
serve
helper might be able to handle multiple tasks, rather than using the decorated my_specific_task.serve()
variantNate
03/01/2025, 12:12 AMalthough maybe I'm misinterpreting, it sounds like theyep exactlyhelper might be able to handle multiple tasks, rather than using the decoratedserve
variantmy_specific_task.serve()
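The two spellings being disambiguated, side by side (a sketch, assuming Prefect 3.x; both calls block, so only one is active here):

```python
from prefect.task_worker import serve
from tasks import say_hello_world, send_welcome_email  # placeholders from above

# per-task variant: one blocking process serving a single task
# say_hello_world.serve()

# bulk utility: one blocking process serving many tasks together
serve(say_hello_world, send_welcome_email)
```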
Samuel Schlesinger
03/01/2025, 12:12 AM

Nate
03/01/2025, 12:14 AM
… then you call `.delay()` on the task objects to send them over
e.g. https://github.com/PrefectHQ/prefect-background-task-examples/blob/main/fastapi-user-signups/docker-compose.yaml#L31-L68
Samuel Schlesinger
03/01/2025, 12:15 AM
… so the API nodes call `.delay()` on the very same task definitions. Nothing is serving tasks in the API nodes, so they won't try to process locally, but they'd get registered with the broker and that'd let the workers pick up the requests from the queue.
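One way to picture that one-image/two-entrypoints layout (a sketch; the "worker"/"api" role argument is an illustrative convention, not a Prefect feature):

```python
# sketch: a single image whose entrypoint argument picks the role
import sys

from prefect import task
from prefect.task_worker import serve

@task
def say_hello_world(name: str) -> str:
    return f"hello, {name}"

if __name__ == "__main__":
    role = sys.argv[1] if len(sys.argv) > 1 else "api"
    if role == "worker":
        serve(say_hello_world)           # worker container: consume .delay()ed runs
    else:
        say_hello_world.delay("Marvin")  # API container: enqueue only, no local run
```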
Samuel Schlesinger
03/01/2025, 12:15 AM

Nate
03/01/2025, 12:16 AM

Nate
03/01/2025, 12:16 AM

Samuel Schlesinger
03/01/2025, 12:29 AM
1. I didn't know `task.delay()` existed or that I could `.serve` multiple tasks at once (read: the examples I saw used `my_task.serve`, which mirrors `my_task.deploy`);
2. A lot of the docs focus on flows, and since flows are a task container, I figured it made sense to start there, rather than with the lower-level entity;
3. The sole "static infrastructure" docker example is for running a single task baked into an image as the entrypoint itself, which scared me from a scaling standpoint;
4. Work pools appeared to make a lot of sense at first, since I want to register my workers with the broker to consume tasks, so I went there next; and
5. The background tasks sound like they're just running as a background process, rather than something that can be orchestrated across remote consumers, so I discarded them.
Samuel Schlesinger
03/01/2025, 12:30 AM
… the `serve` utility.
Nate
03/01/2025, 12:34 AM

Samuel Schlesinger
03/01/2025, 12:43 AM
It'd be nice to run the `serve` utility from the command line as an entrypoint, where it'd do autodiscovery for all tasks registered with `@task` (or `@flow`?) and listen for them by default. Kicking off a specific python file per the example with `python -m mymodule.tasks` feels a bit dirty. Unless `prefect shell serve` can accomplish something along these lines?
03/01/2025, 12:44 AM» prefect task serve --help
Usage: prefect task serve [OPTIONS] ENTRYPOINTS...
actually I'm not sure that's quite what you asked for, bc that'll point at specific function objects, instead of a module that might ahve a serve
util call
we could improve that! maybe like prefect task serve -m module.worker
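In the meantime, a hand-rolled version of that autodiscovery entrypoint might look like this (a sketch; `my_app.tasks` is a placeholder module, and `discover_tasks` is a hypothetical helper, not a Prefect API):

```python
# serve_all.py — hypothetical autodiscovery entrypoint: collect every
# @task-decorated object in a module and serve them all together
import inspect

from prefect.task_worker import serve
from prefect.tasks import Task

import my_app.tasks  # placeholder: the module holding @task functions

def discover_tasks(module) -> list[Task]:
    # every attribute of the module that is a Prefect Task object
    return [obj for _, obj in inspect.getmembers(module) if isinstance(obj, Task)]

if __name__ == "__main__":
    serve(*discover_tasks(my_app.tasks))
```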
Samuel Schlesinger
03/01/2025, 12:47 AM
Right – otherwise the task list lives in whatever's invoking `task serve 1 2 3`, i.e. the terraform code that adds the ACA containers, which means both the infrastructure and the app code would have to be updated in lockstep – versus it just picking up anything registered in the module with the decorators.
Samuel Schlesinger
03/01/2025, 12:47 AM

Nate
03/01/2025, 12:47 AM

Samuel Schlesinger
03/01/2025, 12:51 AM
You could even have `.delay` accept `work_pool_name` as a kwarg, and then nothing'd need to change on the user side; tasks could be deployed in a dynamic environment or delayed in a static one.
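That suggestion as code (hypothetical; `.delay()` takes no such kwarg today, this is just the shape being proposed):

```python
# hypothetical API, as proposed above; not current Prefect behavior:
# route a delayed task to a work pool instead of a served task worker
say_hello_world.delay("Marvin", work_pool_name="default-pool")
```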
Nate
03/01/2025, 12:53 AM

Samuel Schlesinger
03/01/2025, 12:54 AM
… `flow.delay`? 😅
Nate
03/01/2025, 12:55 AM
… `run_deployment` from where they want to dispatch
Samuel Schlesinger
03/01/2025, 12:57 AM

Samuel Schlesinger
03/01/2025, 12:57 AM

Nate
03/01/2025, 12:58 AM

Samuel Schlesinger
03/01/2025, 1:07 AM
… `/var/www/.prefect/storage` as well (since I got a permissions error due to different users in the containers). Do all workers need to share this directory, and what actually goes in there; is it just a temp directory in case it's needed for file access et al?
Samuel Schlesinger
03/01/2025, 1:15 AM
… `PREFECT_RESULTS_PERSIST_BY_DEFAULT` and forget about it for now.
03/01/2025, 1:30 AM

Nate
03/01/2025, 1:31 AM

Samuel Schlesinger
03/01/2025, 1:32 AM

Nate
03/01/2025, 1:32 AM

Nate
03/01/2025, 1:33 AM

Samuel Schlesinger
03/01/2025, 1:33 AM

Samuel Schlesinger
03/01/2025, 1:33 AM

Nate
03/01/2025, 1:34 AM