Sam Cook
12/01/2022, 8:25 PMmap
to pass those items/results to follow on tasks. Consistently, if I have a large number of items (50+) then the entire process will lock up during the follow on tasks and fail to terminate cleanly. In the UI the root job is marked as crashed
but the tasks are always stuck in a running state. I'm running in a Kubernetes environment as well so the stuck jobs have to be manually cleaned up whenever this occurs as they never reach completion.
I think this might possibly be related to @Boggdan Barrientos question from yesterday.from prefect import flow, task
@task
def process1(event):
return event
@task
def process2(event):
return event
@task
def process3(event):
return event
@task
def process4(event):
return event
@task
def example():
event = {"debug": "ignore"}
return [event for i in range(100)]
@flow
def my_flow():
result = example.submit()
res1 = process1.map(result)
res2 = process2.map(res1)
res3 = process3.map(res2)
res4 = process4.map(res3)
return
my_flow()
res1 = [res.result() for res in res1]
Zanie
Sam Cook
12/01/2022, 9:00 PM