# ask-community
m
example, I have a task that submits about 200 subtasks. You can see there is about 1 minute 30 seconds latency before any of them start
n
can you show what your code looks like? it likely has to do with the tasks you're submitting
In [1]: from prefect import flow, task

In [2]: @flow
   ...: def f(): task(lambda x: x + 1).map(range(200)).result()

In [3]: f()
since this takes about 3 seconds to finish in prefect 3.x
m
can you elaborate please?
n
18 seconds
i think that should be representative of what you're doing
essentially i was just saying that bc of how you were checking for completion, you were blocking. as_completed should help with that
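A minimal sketch of that suggestion, assuming Prefect 3.x (the prefect.futures.as_completed import path and the add_one / process_all names are illustrative, not from the thread):

from prefect import flow, task
from prefect.futures import as_completed  # import path assumed

@task
def add_one(x: int) -> int:
    return x + 1

@flow
def process_all():
    futures = add_one.map(range(200))
    # act on each task run as soon as it finishes, rather than
    # waiting on the futures one by one in submission order
    for future in as_completed(futures):
        print(future.result())

if __name__ == "__main__":
    process_all()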
m
still not 100% sure I understand. certainly the lines I linked are blocking. But the task that is waiting on them is itself submitted as an async task using task.submit. Are you saying that it's still blocking?
like it blocks the main thread?
n
https://github.com/MCGallaspy/pokemon_showdown_replay_tools/blob/main/scripts/populate.py#L161C9-L161C22 yeah my assumption in looking at the code was that this future.wait() was blocking the main thread for each future, I could be wrong
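Roughly the shape of the pattern being described, as a sketch only (not the actual populate.py code; fetch_replay and the sleep are stand-ins):

import time
from prefect import flow, task

@task
def fetch_replay(replay_id: int) -> int:
    time.sleep(1)  # stand-in for the slow download
    return replay_id

@flow
def populate():
    futures = [fetch_replay.submit(i) for i in range(200)]
    # waiting on the futures in submission order keeps the flow parked on
    # earlier futures even after later ones have already finished
    for future in futures:
        future.wait()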
m
ok, I think I understand. ty for the help and the great gist
n
šŸ‘ feel free to post back here if you're still seeing delays
n
hrmmmmm yes. but once a future comes out of as_completed you know it's done, so the future.result() call which would normally be blocking should happen instantly. but overall yeah, the list comp would be blocking bc I'm exhausting as_completed instead of doing something for each completed future that pops out
i could be missing something about your intention w whats going on, i sort of whipped that together quickly. if you have a minimal example of where concurrency via submit / map or as_completed is behaving in an unexpected manner, I'd be happy to take a look, or you can create a discussion so other folks can benefit from the convo
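The distinction being drawn, sketched under the same assumptions as above (prefect.futures.as_completed, illustrative task names):

from prefect import flow, task
from prefect.futures import as_completed  # import path assumed

@task
def add_one(x: int) -> int:
    return x + 1

@flow
def compare():
    # list-comprehension form: each .result() returns immediately because
    # as_completed only yields finished futures, but nothing after this line
    # runs until the comprehension has drained every future
    results = [f.result() for f in as_completed(add_one.map(range(200)))]

    # incremental form: act on each result as its future completes, so
    # downstream work overlaps with the task runs still in flight
    for f in as_completed(add_one.map(range(200))):
        print(f.result())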
m
I'm just trying to understand it myself, I don't have a minimal example
Certainly I believe that your gist has very little latency, so I'm trying to find the crucial difference between mine and yours
It's a learning thing
n
maybe

this is helpful

🙌 1
m
I ran your script without modification and I'm still seeing big latency. I'm assuming it's a me issue at this point. I wonder if there's any way to profile the prefect internals?
Screenshot 2024-11-21 145350.png
n
hrm what does prefect config view say? i.e. are you running against an ephemeral server, oss server, or cloud?
m
🚀 you are connected to:
http://127.0.0.1:4200
PREFECT_PROFILE='local'
PREFECT_API_URL='http://127.0.0.1:4200/api' (from profile)
n
looks like an open source server. so you have prefect server start going someplace?
m
yup
n
hm. and the script you copied w/o modification, that's the gist I shared?
m
yeah
n
huh - off the top of my head im not sure. is the delay only in the resulting timeline in the UI, or do you see the work literally delayed in your terminal?
m
it's literally delayed
could it be related to task caching?
I think I'm focusing on the wrong thing here. Let me clarify my intent. Is there an idiomatic way to write a consumer-producer pattern in prefect? In my example, results from the search api produce replay ids, which I then want to consume to fetch a remote database row and persist it to disk. It is much faster to get replay ids than it is to download the corresponding data. But I ideally want the production of replay ids to execute concurrently with the consumption of them.
both the producer and consumer processes may be very long lived
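One way to express that inside a single flow, sketched under assumptions (the page-based search, the task names, and the prefect.futures.as_completed import are illustrative, not from the thread): submit consumer tasks as each producer result arrives, rather than waiting for all replay ids first.

from prefect import flow, task
from prefect.futures import as_completed  # import path assumed

@task
def search_page(page: int) -> list[int]:
    # hypothetical producer: one page of replay ids from the search API
    return list(range(page * 10, (page + 1) * 10))

@task
def download_replay(replay_id: int) -> None:
    # hypothetical consumer: fetch the remote row and persist it to disk
    ...

@flow
def populate(pages: int = 20):
    search_futures = search_page.map(range(pages))

    # start downloading each page's replays as soon as that page is back,
    # rather than waiting for the whole search to finish
    download_futures = []
    for page_future in as_completed(search_futures):
        for replay_id in page_future.result():
            download_futures.append(download_replay.submit(replay_id))

    # only block at the very end, once everything has been submitted
    for f in as_completed(download_futures):
        f.result()

This keeps both sides in one flow run, though, which gets awkward when producer and consumer need to stay up indefinitely; the background-task pattern described next is aimed at that case.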
n
im tempted to point you here (a relatively new pattern in prefect), but im not sure how literally you mean consumer/producer. what im sharing above is akin to redis streams / celery, where you have
• a finite set of tasks you serve (this is a websocket client that gets pushed task runs from the server), i.e. serve(*many_tasks)
  ◦ you can horizontally scale these (e.g. N pods) arbitrarily without race conditions because of consumer groups
  ◦ all the task features apply, i.e. caching, results, retries etc
• from somewhere like a webapp, you can some_task.delay(**task_kwargs) to "background" that task without blocking
so this is good for cases where you want to offload a bunch of work to happen concurrently somewhere on static infra, but the caller (or delay-er) doesn't need the result of that background task. am I going off the rails here or does that sound like something you're interested in?
m
you're not going off the rails
we're thinking along the same lines
I think that I literally mean consumer/producer, but I guess I'm not sure what a metaphorical consumer/producer pattern is
n
what a metaphorical consumer/producer pattern is
haha fair enough. well cool, those examples are almost all docker-compose and should be mostly up to date, lmk if you have any specific questions