
Bill

08/18/2023, 3:45 PM
@Marvin How do I persist results from a `map` operation when using `task_runner=DaskTaskRunner()`?

Marvin

08/18/2023, 3:46 PM
Persisting results from a `map` operation when using `DaskTaskRunner()` can be a bit tricky. The `DaskTaskRunner` is a parallel task runner, which means it can execute multiple tasks concurrently. However, this can cause issues when you want to persist results, as tasks may not have finished executing when you try to access their results. In Prefect, calling `.result()` on a task's future will block the flow until that task has finished executing. This means that if you try to access the result of a task inside a `map` operation before it has finished, your flow will block and wait for the task to complete. This can limit Prefect's ability to track task dependencies and can slow down your flow. One way to work around this is to use Prefect's `State` objects to store the results of your tasks. You can then access these results later on in your flow without blocking. Here's an example:
```python
from prefect import flow, task, State

@task
def my_task(x):
    return x * 2

@task
def persist_results(state, x):
    state.update({x: my_task(x)})
    return state

@flow
def my_flow():
    state = State()
    for x in range(10):
        state = persist_results(state, x)
    return state

state = my_flow.run()
print(state.result())
```
In this example, the `persist_results` task takes in the current state and a value `x`, runs `my_task` on `x`, and stores the result in the state. The state is then passed along to the next iteration of the loop, allowing you to accumulate results without blocking. Remember, though, that this is a workaround and may not be the best solution for all use cases. If you're running into issues with persisting results in Prefect, you might want to consider reaching out to the Prefect community for more specific advice.

Nate

08/18/2023, 3:47 PM
🙅 Persisting results shouldn't depend on which task runner you use; it should just be a matter of using `persist_result=True`.

Bill

08/18/2023, 3:50 PM
@Marvin How do I return the results from a `map` operation in a function call? I'm currently seeing:
```
result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'))
```

Marvin

08/18/2023, 3:50 PM
It seems like you're trying to access the results of a `map` operation in Prefect, but you're getting an `UnpersistedResult` instead. This is because, by default, Prefect does not persist the results of tasks. An `UnpersistedResult` is a type of result that Prefect uses to represent results that have not been and will not be persisted beyond the current flow run. If you want to access the results of your tasks, you'll need to use the `get()` method on the result object. This will return the value of the result. Here's an example:
```python
from prefect import Flow, task

@task
def double(x):
    return x * 2

with Flow("My Flow") as flow:
    result = double.map([1, 2, 3])

state = flow.run()

# Access the results
results = [res.get() for res in state.result[result].result]
print(results)  # prints: [2, 4, 6]
```
In this example, the `double` task is mapped over the list `[1, 2, 3]`, and the results are accessed using the `get()` method. However, if you want to persist the results of your tasks for later use, you'll need to use a `Result` type that supports persistence, such as `LocalResult` or `S3Result`. You can specify the `Result` type to use by setting the `result` parameter on your task:
```python
from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(result=LocalResult())
def double(x):
    return x * 2

with Flow("My Flow") as flow:
    result = double.map([1, 2, 3])

state = flow.run()

# Access the results
results = [res.get() for res in state.result[result].result]
print(results)  # prints: [2, 4, 6]
```
In this example, the `double` task uses a `LocalResult` to persist its results. The results are then accessed in the same way as before.

Nate

08/18/2023, 3:51 PM
```python
from prefect import flow, task
from prefect.futures import PrefectFuture

@task
def process_item(item):
    print(item)
    return item

@flow
def parent(items: list):
    # map() submits one task run per item and returns a list of futures
    processed_items = process_item.map(items)

    assert all(isinstance(item, PrefectFuture) for item in processed_items)

    # resolve each future to the task's return value
    return [item.result() for item in processed_items]

if __name__ == "__main__":
    results = parent(items=["foo", 42, None])
    assert results == ["foo", 42, None]
```
`map` calls `Task.submit()` for each item, which means the caller gets a future they have to resolve with `result()`.