Tim-Oliver
12/17/2022, 3:28 PMTim-Oliver
12/17/2022, 3:29 PMimport numpy as np
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect_dask import DaskTaskRunner
@task(cache_key_fn=task_input_hash)
def fun_1(n):
return n ** 2
@task(cache_key_fn=task_input_hash)
def fun_2(n):
return 2 * n + 1
@task(cache_key_fn=task_input_hash)
def fun_3(n, m):
return n + m
@flow(name="map-bug",
cache_result_in_memory=False,
persist_result=True,
result_storage="local-file-system/buchtimo-laptop-storage",
task_runner=DaskTaskRunner())
def map_bug_flow():
data = np.arange(200)
f1 = fun_1.map(data)
f2 = fun_2.map(f1)
f3 = fun_3.map(data, f2)
if __name__ == "__main__":
map_bug_flow()
Tim-Oliver
12/17/2022, 3:29 PMimport numpy as np
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect_dask import DaskTaskRunner
@task(cache_key_fn=task_input_hash)
def fun_1(n):
return n ** 2
@task(cache_key_fn=task_input_hash)
def fun_2(n):
return 2 * n + 1
@task(cache_key_fn=task_input_hash)
def fun_3(n, m):
return n + m
@flow(name="map-bug",
cache_result_in_memory=False,
persist_result=True,
result_storage="local-file-system/buchtimo-laptop-storage",
task_runner=DaskTaskRunner())
def map_bug_flow():
data = np.arange(200)
for d in data:
f1 = fun_1.submit(d)
f2 = fun_2.submit(f1)
f3 = fun_3.submit(d, f2)
if __name__ == "__main__":
map_bug_flow()
it works.Tim-Oliver
12/17/2022, 3:31 PMTim-Oliver
12/17/2022, 3:31 PMTim-Oliver
12/17/2022, 3:32 PMAnna Geller
Tim-Oliver
12/17/2022, 4:22 PMTim-Oliver
12/17/2022, 4:23 PMTim-Oliver
12/17/2022, 4:24 PMTim-Oliver
12/17/2022, 4:25 PMdata = list(range(200))
results in the same behaviour.Anna Geller
Tim-Oliver
12/18/2022, 5:15 PM