Tim Galvin
04/23/2024, 5:47 AM
I'm hitting an issue with the .map of a task-decorated function. I can easily batch up my large iterable, but I am more curious as to why the .map might be falling over.

Tim Galvin
04/23/2024, 6:09 AM
It's a pandas dataframe, where the iterable is dataframe.iterrows().
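For reference, the shape of the pattern being discussed looks roughly like the sketch below, assuming Prefect 2.x. The task name process_row, the flow, and the toy dataframe are placeholders rather than the real code; the point is that DataFrame.iterrows() yields (index, Series) tuples, so each mapped task run receives one such tuple.

import pandas as pd
from prefect import flow, task, unmapped


@task
def process_row(row, thing_type):
    # `row` is one (index, Series) tuple as yielded by DataFrame.iterrows()
    index, series = row
    return f"{thing_type}:{index}:{series.sum()}"


@flow
def demo_flow():
    dataframe = pd.DataFrame({"a": range(5), "b": range(5)})
    # One mapped task run per row; thing_type is held constant via unmapped()
    futures = process_row.map(
        row=list(dataframe.iterrows()),  # list() just makes the generator concrete for the example
        thing_type=unmapped("foo"),
    )
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(demo_flow())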
Alec Thomson
04/23/2024, 6:19 AM
all_things = []
for thing_type in ("foo", "bar", "baz"):
    things_one = task_one.map(
        row=dataframe.iterrows(),
        thing_type=unmapped(thing_type),
    )
    things_two = task_two.map(
        thing_one=things_one,
        thing_type=unmapped(thing_type),
    )
    all_things.extend(things_two)
results = task_three.map(
    thing_two=all_things,
)
concrete_results = [r.result() for r in results]
This caused the flow run to halt / freeze almost immediately (with eventual timeouts)
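For a sense of scale, here is a back-of-envelope count of how many task runs the fully mapped pattern above creates before anything is resolved. The row count is a made-up stand-in for the large dataframe; the three thing types and the three tasks come straight from the snippet.

# Hypothetical sizing of the fully mapped pattern above.
n_rows = 10_000                       # assumed size of the large dataframe
thing_types = ("foo", "bar", "baz")

task_one_runs = len(thing_types) * n_rows    # one run per (thing_type, row)
task_two_runs = len(thing_types) * n_rows    # one run per element of things_one
task_three_runs = len(thing_types) * n_rows  # all_things holds 3 * n_rows futures
total_runs = task_one_runs + task_two_runs + task_three_runs

print(total_runs)  # 90_000 task runs, all submitted before any .result() call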
Alec Thomson
04/23/2024, 6:20 AM
# Attempted fix 1
_batch_size = 100
results = []
for i in range(0, len(dataframe), _batch_size):
    sub_df = dataframe[i : i + _batch_size]
    all_things = []
    for thing_type in ("foo", "bar", "baz"):
        things_one = task_one.map(
            row=sub_df.iterrows(),
            thing_type=unmapped(thing_type),
        )
        things_two = task_two.map(
            thing_one=things_one,
            thing_type=unmapped(thing_type),
        )
        all_things.extend(things_two)
    sub_results = task_three.map(
        thing_two=all_things,
    )
    results.extend(sub_results)
concrete_results = [r.result() for r in results]
This got more tasks submitted, but eventually still froze up
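Worth noting about this attempt (illustrated with a hypothetical toy below, where toy_task stands in for the real tasks): only the submission is batched. The futures from every batch are kept in results and nothing is resolved until after the loop, so the total number of outstanding task runs still grows to roughly nine per row, the same as the unbatched version.

# Toy sketch (hypothetical): submission is batched, resolution is not.
from prefect import flow, task


@task
def toy_task(x: int) -> int:
    return x + 1


@flow
def accumulate_then_resolve(values: list[int], batch_size: int = 4) -> list[int]:
    futures = []
    for i in range(0, len(values), batch_size):
        # Each batch is submitted here, but its futures are kept alive...
        futures.extend(toy_task.map(values[i : i + batch_size]))
    # ...and nothing is resolved until every batch has been submitted.
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(accumulate_then_resolve(list(range(10))))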
Alec Thomson
04/23/2024, 6:21 AM
# Attempted fix 2
_batch_size = 100
concrete_results = []
for i in range(0, len(dataframe), _batch_size):
    sub_df = dataframe[i : i + _batch_size]
    all_things = []
    for thing_type in ("foo", "bar", "baz"):
        things_one = task_one.map(
            row=sub_df.iterrows(),
            thing_type=unmapped(thing_type),
        )
        things_two = task_two.map(
            thing_one=things_one,
            thing_type=unmapped(thing_type),
        )
        all_things.extend(things_two)
    sub_results = task_three.map(
        thing_two=all_things,
    )
    concrete_results.extend([f.result() for f in sub_results])
Which is running, but seems much less efficient 😕
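A guess at why this feels slower, illustrated with a hypothetical toy flow below: the .result() calls inside the loop block until the whole batch has finished, so the next batch is not submitted until the current one is completely done, and the batches run back to back instead of overlapping.

# Toy sketch (hypothetical): resolving each batch before submitting the next.
from prefect import flow, task


@task
def toy_task(x: int) -> int:
    return x * 2


@flow
def resolve_per_batch(values: list[int], batch_size: int = 4) -> list[int]:
    concrete = []
    for i in range(0, len(values), batch_size):
        futures = toy_task.map(values[i : i + batch_size])
        # Blocking here serialises the batches: the next batch is only
        # submitted once every future in this one has resolved.
        concrete.extend(f.result() for f in futures)
    return concrete


if __name__ == "__main__":
    print(resolve_per_batch(list(range(10))))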
Alec Thomson
04/23/2024, 6:23 AM

Alec Thomson
04/23/2024, 7:12 AM

Alec Thomson
04/23/2024, 8:50 AM
I've switched from .map to .submit within a for loop - like:
results = []
for row in dataframe.iterrows():
    for thing_type in ("foo", "bar", "baz"):
        thing_one = task_one.submit(
            row=row,
            thing_type=thing_type,
        )
        thing_two = task_two.submit(
            thing_one=thing_one,
            thing_type=thing_type,
        )
        result = task_three.submit(
            thing_two=thing_two,
        )
        results.append(result)
concrete_results = [r.result() for r in results]
And this seems to be progressing much better
It'd be great to understand why the .map pattern was causing things to freeze up

Tim Galvin
04/23/2024, 8:52 AM
I'll give the .submit version a go. The .map version has completely borked itself.
Any ideas?