# ask-marvin
y
with prefect 3, is there a way to set up worker execution to avoid polling? e.g. something similar to celery/redis where events are pushed instead of polled? I'm curious about both the cloud and oss versions @Marvin
m
In Prefect 3, workers by default use a polling mechanism to retrieve scheduled runs from a work pool. This is different from a push-based system like Celery/Redis, where events are pushed to the workers. Prefect's architecture involves workers polling work pools for new runs, and although this is efficient for many use cases, it doesn't inherently provide the same push-based triggering (source).

However, Prefect does offer some flexibility with event-driven capabilities through webhooks and triggers. You can create automations that respond to webhook events, which can trigger deployments based on certain events or conditions. This allows integration with external systems and can simulate some aspects of a push-based system by reacting to external events (source).

Unfortunately, there does not seem to be a direct native path in Prefect 3 to entirely replace the polling mechanism with a push-based system similar to Celery/Redis out of the box, for either the cloud or the open-source version. If you're looking for such an implementation, integrating Prefect with other systems using webhooks and event triggers might be a suitable workaround. For more details, you can refer to the Prefect 3 documentation about workers and event automations.
n
you can `serve` some tasks and then `.delay` them from elsewhere to run them; it's like redis streams or celery
y
that would just introduce more latency though.
n
how do you mean?
it's generally faster than polling
y
i'm looking at a use-case of a user uploading a file, which triggers a background task worker to process the file and then return the result to the user, ideally within seconds. we are currently using redis/celery. having low latency for spinning up the task is important.
n
yeah that's specifically a case where you poll the API for availability of results; in general background tasks don't have to do that
using websockets is faster than polling, and you can scale horizontally as much as you want without race conditions
y
i'm not so concerned with pushing the results of the processing. i'm more asking about the process of spinning up the workers. in this documentation it looks like the worker processes use polling for the execution? https://docs-3.prefect.io/3.0/deploy/infrastructure-concepts/workers#polling-for-work
n
right, so there are 2 distinct types of listeners in prefect:
• canonical workers poll a REST endpoint for flow runs of deployments (like you linked above)
• task workers, like i linked, are websocket clients that are pushed task runs that are backgrounded somewhere else
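For illustration, a minimal sketch of the task-worker pattern described above; the task name `process_file` and the file path are placeholders, but `.serve()` and `.delay()` are the calls being discussed:

```python
from prefect import task


@task
def process_file(path: str) -> str:
    # the actual processing would happen here
    return f"processed {path}"


if __name__ == "__main__":
    # long-running task worker: connects to the Prefect API over websockets and
    # is pushed runs of `process_file` as they are submitted -- no polling loop
    process_file.serve()
```

From the web app (or any other process pointed at the same Prefect API), the upload handler then submits work without waiting on a poll interval:

```python
# e.g. in the request handler for the file upload (illustrative)
process_file.delay("uploads/report.pdf")  # returns immediately; a served worker picks it up
```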
y
the example you linked was sending a task to a redis client. Would the 'task worker' paradigm just be sending tasks to the existing redis setup we have?
n
the example was just writing some arbitrary result of running the task to redis, but you could replace that with writing back to slack, or writing to disk, or wherever you want to send the result of running your task (if at all). redis isn't necessarily involved unless you want to use it for your own reasons, like i did in that example
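A short sketch of that point, with a made-up output path: the task body itself decides where (or whether) its result goes, so redis is optional:

```python
import json
from pathlib import Path

from prefect import task


@task
def process_file(path: str) -> None:
    result = {"path": path, "status": "done"}  # stand-in for the real output
    # redis was just one choice in the example -- write to disk, Slack, S3, or nowhere at all
    Path("results").mkdir(exist_ok=True)
    (Path("results") / f"{Path(path).name}.json").write_text(json.dumps(result))
```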
y
do you have more documentation about the task workers as compared to the default polling workers?
y
i see now, thank you!
👍 1
it seems like the ai is having a hard time with this newer feature
n
yeah i need to make the retrieval better at some point, it’s not amazing
y
before coming here i checked with perplexity.ai which had the same issue. perhaps a blog post writing up the content from the sample repo would help with retrieval
or maybe updating the 'about workers' page
@Nate thanks for your help, i was able to build a prototype of prefect in our app using the background tasks examples. I'm now looking into A) monitoring the task workers B) organizing 'real' code into flows
For A), the prefect ui is a good dashboard to see the tasks. That said, to see the status of a worker it looks like there's just a simple endpoint right now within each individual worker that can return json, and not an aggregate view, as defined here: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_worker.py The 'worker pool' concept doesn't seem to apply to the task workers. Am I reading this correctly?
n
yes that sounds right!
y
so, if we wanted a dashboard to monitor the health of each individual worker, we would need to do the aggregation on our side to build a ui that queries the status of each running worker at that endpoint?
(or just look at the infrastructure container logs themselves). I'm used to flower from celery, which was shown in a screenshot in your blog post announcing the task workers
n
yeah we don't have a ton of observability for task workers themselves at this time unfortunately. you'd just see all the task runs in the UI that get picked up by them
y
And for B), is it correct that right now the background tasks only work for individual tasks, and not flows? I ask because in our situation what we are looking for most is a way to organize complex pipeline processing steps in near-real time (triggered when a user clicks a button in a web ui), so I'm hoping to use the DAG concept of flows and tasks, but without relying on a polling scheduler that would introduce latency
I realize I could have a flow call background tasks just as it would call any other task. That said: can the flows be triggered with the event-based websocket system, or do the flows need to be triggered by a polling scheduler? Or does this distinction not matter due to the way the prefect internals work? Is a 'flow' just an organization of tasks, such that, if all the tasks in a flow were event-based, starting the flow itself would also be event-based, with the same latency concerns as its individual tasks?
n
let me give this question proper attention tomorrow, this is something i’ve been thinking a lot about recently but i’m tied up at an event rn.
y
thanks. Would be helpful to hear your thoughts @Nate on whether the background task workers can be used with flows, or otherwise how best to solve for a DAG within event-based triggers from users at low latency
should I assume that it is the case that the background tasks will only work for handling small tasks and not for a whole DAG-like flow, in the current production version of prefect 3 as of today?
n
so what do you think about something like this
```python
from prefect import flow, task


@flow
def lots_of_nested_tasks_and_flows():
    pass


@task
def calls_a_bunch_of_things():
    lots_of_nested_tasks_and_flows()  # blocking, does all the side effects


calls_a_bunch_of_things.serve()  # websockets, low latency
```
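To make the dispatch side of that sketch concrete (an illustrative call, not from the thread): the web app would then trigger the whole nested graph with a single `.delay()` call, and it runs immediately on whichever process is serving the task:

```python
# from the web app: submits the run over websockets to a served task worker;
# no deployment or polling scheduler is involved
calls_a_bunch_of_things.delay()
```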
"should I assume that it is the case that the background tasks will only work for handling small tasks and not for a whole DAG-like flow, in the current production version of prefect 3 as of today?"
background tasks can call whatever, including flows
y
if a background task calls a flow, would the flow wait to start until the polling scheduler finds it? or would the flow start immediately when called?
n
if you just call a flow it runs like a normal python function, no scheduler involved
it's only if you hit `POST /create_flow_run_from_deployment` one way or another that you are scheduling a run that a polling process would have to catch
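To make that distinction concrete, here is a hedged sketch (the flow and deployment names are placeholders): calling the flow directly executes it in-process, while `run_deployment` goes through the API endpoint above and therefore waits on a polling worker:

```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def my_flow(x: int) -> int:
    return x * 2


# direct call: runs right here like a normal python function, no scheduler or
# worker polling involved -- you still get a flow run recorded in the UI
my_flow(1)

# deployment path: creates a scheduled flow run via the API
# (POST /create_flow_run_from_deployment under the hood), which a polling
# worker then has to pick up -- this is where the polling latency lives
run_deployment("my-flow/my-deployment")
```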
y
okay that clarifies things. so if we avoid using the 'deployment' parts of the framework and just call flows from task workers we avoid the polling scheduler?
n
yes
yeah to be candid we have this problem in our info architecture where we suggest that Deployment is the "one true path", but there's a lot of use you can get by avoiding Deployment altogether and using flows for observability; you still get events and the runs in the UI and everything else, just not dynamic dispatch of infra
for example you can do this in a lambda
```python
from prefect import flow


@flow
def handler(event, context): ...
```
y
ya i think in our use case we could refactor things to run serverless, but currently we have an autoscaling number of worker containers that are created by our celery/redis/ecs cdk setup. so for starters i'm seeing if we could create a similar number of prefect task workers that execute the processing. it looks like there's some automatic routing logic to handle dispatches to multiple workers in the task_worker source code. how would that interact with a really complex flow? is there, for example, any way to trace and monitor which workers are doing what within a flow, so if something fails we can look at the logs for that specific worker?
"yeah to be candid we have this problem in our info architecture where we suggest that Deployment is the "one true path" but there's a lot of use you can get by avoiding Deployment altogether and using flows for observability, you still get events and the runs in the UI and everything else, just not dynamic dispatch of infra (edited)" Ya I think this is the root of my confusion
n
that's a really good question. can you open a discussion about this? right now internally we're working on emitting more context about the client-side exec env so the UI can show it, but it's deployment-focused AFAIK right now. but it would be ideal to do the same for task workers so that the flow runs you call from background would be traceable to the infra that they're running on
y
the other problem with serverless is latency; a lot of serverless things have cold starts and it can be complex to pre-warm them. the aws ecs guidance is that pre-warming still leaves several seconds for warm starts
so we ruled out serverless in the past for that reason for our own use-case. i think technically there are arguments that can work around the cold-start problem, but most of those solutions are more time-consuming to set up and maintain than the 'traditional' worker container architecture
"but it would be ideal to do the same for task workers so that the flow runs you call from background would be traceable to the infra that they're running on" So for now if we implemented prefect we would get • observability + monitoring of flow logic via tasks (DAG-like) but we would not get • observability about which worker specifically failed when something went wrong • and we wouldn't have any fine-grained control over how the tasks are being delegated to which workers? For example for some processing tasks we might want certain workers to do it that have certain ram/cpu/gpu specs, instead of treating all workers as interchangeable.
we could see the worker logs themselves via ecs, though if we were to scale to 100 workers it could be difficult to know which one to examine
okay i posted a discussion
🙏 1
a
Hey! Like Nate, I work at Prefect 👋 I've worked directly on some of the things you're trying out, so I'm excited to see the feedback.

We don't currently have a worker type that starts as a long-running process and receives work instead of polling for it (other than serverless work pools, which, as you mentioned, incur some latency). Task workers are the fastest way to have a process alive and ready for work that receives it over websockets. However, as you mentioned, these run discrete tasks rather than DAGs/workflows ("flows" in our terminology). We are still evaluating whether and how we'd carry that websockets approach over to workers generally.

We are actively improving the observability of both flow and task workers. This week, we're launching improvements to worker observability (in our Cloud product first) that include showing worker logs. Our first pass of this is for the Cloud UI and only for workers that run flows, not task workers or serverless workers, but those workers (and OSS UI support) are all on our roadmap.

Anyway, I agree with you that it would be best if both task and flow workers could start and receive work pushed over websocket connections. All of the pieces are there, so I hope we deliver that for you and other interested users. 🙂

In the meantime, you can also experiment with starting a long-running process that listens for scheduled flow run events, something like this (with Prefect 3):
```python
import asyncio

# import paths assumed for Prefect 3
from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter, EventResourceFilter


async def watch_events():
    async with get_events_subscriber(
        filter=EventFilter(
            event=EventNameFilter(
                name=["prefect.flow-run.scheduled"],
            ),
            resource=EventResourceFilter(
                labels={"prefect.resource.name": "your-flow-name"},
            ),
        )
    ) as events_subscriber:
        async for event in events_subscriber:
            # You'd have to retrieve the flow run from the API here and run it
            print(event.model_dump_json(indent=2))


if __name__ == "__main__":
    asyncio.run(watch_events())
```
I didn't test that code, but it should be close. You'd need to get the flow run ID from the event and then read the flow run data from our API to resolve the parameters to arguments (I linked an example of how we do this when we're actually running flows). Anyway, not a solution but it's on my mind!
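As a rough, untested sketch of that follow-up step (the helper name is made up, and it assumes the flow run id is the tail of the event's resource id, e.g. `prefect.flow-run.<uuid>`):

```python
from uuid import UUID

from prefect.client.orchestration import get_client


async def handle_scheduled_event(event) -> None:
    # pull the flow run id off the event's resource id
    flow_run_id = UUID(event.resource.id.split(".")[-1])
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        # flow_run.parameters holds the stored parameters; resolving them to
        # arguments and actually executing the flow is the part you'd still
        # have to do yourself, as described above
        print(flow_run.name, flow_run.parameters)
```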
y
Thanks. looking into this some more, it seems like the prefect task worker paradigm is a good step forwards but would be a bit of a regression from our current celery+redis worker setup for monitoring and managing the workers themselves. @Nate when you talk about using prefect for just the task+flow monitoring and not the deployments, is the implication that we could keep our existing celery setup and just wrap things in prefect decorators to log the stages of the pipeline? So something like
```python
from prefect import task  # `app` below is the existing Celery app, defined elsewhere


@task      # prefect decorator
@app.task  # launch task via celery
def process_document():
    results = ...  # do a bunch of things
    return results
```
I am thinking about trying that approach, though I'm a little concerned about a mismatch between celery and prefect, for example with concurrency
and I'm not sure how much benefit we would get if we're only using prefect for visualizing tasks versus controlling the execution -- if that is even possible to set up
a
We need to give you more visibility into workers and worker logs for sure