Bill
08/18/2023, 3:45 PM
map operation when using task_runner=DaskTaskRunner()
Marvin
08/18/2023, 3:46 PM
map operation when using DaskTaskRunner() can be a bit tricky. The DaskTaskRunner is a parallel task runner, which means it can execute multiple tasks concurrently. However, this can cause issues when you want to persist results, as tasks may not have finished executing when you try to access their results.
In Prefect, calling .result() on a task will block the flow until that task has finished executing. This means that if you try to access the result of a task inside a map operation before it has finished, your flow will block and wait for the task to complete. This can limit Prefect's ability to track task dependencies and can slow down your flow.
One way to work around this is to use Prefect's State objects to store the results of your tasks. You can then access these results later on in your flow without blocking. Here's an example:
python
from prefect import flow, task, State

@task
def my_task(x):
    return x * 2

@task
def persist_results(state, x):
    state.update({x: my_task(x)})
    return state

@flow
def my_flow():
    state = State()
    for x in range(10):
        state = persist_results(state, x)
    return state

state = my_flow.run()
print(state.result())
In this example, the persist_results task takes in the current state and a value x, runs my_task on x, and stores the result in the state. The state is then passed along to the next iteration of the loop, allowing you to accumulate results without blocking.
Remember, though, that this is a workaround and may not be the best solution for all use cases. If you're running into issues with persisting results in Prefect, you might want to consider reaching out to the Prefect community for more specific advice.
Nate
08/18/2023, 3:47 PM
persist_results=True
Bill
08/18/2023, 3:50 PM
map operation in a function call? I'm currently seeing
result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'))
Marvin
08/18/2023, 3:50 PM
map operation in Prefect, but you're getting an UnpersistedResult instead. This is because, by default, Prefect does not persist the results of tasks. An UnpersistedResult is a type of result that Prefect uses to represent results that have not been and will not be persisted beyond the current flow run.
If you want to access the results of your tasks, you'll need to use the get() method on the result object. This will return the value of the result. Here's an example:
python
from prefect import Flow, task

@task
def double(x):
    return x * 2

with Flow("My Flow") as flow:
    result = double.map([1, 2, 3])

state = flow.run()
# Access the results
results = [res.get() for res in state.result[result].result]
print(results) # prints: [2, 4, 6]
In this example, the double task is mapped over the list [1, 2, 3], and the results are accessed using the get() method.
However, if you want to persist the results of your tasks for later use, you'll need to use a Result type that supports persistence, such as LocalResult or S3Result. You can specify the Result type to use by setting the result parameter on your task:
python
from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(result=LocalResult())
def double(x):
    return x * 2

with Flow("My Flow") as flow:
    result = double.map([1, 2, 3])

state = flow.run()
# Access the results
results = [res.get() for res in state.result[result].result]
print(results) # prints: [2, 4, 6]
In this example, the double task uses a LocalResult to persist its results. The results are then accessed in the same way as before.
Nate
08/18/2023, 3:51 PM
from prefect import flow, task
from prefect.futures import PrefectFuture

@task
def process_item(item):
    print(item)
    return item

@flow
def parent(items: list):
    processed_items = process_item.map(items)
    assert all(isinstance(item, PrefectFuture) for item in processed_items)
    return [item.result() for item in processed_items]

if __name__ == "__main__":
    results = parent(items=["foo", 42, None])
    assert results == ["foo", 42, None]
map calls Task.submit() for each item, which means the caller gets a future they have to resolve with result()
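For readers without a Prefect environment at hand, the submit-then-resolve pattern Nate describes can be sketched with the standard library's concurrent.futures. This is only an analogy, not Prefect's implementation: ThreadPoolExecutor stands in for a task runner, and map_submit is a hypothetical helper named here for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def process_item(item):
    # Stand-in for the body of a task; returns its input unchanged.
    return item


def map_submit(executor, fn, items):
    # Hypothetical helper: submit one call per item and hand back the futures,
    # analogous to a map that calls submit for each item.
    return [executor.submit(fn, item) for item in items]


def parent(items):
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = map_submit(executor, process_item, items)
        # The caller holds futures at this point, not values.
        assert all(isinstance(f, Future) for f in futures)
        # result() blocks until each call has finished, then returns its value.
        # The list keeps submission order, so outputs line up with inputs.
        return [f.result() for f in futures]


results = parent(["foo", 42, None])
print(results)
```

As in Nate's flow, the values only become available once result() is called on each future; iterating the futures in submission order keeps the outputs aligned with the inputs.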