Jeff Yun
10/02/2019, 8:40 PM
Chris White
10/03/2019, 6:18 PM
Jeff Yun
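10/03/2019, 10:11 PM
import itertools

import dask.bag as db
from prefect import task

@task(name='create_product')
def create_product(params1, params2):
    # Every (p1, p2) combination, materialized as a list
    return list(itertools.product(params1, params2))

@task(name='make_batch')
def make_batch(param, MAX_PARTITIONS=200):
    # Wrap the product in a Dask Bag with MAX_PARTITIONS partitions
    param = db.from_sequence(param, npartitions=MAX_PARTITIONS)
    return param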
In my flow, I have
prod = create_product(a_list, b_list)
sim = stage_0_func.map(unmapped(cfgdir), unmapped(tmpdir), make_batch(prod))
which gives me the error:
Traceback (most recent call last):
  File "REPODIR/demos/prefect/engine/task_runner.py", line 260, in run
    executor=executor,
  File "REPODIR/demos/prefect/engine/task_runner.py", line 720, in run_mapped_task
    upstream_state.result[i],
TypeError: 'Bag' object does not support indexing
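The last frame shows the mapped-task runner reading upstream_state.result[i], so whatever the upstream task returns must support integer indexing. The sketch below is a simplified, hypothetical model of that step, not Prefect's actual code; LazyCollection is an invented stand-in for a Bag that can be iterated and sized but not indexed.

```python
from typing import Any, Callable, Iterator, List, Optional, Sequence


class LazyCollection:
    """Hypothetical stand-in for a Dask Bag: iterable and sized, but with no __getitem__."""

    def __init__(self, data: Sequence[Any]) -> None:
        self._data = list(data)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)


def run_mapped(fn: Callable[[Any], Any], upstream_result: Any) -> List[Any]:
    """Simplified model of a mapped run: one call per index, each reading upstream_result[i]."""
    return [fn(upstream_result[i]) for i in range(len(upstream_result))]


# A list supports len() and [i], so every mapped call gets its element.
ok = run_mapped(str.upper, ["a", "b"])
print(ok)  # ['A', 'B']

# The stand-in defines __len__, so the failure here comes specifically from indexing.
caught: Optional[TypeError] = None
try:
    run_mapped(str.upper, LazyCollection(["a", "b"]))
except TypeError as exc:
    caught = exc
print(type(caught).__name__)  # TypeError
```

The exact wording of the TypeError message varies by Python version, which is why the sketch only checks the exception type.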
Chris White
10/03/2019, 10:14 PM
Jeff Yun
10/03/2019, 10:20 PM
Chris White
10/03/2019, 10:27 PM
Jeff Yun
10/03/2019, 10:30 PM
Chris White
10/03/2019, 10:31 PM
Jeff Yun
10/03/2019, 10:31 PM
Chris White
10/03/2019, 10:32 PM
Jeff Yun
10/03/2019, 10:39 PM
Chris White
10/03/2019, 11:02 PM
Jeff Yun
10/03/2019, 11:08 PM
Chris White
10/03/2019, 11:09 PM
Jeff Yun
10/03/2019, 11:11 PM
Chris White
10/03/2019, 11:12 PM
from prefect import task

@task
def my_task():
    return "my_value"  # <-- this is the return value I'm referring to
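In the original flow, the return value of make_batch is a Dask Bag, and that is what the mapped task tries to index. One possible alternative, sketched here under the assumption that the goal is to split the product into at most MAX_PARTITIONS chunks with each chunk becoming one mapped call, is to return a plain list of lists. make_batches is a hypothetical helper, not part of Prefect or Dask, and this is not necessarily the fix proposed in the thread.

```python
import itertools
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def make_batches(items: Sequence[T], max_partitions: int = 200) -> List[List[T]]:
    """Split items into at most max_partitions contiguous, roughly equal chunks."""
    if max_partitions < 1:
        raise ValueError("max_partitions must be >= 1")
    n = len(items)
    if n == 0:
        return []
    k = min(max_partitions, n)  # never produce empty chunks
    base, extra = divmod(n, k)  # the first `extra` chunks get one more item
    batches = []
    start = 0
    for i in range(k):
        size = base + (1 if i < extra else 0)
        batches.append(list(items[start:start + size]))
        start += size
    return batches


prod = list(itertools.product([1, 2, 3], ["a", "b"]))
batches = make_batches(prod, max_partitions=4)
print(len(batches))  # 4
print(batches[0])    # [(1, 'a'), (1, 'b')]
```

If a function like this were wrapped in @task, its return value would support len() and [i], so each call of stage_0_func.map would receive one list of (a, b) pairs; stage_0_func would then need to accept a chunk rather than a single pair.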
Jeff Yun
10/03/2019, 11:12 PM
Dask bag
to disk instead of passing it. Interesting!
Chris White
10/03/2019, 11:12 PM
Jeff Yun
10/03/2019, 11:13 PM
Chris White
10/03/2019, 11:14 PM
Jeff Yun
10/03/2019, 11:16 PM
map_states = executor.map(
    run_fn, initial_states, range(len(map_upstream_states)), map_upstream_states
)
Since in task_runner.py, initial_states and map_upstream_states are as long as the mapped argument, even though their entries differ only in their arguments.
Chris White
10/03/2019, 11:20 PM
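The executor.map call quoted above receives one initial state and one set of upstream states per map index, so those lists grow with the mapped argument even when the unmapped inputs are identical across entries. A simplified, hypothetical model of how per-index arguments can be assembled (not Prefect's implementation): Unmapped is an invented marker playing the role that unmapped plays in the flow above, and build_map_arguments is likewise illustrative.

```python
from typing import Any, Dict, List


class Unmapped:
    """Hypothetical marker for an argument passed unchanged to every mapped call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def build_map_arguments(**kwargs: Any) -> List[Dict[str, Any]]:
    """Return one kwargs dict per map index; unmapped values are shared, not copied."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    if not mapped:
        raise ValueError("at least one mapped argument is required")
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    n = lengths.pop()
    calls = []
    for i in range(n):
        # Mapped inputs contribute element i; unmapped inputs contribute the same object.
        call = {k: (v.value if isinstance(v, Unmapped) else v[i]) for k, v in kwargs.items()}
        calls.append(call)
    return calls


calls = build_map_arguments(cfgdir=Unmapped("cfg/"), tmpdir=Unmapped("tmp/"), batch=[[1], [2], [3]])
print(len(calls))  # 3
print(calls[1])    # {'cfgdir': 'cfg/', 'tmpdir': 'tmp/', 'batch': [2]}
```

In this model each extra map index costs one small dict that references the shared cfgdir and tmpdir values rather than copying them, so batching the product into fewer, larger elements is what shrinks the number of entries.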