mondras
07/18/2023, 7:52 PM
unmapped in prefect < 2.0 and prefect 2.0?
Nate
07/18/2023, 8:23 PM
Marvin
07/18/2023, 8:23 PM
Nate
07/18/2023, 8:24 PM
unmapped feature in prefect 2 that's used with .map on tasks?
Marvin
07/18/2023, 8:24 PM
unmapped feature in Prefect 2 that's used with .map on tasks. It's highly depressing, almost like my existence, but don't worry, I'll keep looking. You might want to check out the Prefect documentation or their official forum for more specific information.
Nate
07/18/2023, 8:29 PM
unmapped works essentially the same way. for example
In [1]: from prefect import task, flow, unmapped
In [2]: @task
   ...: def process_item(item: str, unmapped_args: list[str]):
   ...:     print(item, unmapped_args)
   ...:
In [3]: @flow(log_prints=True)
   ...: def my_flow():
   ...:     process_item.map(["foo", "bar"], unmapped_args=unmapped([1, 2, 3]))
   ...:
In [4]: my_flow()
15:28:57.985 | INFO | prefect.engine - Created flow run 'athletic-kestrel' for flow 'my-flow'
15:28:58.605 | INFO | Flow run 'athletic-kestrel' - Created task run 'process_item-1' for task 'process_item'
15:28:58.607 | INFO | Flow run 'athletic-kestrel' - Submitted task run 'process_item-1' for execution.
15:28:58.615 | INFO | Flow run 'athletic-kestrel' - Created task run 'process_item-0' for task 'process_item'
15:28:58.615 | INFO | Flow run 'athletic-kestrel' - Submitted task run 'process_item-0' for execution.
15:28:59.232 | INFO | Task run 'process_item-0' - foo [1, 2, 3]
15:28:59.338 | INFO | Task run 'process_item-0' - Finished in state Completed()
15:28:59.679 | INFO | Task run 'process_item-1' - bar [1, 2, 3]
15:28:59.807 | INFO | Task run 'process_item-1' - Finished in state Completed()
15:28:59.924 | INFO | Flow run 'athletic-kestrel' - Finished in state Completed('All states completed.')
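The behaviour in the log above (each item paired with the same full list) can be sketched in plain Python. This is only an illustration of the semantics, not how Prefect implements it: the `Unmapped` class and the `map_calls` helper are hypothetical names made up for this sketch, and nothing here runs as a Prefect task.

```python
from typing import Any, Callable


class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: marks a value as constant."""

    def __init__(self, value: Any) -> None:
        self.value = value


def map_calls(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> list[Any]:
    """Call fn once per element of the mapped arguments.

    Arguments wrapped in Unmapped are passed whole to every call.
    """
    # Only the non-Unmapped arguments decide how many calls are made.
    mapped = [a for a in [*args, *kwargs.values()] if not isinstance(a, Unmapped)]
    lengths = {len(a) for a in mapped}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    n = lengths.pop()

    def pick(arg: Any, i: int) -> Any:
        # Unmapped values are reused as-is; mapped values are indexed.
        return arg.value if isinstance(arg, Unmapped) else arg[i]

    return [
        fn(*(pick(a, i) for a in args), **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(n)
    ]


def process_item(item, unmapped_args):
    return (item, unmapped_args)


calls = map_calls(process_item, ["foo", "bar"], unmapped_args=Unmapped([1, 2, 3]))
print(calls)  # [('foo', [1, 2, 3]), ('bar', [1, 2, 3])]
```

Without the `Unmapped` wrapper, the three-element list would itself be treated as something to iterate over, and its length would not match the two items.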
mondras
07/18/2023, 8:44 PM
submit()?
Nate
07/18/2023, 8:44 PM
Nate
07/18/2023, 8:44 PM
mondras
07/18/2023, 8:47 PM
dasktaskrunner, the docs say to use .submit, but i don't have to do that with .map? it will call .submit for me? what about the .results; ty btw
Nate
07/18/2023, 8:52 PM
"i don't have to do that with .map?"
correct, for example
In [1]: from prefect import task, flow, unmapped
In [2]: from prefect_dask import DaskTaskRunner
In [3]: @task
   ...: def process_item(item: str, unmapped_args: list[str]):
   ...:     print(item, unmapped_args)
   ...:
In [4]: @flow(log_prints=True, task_runner=DaskTaskRunner)
   ...: def my_flow():
   ...:     process_item.map(["foo", "bar"], unmapped_args=unmapped([1, 2, 3]))
   ...:
In [5]: my_flow()
15:51:02.077 | INFO | prefect.engine - Created flow run 'colossal-salmon' for flow 'my-flow'
15:51:02.082 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
15:51:03.091 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://127.0.0.1:8787/status>
15:51:03.731 | INFO | Flow run 'colossal-salmon' - Created task run 'process_item-0' for task 'process_item'
15:51:04.043 | INFO | Flow run 'colossal-salmon' - Submitted task run 'process_item-0' for execution.
15:51:04.052 | INFO | Flow run 'colossal-salmon' - Created task run 'process_item-1' for task 'process_item'
15:51:04.058 | INFO | Flow run 'colossal-salmon' - Submitted task run 'process_item-1' for execution.
15:51:05.754 | INFO | Task run 'process_item-0' - foo [1, 2, 3]
15:51:05.875 | INFO | Task run 'process_item-0' - Finished in state Completed()
15:51:05.922 | INFO | Task run 'process_item-1' - bar [1, 2, 3]
15:51:06.170 | INFO | Task run 'process_item-1' - Finished in state Completed()
15:51:09.096 | INFO | Flow run 'colossal-salmon' - Finished in state Completed('All states completed.')
Nate
07/18/2023, 8:52 PM
15:51:03.731 | INFO | Flow run 'colossal-salmon' - Created task run 'process_item-0' for task 'process_item'
15:51:04.043 | INFO | Flow run 'colossal-salmon' - Submitted task run 'process_item-0' for execution.
15:51:04.052 | INFO | Flow run 'colossal-salmon' - Created task run 'process_item-1' for task 'process_item'
15:51:04.058 | INFO | Flow run 'colossal-salmon' - Submitted task run 'process_item-1' for execution.
mondras
07/18/2023, 8:53 PM
Nate
07/18/2023, 8:54 PM
results = process_item.map(["foo", "bar"], unmapped_args=unmapped([1, 2, 3]))
print([result.result() for result in results])
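For anyone who wants to try the submit-then-collect pattern without a Prefect install, here is a rough standard-library analogy. It is not Prefect code: `concurrent.futures` stands in for the task runner, and the `shared` parameter name is just an illustrative choice. The point it shows is the same as above: one submission per item, then `.result()` on each returned future.

```python
from concurrent.futures import ThreadPoolExecutor


def process_item(item: str, shared: list[int]) -> str:
    return f"{item} {shared}"


with ThreadPoolExecutor() as pool:
    # One submit per item; the same list is handed to every call.
    futures = [pool.submit(process_item, item, [1, 2, 3]) for item in ["foo", "bar"]]
    # .result() blocks until that future is done and returns its value.
    values = [future.result() for future in futures]

print(values)  # ['foo [1, 2, 3]', 'bar [1, 2, 3]']
```

Collecting in the order the futures were created keeps the results lined up with the input items, even if the calls finish out of order.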