<@ULVA73B9P> I am trying to apply `map`on a list o...
# ask-marvin
p
@Marvin I am trying to apply `map` on a list of numbers in a flow, that is, to call `my_task.map(list_of_nums, static_num)` so that my task adds `static_num` to each number in the list. However, the result I get from running the flow is unpersisted. What's the best approach to get the final list of results?
m
In Prefect, the `.map()` function is used to automatically create a task run for each element of its input data. This means that if you have a task that needs to be applied to each element of a list, you can use `.map()` to do this. However, if you want to persist the results of these tasks, you'll need to use a custom result storage and serializer. You can retrieve a result from a custom storage by passing the storage block ID that was used to store the result, the serializer type, and the storage key to `prefect.results.PersistedResult` and calling its `get` method. Here's an example of how you might do this:
from prefect import flow, task
from prefect.filesystems import GCS
from prefect.results import PersistedResult

@task
def my_task(num, static_num):
    return num + static_num

@flow
def my_flow():
    list_of_nums = [1, 2, 3, 4, 5]
    static_num = 10
    result = my_task.map(list_of_nums, static_num)
    persisted_result = PersistedResult(result, serializer_type='json', storage_key='my_key')
    persisted_result.get()

my_flow.run()
In this example, `my_task` is mapped over `list_of_nums` and `static_num`, and the result is stored using a `PersistedResult`. The `get` method is then called on the `PersistedResult` to retrieve the result from storage. Please note that this is a simplified example and you'll need to replace the `serializer_type` and `storage_key` with your own values. Also, you'll need to set up your own result storage (in this example, Google Cloud Storage is used). For more detailed information, you might want to check out the Prefect documentation and this example on GitHub.
n
you shouldn’t need to worry about using PersistedResult.get() unless you’re using run_deployment or something, the result should be in memory. are you calling .result() on each item you get back from .map()?
p
Yes when I run .result() I get an UnpersistedResult object
n
could you share your code?
p
from prefect import flow, task

@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers, static_value):
    futures = add_together.map(numbers, static_value)
    return futures

results = sum_it([1, 2, 3], 5)
n
In [1]: from prefect import flow, task, unmapped

In [2]: @task
   ...: def add_together(x, y):
   ...:     return x + y
   ...: @flow
   ...: def sum_it(numbers, static_value):
   ...:     futures = add_together.map(numbers, unmapped(static_value))
   ...:     return futures
   ...:

In [3]: futures = sum_it([1, 2, 3], 5)
15:15:29.515 | INFO    | prefect.engine - Created flow run 'satisfied-baboon' for flow 'sum-it'
15:15:30.355 | INFO    | Flow run 'satisfied-baboon' - Created task run 'add_together-2' for task 'add_together'
15:15:30.356 | INFO    | Flow run 'satisfied-baboon' - Submitted task run 'add_together-2' for execution.
15:15:30.368 | INFO    | Flow run 'satisfied-baboon' - Created task run 'add_together-1' for task 'add_together'
15:15:30.369 | INFO    | Flow run 'satisfied-baboon' - Submitted task run 'add_together-1' for execution.
15:15:30.373 | INFO    | Flow run 'satisfied-baboon' - Created task run 'add_together-0' for task 'add_together'
15:15:30.373 | INFO    | Flow run 'satisfied-baboon' - Submitted task run 'add_together-0' for execution.
15:15:30.753 | INFO    | Task run 'add_together-1' - Finished in state Completed()
15:15:30.780 | INFO    | Task run 'add_together-0' - Finished in state Completed()
15:15:30.837 | INFO    | Task run 'add_together-2' - Finished in state Completed()
15:15:30.957 | INFO    | Flow run 'satisfied-baboon' - Finished in state Completed('All states completed.')

In [4]: [fut.result() for fut in futures]
Out[4]: [6, 7, 8]
i assume you wanted to send `static_value` to each mapped task, so i used `unmapped` to be explicit, but you shouldn't have to since it's not an iterable type anyway
you're also free to call .result() on the futures within the flow so that the flow just returns the literal results
p
Thanks. Should this work inside a notebook too? I believe that might be the reason, since I am getting the following even though I am duplicating the above code:
[UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'),
 UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'),
 UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')]
It worked outside the notebook when I ran the Python code and printed the results.
n
hmm, what's different between your code and this?
ahh so if you didn't resolve the futures within the flow run context (like my example above does) and you return the futures out of the flow run context, you would need to persist the results with `@flow(persist_results=True)` and then fetch the result from your storage location via the API like marvin's original suggestion. otherwise, when you exit the flow run context, the result is gone because it wasn't persisted
p
Awesome, thanks a lot. Yeah, my problem was not resolving the futures in the flow context.