Pooyan
11/11/2023, 9:04 PM my_task.map(list_of_nums, static_num) so my task adds static_num to numbers in the list. However, the result I get from running the flow is unpersisted. What's the best approach to get the final list result there?
Marvin
11/11/2023, 9:04 PM The .map() function is used to automatically create a task run for each element of its input data. This means that if you have a task that needs to be applied to each element of a list, you can use .map() to do this.
However, if you want to persist the results of these tasks, you'll need to use a custom result storage and serializer. You can retrieve a result from a custom storage by passing the storage block ID that was used to store the result, the serializer type, and the storage key to prefect.results.PersistedResult and calling its get method.
Here's an example of how you might do this:
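from prefect import flow, task
from prefect.filesystems import GCS
from prefect.results import PersistedResult

@task
def my_task(num, static_num):
    return num + static_num

@flow
def my_flow():
    list_of_nums = [1, 2, 3, 4, 5]
    static_num = 10
    # One task run is created per element of list_of_nums; static_num is reused for each.
    result = my_task.map(list_of_nums, static_num)
    # Placeholder values: use the ID of your own storage block and your own storage key.
    persisted_result = PersistedResult(storage_block_id='<your-storage-block-id>', serializer_type='json', storage_key='my_key')
    persisted_result.get()

# Prefect 2 flows are called directly; there is no .run() method.
my_flow()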
In this example, my_task is mapped over list_of_nums with static_num, and a PersistedResult is then used to retrieve the stored result by calling its get method.
Please note that this is a simplified example and you'll need to replace the serializer_type and storage_key with your own values. Also, you'll need to set up your own result storage (in this example, Google Cloud Storage is used).
For more detailed information, you might want to check out the Prefect documentation and this example on GitHub.
Nate
11/11/2023, 9:08 PM
Pooyan
11/11/2023, 9:09 PM
Nate
11/11/2023, 9:10 PM
Pooyan
11/11/2023, 9:11 PM
Pooyan
11/11/2023, 9:11 PM
Pooyan
11/11/2023, 9:13 PM
Nate
11/11/2023, 9:16 PM
In [1]: from prefect import flow, task, unmapped

In [2]: @task
   ...: def add_together(x, y):
   ...:     return x + y
   ...: @flow
   ...: def sum_it(numbers, static_value):
   ...:     futures = add_together.map(numbers, unmapped(static_value))
   ...:     return futures
   ...:

In [3]: futures = sum_it([1, 2, 3], 5)
15:15:29.515 | INFO | prefect.engine - Created flow run 'satisfied-baboon' for flow 'sum-it'
15:15:30.355 | INFO | Flow run 'satisfied-baboon' - Created task run 'add_together-2' for task 'add_together'
15:15:30.356 | INFO | Flow run 'satisfied-baboon' - Submitted task run 'add_together-2' for execution.
15:15:30.368 | INFO | Flow run 'satisfied-baboon' - Created task run 'add_together-1' for task 'add_together'
15:15:30.369 | INFO | Flow run 'satisfied-baboon' - Submitted task run 'add_together-1' for execution.
15:15:30.373 | INFO | Flow run 'satisfied-baboon' - Created task run 'add_together-0' for task 'add_together'
15:15:30.373 | INFO | Flow run 'satisfied-baboon' - Submitted task run 'add_together-0' for execution.
15:15:30.753 | INFO | Task run 'add_together-1' - Finished in state Completed()
15:15:30.780 | INFO | Task run 'add_together-0' - Finished in state Completed()
15:15:30.837 | INFO | Task run 'add_together-2' - Finished in state Completed()
15:15:30.957 | INFO | Flow run 'satisfied-baboon' - Finished in state Completed('All states completed.')
In [4]: [fut.result() for fut in futures]
Out[4]: [6, 7, 8]
Nate
11/11/2023, 9:17 PM static_value to each mapped task, so i used unmapped to be explicit, but you shouldn't have to since it's not an iterable type anyways
Nate
11/11/2023, 9:18 PM
Pooyan
11/11/2023, 9:41 PM
[UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'),
 UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'),
 UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')]
Pooyan
11/11/2023, 9:47 PM
Nate
11/11/2023, 9:56 PM
Nate
11/11/2023, 10:01 PM @flow(persist_results=True) and then fetch the result from your storage location via the API like Marvin's original suggestion. otherwise, when you exit the flow run context, the result is gone because it wasn't persisted
Pooyan
11/11/2023, 10:39 PM