Pooyan
11/11/2023, 9:04 PM
my_task.map(list_of_nums, static_num)
so my task adds static_num to numbers in the list. However the result I get from running the flow is unpersisted. What's the best approach to get the final list result there?
Marvin
11/11/2023, 9:04 PM
The .map() function is used to automatically create a task run for each element of its input data. This means that if you have a task that needs to be applied to each element of a list, you can use .map() to do this.
However, if you want to persist the results of these tasks, you'll need to use a custom result storage and serializer. You can retrieve a result from a custom storage by passing the storage block ID that was used to store the result, the serializer type, and the storage key to prefect.results.PersistedResult and calling its get method.
Here's an example of how you might do this:
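from prefect import flow, task
from prefect.filesystems import GCS
from prefect.results import PersistedResult

@task
def my_task(num, static_num):
    # Add the static value to one element of the mapped list
    return num + static_num

@flow
def my_flow():
    list_of_nums = [1, 2, 3, 4, 5]
    static_num = 10
    # One task run is created per element of list_of_nums
    result = my_task.map(list_of_nums, static_num)
    # Placeholder serializer type and storage key; replace with your own values
    persisted_result = PersistedResult(result, serializer_type='json', storage_key='my_key')
    persisted_result.get()

# Flows are invoked by calling them directly
my_flow()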
In this example, my_task is mapped over list_of_nums and static_num, and the result is stored using a PersistedResult. The get method is then called on the PersistedResult to retrieve the result from storage.
Please note that this is a simplified example and you'll need to replace the serializer_type and storage_key with your own values. Also, you'll need to set up your own result storage (in this example, Google Cloud Storage is used).
For more detailed information, you might want to check out the Prefect documentation and this example on GitHub.
Nate
11/11/2023, 9:08 PM
Pooyan
11/11/2023, 9:09 PM
Nate
11/11/2023, 9:10 PM
Pooyan
11/11/2023, 9:11 PM
Pooyan
11/11/2023, 9:11 PM
Pooyan
11/11/2023, 9:13 PM
Nate
11/11/2023, 9:16 PM
In [1]: from prefect import flow, task, unmapped
In [2]: @task
   ...: def add_together(x, y):
   ...:     return x + y
   ...: @flow
   ...: def sum_it(numbers, static_value):
   ...:     futures = add_together.map(numbers, unmapped(static_value))
   ...:     return futures
   ...:
In [3]: futures = sum_it([1, 2, 3], 5)
15:15:29.515 | INFO | prefect.engine - Created flow run 'satisfied-baboon' for flow 'sum-it'
15:15:30.355 | INFO | Flow run 'satisfied-baboon' - Created task run 'add_together-2' for task 'add_together'
15:15:30.356 | INFO | Flow run 'satisfied-baboon' - Submitted task run 'add_together-2' for execution.
15:15:30.368 | INFO | Flow run 'satisfied-baboon' - Created task run 'add_together-1' for task 'add_together'
15:15:30.369 | INFO | Flow run 'satisfied-baboon' - Submitted task run 'add_together-1' for execution.
15:15:30.373 | INFO | Flow run 'satisfied-baboon' - Created task run 'add_together-0' for task 'add_together'
15:15:30.373 | INFO | Flow run 'satisfied-baboon' - Submitted task run 'add_together-0' for execution.
15:15:30.753 | INFO | Task run 'add_together-1' - Finished in state Completed()
15:15:30.780 | INFO | Task run 'add_together-0' - Finished in state Completed()
15:15:30.837 | INFO | Task run 'add_together-2' - Finished in state Completed()
15:15:30.957 | INFO | Flow run 'satisfied-baboon' - Finished in state Completed('All states completed.')
In [4]: [fut.result() for fut in futures]
Out[4]: [6, 7, 8]
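The session above maps add_together over a list while reusing one static value. As a rough illustration of that pairing rule, here is a plain-Python sketch that does not use Prefect at all. Unmapped and map_like are hypothetical names invented for this sketch, and the rule they implement (iterables are split per call, everything else is reused) is an assumption modelled on the behaviour shown in the session, not Prefect's actual implementation.

```python
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: marks a value as static."""
    value: Any


def map_like(fn: Callable[..., Any], *args: Any) -> List[Any]:
    """Call fn once per element of the mapped arguments, reusing static ones."""
    columns = []      # (is_mapped, value) for each positional argument
    lengths = set()   # lengths of all mapped (iterable) arguments
    for arg in args:
        if isinstance(arg, Unmapped):
            # Explicitly static: unwrap and reuse for every call
            columns.append((False, arg.value))
        elif isinstance(arg, Iterable) and not isinstance(arg, (str, bytes)):
            # Iterable: one element per call
            items = list(arg)
            lengths.add(len(items))
            columns.append((True, items))
        else:
            # Non-iterable values are static even without the wrapper
            columns.append((False, arg))
    if len(lengths) != 1:
        raise ValueError("expected at least one iterable argument, all of equal length")
    (n,) = lengths
    return [
        fn(*(value[i] if is_mapped else value for is_mapped, value in columns))
        for i in range(n)
    ]


def add_together(x, y):
    return x + y


print(map_like(add_together, [1, 2, 3], 5))            # [6, 7, 8]
print(map_like(add_together, [1, 2, 3], Unmapped(5)))  # [6, 7, 8]
# A list that should be passed whole to every call needs the wrapper:
print(map_like(lambda x, ys: x + sum(ys), [1, 2, 3], Unmapped([10, 20])))  # [31, 32, 33]
```

In this sketch the wrapper only changes the outcome when the static argument is itself iterable; for a plain number like 5 it is purely a readability choice.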
Nate
11/11/2023, 9:17 PM
static_value to each mapped task, so i used unmapped to be explicit, but you shouldn't have to since it's not an iterable type anyways
Nate
11/11/2023, 9:18 PM
Pooyan
11/11/2023, 9:41 PM
[UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'),
UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'),
UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')]
Pooyan
11/11/2023, 9:47 PM
Nate
11/11/2023, 9:56 PM
Nate
11/11/2023, 10:01 PM
@flow(persist_results=True) and then fetch the result from your storage location via the API like Marvin's original suggestion. otherwise, when you exit the flow run context, the result is gone because it wasn't persisted
Pooyan
11/11/2023, 10:39 PM