# ask-community
Samuel:
Hey folks, I'm looking into migrating a django + celery app to use Prefect for async task processing – we run X celery workers, which all listen to a queue broker (redis/rabbitmq/etc), and our API adds jobs to the queue as users trigger events. Pretty standard stuff. I'm trying to recreate a basic version of this in Prefect. I've got a prefect server running, and several workers running in docker instances, each joining a work pool. The workers have the application code baked into the image.

In celery-land, I'd just trigger jobs by calling `my_decorated_func.apply_async([args])`, i.e. `say_hello_world.apply_async(["Marvin"])`, and the workers would pick up the jobs, set up app internals (environment config et al), and then run the decorated function automatically. I'm not seeing an obvious way to do this with Prefect. I can call my `say_hello_world` flow directly, and it'll run locally, but I need it to run in the worker pool. Calling `.deploy()` tries to register it with the default worker pool, which is great, but it complains about needing an entrypoint or image. I saw some comments online about using 'local storage' to point to the specific file the flow is in, i.e. `/path/to/file/flow.py:say_hello_world`, but... there's no way that's the "right" way to queue a job, right?

I get that the Prefect control plane allows for total independence between the place that's queueing jobs and the place that's executing them, but in my case, they're both the same docker image, just with different entrypoints (starting the API vs starting the prefect workers). What's a clean way to just say "look for this exact same decorated function in the worker", essentially as if it were running locally but in a different container? CC @Marvin
Marvin:
Migrating from Celery to Prefect for async task processing involves a few key steps and understanding of Prefect's architecture. Here's a concise breakdown of how you can achieve this:
1. Work Pools and Workers: In Prefect 3.x, workers poll work pools for new runs to execute. Your workers are lightweight services that will retrieve scheduled runs from a work pool and execute them. This is akin to Celery workers listening to a queue broker. More details can be found here.
2. Flow Deployment: Prefect requires you to deploy flows in a way that the execution environment can access the flow code. You can use Docker image-based storage, which seems to align with your use case where the application code is already baked into the Docker image. This means your flow can be executed in a similar environment as it was developed. Check out the flow code storage options.
3. Entrypoint and Image Requirements: When deploying, you need to specify an entrypoint and image if you're using Docker. This is necessary for Prefect to know how to execute your flow in the worker pool. You can manage these configurations using a `prefect.yaml` file. More on this can be found here.
4. Local Storage: While local storage might seem like a hack, it can be a legitimate option if your workers and deployment environment are tightly coupled and local execution is viable.

In summary, to mimic Celery's behavior, ensure your flows are properly deployed with the necessary entrypoints and images, and configure your workers to poll the appropriate work pools. This way, your flows can be executed in the worker pool environment, similar to how Celery workers process tasks from a queue.
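(A minimal sketch of the image-based route described above, assuming Prefect 3.x; the flow, image tag, and work pool name are placeholders:)

```python
from prefect import flow


@flow
def say_hello_world(name: str):
    print(f"Hello, {name}!")


if __name__ == "__main__":
    # Builds an image containing this code, pushes it, and registers a
    # deployment that targets the "docker-pool" work pool.
    say_hello_world.deploy(
        name="say-hello-world",
        work_pool_name="docker-pool",
        image="my-registry/my-app:latest",
    )
```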
Samuel:
@Marvin Following up on your comment above. Our infrastructure is responsible for spinning up and managing our various containers, so I don't want Prefect to try to create new resources / download images / et cetera if I try to deploy a flow with "docker" storage. If we can assume the image has the code in a specific location, and is already running as one or more workers joined to a Prefect work pool, what's the correct syntax for just kicking off the flow in the worker pool? It really feels like the flow/task shouldn't have to be aware of the infrastructure that will execute it later.
Marvin:
To deploy and run a flow in Prefect without dynamic infrastructure management, especially when you have a fixed environment with pre-existing workers, you can follow these steps:
1. Deployment without Infrastructure Management: Prefect allows you to deploy flows using the Python SDK or a `prefect.yaml` file without managing the infrastructure. Since your workers are already running and joined to a work pool, you can focus on targeting these existing resources.
2. Use of Work Pools: Ensure that your workers are polling the correct work pool. Each worker type corresponds to a specific execution environment, and they will execute flows assigned to their respective work pool. This setup ensures that the flow will be executed in the intended environment.
3. Deploying the Flow: You can deploy your flow using the `flow.deploy()` method or by defining deployment settings in a `prefect.yaml`. This deployment will target your specific work pool without attempting to spin up new infrastructure. The deployment process involves specifying the flow's execution parameters and ensuring that it is registered with the Prefect server.
4. Entrypoint: If your flow code is already present in the Docker image used by your workers, you don't need to specify additional infrastructure configurations. Instead, focus on ensuring the entrypoint is correctly defined so that the flow can be executed by the worker. For an example of deploying flows with Python, you can refer to this documentation.

By following these steps, you can maintain the separation between flow logic and execution infrastructure, allowing your pre-existing workers to execute flows without additional infrastructure management.
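(For the case described here, where the image already exists and is managed elsewhere, something like this sketch avoids any build or push step; the source path, entrypoint, and pool name are placeholders for wherever the code is baked into the image, assuming Prefect 3.x:)

```python
from prefect import flow

if __name__ == "__main__":
    # Reference the flow where it already lives inside the worker image;
    # build=False / push=False means no image is built or pushed, the
    # deployment just records the entrypoint for workers to execute.
    flow.from_source(
        source="/app",                                # code path inside the image
        entrypoint="myapp/flows.py:say_hello_world",  # file:function
    ).deploy(
        name="say-hello-world",
        work_pool_name="default-pool",
        build=False,
        push=False,
    )
```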
Nate:
hi @Samuel Schlesinger! full disclaimer I didn't closely read this whole thread but I'll highlight a couple common paradigms
• workers / work pools: if you need dynamic dispatch of infra (container per flow run), which sounds like maybe not your use case here
• `.serve` a flow in a static container
• background tasks: closest ideologically to celery. you `serve(*some_tasks)` on some static infra and then `.delay(*inputs)` from somewhere else (like an endpoint where you dont wanna block) and the serve process is essentially akin to a redis consumer group so you can scale these horizontally as much as you want
happy to elaborate on any of those if they sound like what you're looking for
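(A minimal sketch of the second option, serving a flow in a static container, assuming Prefect 3.x; the deployment name is arbitrary:)

```python
from prefect import flow


@flow
def say_hello_world(name: str):
    print(f"Hello, {name}!")


if __name__ == "__main__":
    # Long-running process: registers a deployment for this flow and polls
    # for scheduled runs of it. Run this as the container entrypoint and
    # trigger runs from the UI, a schedule, or run_deployment(...).
    say_hello_world.serve(name="static-container")
```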
Samuel:
@Nate Thanks for the quick reply.
`.serve` is on a per-task basis, right – so we'd need at least one consumer explicitly started against each task to listen for incoming jobs? Our workers are generally fielding a wide variety of tasks, so this wouldn't scale especially well on our end. Background tasks look interesting, though – doing some reading there.
We're operating in something of a monolith environment, where we scale our worker pools in aggregate, rather than with serverless processing like lambdas or ephemeral, event-based containers.
Nate:
> `.serve` is on a per-task basis, right
hmm not sure what you mean to disambiguate (erring on verbose side):
• `.serve` is a method on a flow that (when called) will make it a deployment polling for scheduled runs of itself, you can call any other python from inside your flow (including `run_deployment` to trigger other remote work)
• there's a `serve` utility for both flows and tasks, both are similar in that they take many functions and then listen for / execute scheduled runs of those functions
  ◦ for flows: `serve` is literally just a bulk version of the `.serve` method above
  ◦ for tasks: `serve` is a utility that takes many tasks (making a static task registry type of thing) that a task worker will serve all together so it can subscribe to `.delay()`ed work from elsewhere. in contrast to work pool / worker polling, background tasks are websocket based. we specifically made these with horizontal scaling in mind
> We're operating in something of a monolith environment, where we scale our worker pools in aggregate, rather than with serverless processing like lambdas or ephemeral, event-based containers.
right so it sounds like you don't want workers or work pools, because those are more for when you want prefect to be dispatching ephemeral containers per flow run and stuff like that, so I would guess you'd want either
• flow(s) served in one or many static containers that can call whatever python you need to do
  ◦ advantage: you can schedule runs of these with event triggers
  ◦ disadvantage: uses polling, a little more annoying to scale horizontally bc each serve process wants a port
• background tasks that you can make N replicas of (and you're free to call flows / tasks inside of these too)
  ◦ advantage: horizontal scaling is just bumping `replicas: n`, lower latency bc websockets
  ◦ disadvantage: can't use events to trigger runs (yet)
here's a pretty simple example of the background tasks stuff
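(A minimal sketch of the background-task worker side, assuming Prefect 3.x, where the bulk task `serve` utility lives in `prefect.task_worker`; the module and task names are made up:)

```python
# tasks.py - shared by both the API image and the worker image
from prefect import task
from prefect.task_worker import serve


@task
def say_hello_world(name: str):
    print(f"Hello, {name}!")


@task
def send_welcome_email(user_id: int):
    ...


if __name__ == "__main__":
    # Worker entrypoint: subscribes (over websockets) to .delay()'ed runs of
    # every task passed in. Scale horizontally by running more replicas.
    serve(say_hello_world, send_welcome_email)
```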
Samuel:
@Nate Each call to `.serve` starts an independent listener that polls for events for that task alone, right? We could either run one docker container per task definition, which would be a nightmare to scale on our end in Azure Container Apps on dedicated / reserved infrastructure, where we need to partition CPU/RAM to containers; or run multiple `.serve` processes in each container, which means we'd need some degree of monitoring and crash handling et al. This makes sense in a true serverless environment, but is a good margin more complicated than running a single worker that knows how to poll for all tasks through a shared entrypoint.
> for tasks: `serve` is a utility that takes many tasks (making a static task registry type of thing) that a task worker will serve all together so it can subscribe to `.delay()`ed work from elsewhere. in contrast to work pool / worker polling, background tasks are websocket based. we specifically made these with horizontal scaling in mind
although maybe I'm misinterpreting, it sounds like the `serve` helper might be able to handle multiple tasks, rather than using the decorated `my_specific_task.serve()` variant
Nate:
> although maybe I'm misinterpreting, it sounds like the `serve` helper might be able to handle multiple tasks, rather than using the decorated `my_specific_task.serve()` variant
yep exactly
Samuel:
can `.serve()`'d tasks run elsewhere – i.e. rather than a background process in our API containers, run in a dedicated set of worker containers?
Nate:
yes! you could have some containers that are just listening for / executing tasks and then elsewhere your API is just calling `.delay()` on the task objects to send them over
e.g. https://github.com/PrefectHQ/prefect-background-task-examples/blob/main/fastapi-user-signups/docker-compose.yaml#L31-L68
Samuel:
Okay, makes sense. So... we run X docker containers that each serve all of the task definitions, and these'll get registered with the prefect broker (the container that serves the dashboard etc), and our unrelated API containers can call `.delay()` on the very same task definitions. Nothing is serving tasks in the API nodes, so they won't try to process locally, but they'd get registered with the broker and that'd let the workers pick up the requests from the queue.
Does that sound about right?
Nate:
sounds exactly right
if you go down this path i'd be eager to hear what's confusing / non-ideal about the DX!
Samuel:
Definitely think there's a missing link somewhere:
1. I had no clue `task.delay()` existed or that I could `.serve` multiple tasks at once (read: the examples I saw used `my_task.serve`, which mirrors `my_task.deploy`);
2. A lot of the docs focus on flows, and since flows are a task container, I figured it made sense to start there, rather than with the lower-level entity;
3. The sole "static infrastructure" docker example is for running a single task baked into an image as the entrypoint itself, which scared me from a scaling standpoint;
4. Work pools appeared to make a lot of sense at first since I want to register my workers with the broker to consume tasks, so I went there next; and
5. The background tasks sound like they're just running as a background process, rather than something that can be orchestrated across remote consumers, so I discarded them.

Some of this is prolly from my overlooking key elements, but I definitely read through a lot of documentation before posting here. Probably can be sorted by just adding another static infrastructure example for the more generalized `serve` utility.
Nate:
thank you, that’s great feedback! i think you’re definitely right there’s a blind spot in the docs for background tasks, the page we have now is just an intro and we probably don’t clearly differentiate between the convenience methods on the single object vs the utilities. be on the lookout for some updated docs soon!
❤️ 1
Samuel:
@Nate It'd also be swell to be able to call the `serve` utility from the command line as an entrypoint, where it'd do autodiscovery for all tasks registered with `@task` (or `@flow`?) and listen for them by default. Kicking off a specific python file per the example with `python -m mymodule.tasks` feels a bit dirty. Unless `prefect shell serve` can accomplish something along these lines?
Nate:
i think we should have you covered there, but lmk if you encounter weirdness
```
» prefect task serve --help

 Usage: prefect task serve [OPTIONS] ENTRYPOINTS...
```
actually I'm not sure that's quite what you asked for, bc that'll point at specific function objects, instead of a module that might have a `serve` util call
we could improve that! maybe like `prefect task serve -m module.worker`
Samuel:
Exactly my thought – as is, the infra would have to be aware of the task definitions re: `task serve 1 2 3`, i.e. the terraform code that adds the ACA containers, which means both the infrastructure and the app code would have to be updated in lockstep – versus it just picking up anything registered in the module with the decorators.
That would be super cool.
Nate:
great suggestion! keep them coming if you think of any more
Samuel:
@Nate I guess as a footnote, an alternative vehicle here could be to keep pool workers as is, but let `.delay` accept `work_pool_name` as a kwarg, and then nothing'd need to change on the user side; tasks could be deployed in a dynamic environment or delayed in a static one.
Nate:
ooh so there's a bit of an incongruence there though: pool workers poll for flow runs. there may be a day where we unify this, but today work pool workers exclusively poll for flow runs
Samuel:
`flow.delay`? 😅
Nate:
we have some experimental work that you might want to keep an eye on, but today people do this by defining a new deployment with a work pool and then calling `run_deployment` from where they want to dispatch
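(A sketch of that two-step pattern, assuming a deployment named "say-hello-world/aca-pool" has already been registered; the names and parameters are placeholders:)

```python
from prefect.deployments import run_deployment

# Ask the Prefect server to create a run of an existing deployment; a worker
# polling that deployment's work pool picks it up and executes it.
# timeout=0 returns immediately instead of blocking until the run finishes.
flow_run = run_deployment(
    name="say-hello-world/aca-pool",
    parameters={"name": "Marvin"},
    timeout=0,
)
print(flow_run.id)
```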
Samuel:
haha, makes sense; before I reached out here, I had registered a deployment with local storage (which was clunky re: needing to reference python file paths) and saw that I could trigger it with `run_deployment`; but the two-step process made me realize I was probably misusing prefect and came here instead
👍 1
Anyways, thanks for all the support here!
Nate:
sure thing! catjam
Samuel:
Dumb question, how should I think about storage for these `.delay`'d task runs? I'd expect Prefect to maintain its state in postgres, which I linked up; but it seems like it's actively trying to use `/var/www/.prefect/storage` as well (since I got a permissions error due to different users in the containers). Do all workers need to share this directory, and what actually goes in there; is it just a temp directory in case it's needed for file access et al?
Found the docs on task caching, seems like I can probably just set `PREFECT_RESULTS_PERSIST_BY_DEFAULT` and forget about it for now.
Separately, are background tasks kind of not-recommended at this stage? I got everything wired up in a POC and it works – hooray, tasks are kicking off – but I don't see any of the reporting in the dashboard, outside of the event feed. Seems like the "Runs" section is only for flow executions.
Nate:
sorry afk can give better answer later but tldr is that (as of now) task run params aren’t persisted in the db (in any context). that shared disk is for parameter storage. we see that as not ideal so it may change in the future. in practice rn, i like an s3 bucket (or similar) with a path for results and one for params
wrt the other question, they’re newer than the whole flow run / worker paradigm but they should be gtg, the runs page should have a task runs tab
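(A sketch of pointing result persistence at a bucket instead of the shared local path, assuming the `prefect-aws` integration and a previously saved `S3Bucket` block named "results"; names are placeholders, and the settings route via `PREFECT_RESULTS_PERSIST_BY_DEFAULT` / `PREFECT_LOCAL_STORAGE_PATH` is the simpler alternative:)

```python
from prefect import task
from prefect_aws import S3Bucket

# Load a block saved once ahead of time, e.g.:
#   S3Bucket(bucket_name="my-prefect-results").save("results")
results_bucket = S3Bucket.load("results")


@task(persist_result=True, result_storage=results_bucket)
def say_hello_world(name: str) -> str:
    return f"Hello, {name}!"
```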
Samuel:
Interestingly, the task runs tab is empty, even though clicking an event in the feed sends me there
Nate:
oh! is there anything strange in the console about a pagination endpoint? this might be a known UI bug
sorry about that!
Samuel:
oh yup it's throwing lots of exceptions, but not about pagination
let me switch accounts and I'll share a trace
Nate:
please put any details here! i’ll be back later to take a look