#prefect-community

Sam Cook

12/01/2022, 8:25 PM
Is there a correct way to pass mapped results between tasks in a flow? I have a flow that generates a list of items and then repeatedly calls `map` to pass those items/results to follow-on tasks. If I have a large number of items (50+), the entire process consistently locks up during the follow-on tasks and fails to terminate cleanly. In the UI the root job is marked as `Crashed`, but the tasks are always stuck in a `Running` state. I'm also running in a Kubernetes environment, so the stuck jobs have to be cleaned up manually whenever this happens, since they never reach completion. I think this may be related to @Boggdan Barrientos' question from yesterday.
Here's an example flow that locks up in my environment:
```python
from prefect import flow, task

@task
def process1(event):
    return event

@task
def process2(event):
    return event

@task
def process3(event):
    return event

@task
def process4(event):
    return event

@task
def example():
    event = {"debug": "ignore"}
    return [event for i in range(100)]

@flow
def my_flow():
    # A future for the full list of 100 items
    result = example.submit()
    # Each .map() call returns a list of futures, which is passed
    # directly into the next .map() call without being resolved
    res1 = process1.map(result)
    res2 = process2.map(res1)
    res3 = process3.map(res2)
    res4 = process4.map(res3)
    return

my_flow()
```
Also, if I manually resolve the results before passing them to the next `map`, everything works as expected:

```python
res1 = [res.result() for res in res1]
```
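The idea behind that workaround (wait for every future of one stage, then pass only plain values into the next stage) can be sketched without Prefect using the standard library's `concurrent.futures`. This is an analogy under stated assumptions, not Prefect's task runner or a statement about the cause of the deadlock; `process` and `run_pipeline` are hypothetical stand-ins for the pass-through tasks in the repro.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pass-through stage, mirroring process1..process4 above.
def process(event):
    return event

def run_pipeline(items, n_stages=4, max_workers=8):
    """Run each stage over all items, resolving every future to a plain
    value before the next stage is submitted."""
    values = list(items)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(n_stages):
            futures = [pool.submit(process, v) for v in values]
            # Block until this whole stage is done; only plain values move on,
            # so no downstream work ever waits on an upstream future.
            values = [f.result() for f in futures]
    return values

events = [{"debug": "ignore"} for _ in range(100)]
results = run_pipeline(events)
print(len(results))  # 100
```

The trade-off is that each stage acts as a barrier: nothing in stage N+1 starts until all of stage N has finished.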

Zanie

12/01/2022, 8:53 PM
Mapping with a bunch of data passing like that will deadlock right now. I’m working on fixing this ASAP 🙂
Sorry you’ve run into it 😞

Sam Cook

12/01/2022, 9:00 PM
Glad to have an answer that's not my fault at least 😆