Bill
08/18/2023, 3:45 PM
map operation when using task_runner=DaskTaskRunner()

Marvin
08/18/2023, 3:46 PM
map operation when using DaskTaskRunner() can be a bit tricky. The DaskTaskRunner is a parallel task runner, which means it can execute multiple tasks concurrently. However, this can cause issues when you want to persist results, because tasks may not have finished executing when you try to access their results.
In Prefect, calling .result() on a task's future blocks the flow until that task has finished executing. This means that if you try to access the result of a task inside a map operation before it has finished, your flow will block and wait for the task to complete. This can limit Prefect's ability to track task dependencies and can slow down your flow.
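To make the blocking behaviour concrete, here is a minimal sketch that uses Python's standard-library concurrent.futures as a stand-in for a parallel task runner (an analogy, not Prefect's or Dask's API). Submitting every item first and only then calling .result() lets the work overlap, whereas calling .result() immediately after each submission waits for each call in turn. The slow_double function and its 0.1 s delay are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    # Hypothetical task body: about 0.1 s of simulated work, then 2 * x.
    time.sleep(0.1)
    return x * 2


def submit_then_collect(items):
    # Queue every item first so the calls overlap, then block on each
    # future's result() only after all the work has been submitted.
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(slow_double, x) for x in items]
        return [f.result() for f in futures]


def collect_each_immediately(items):
    # Calling result() right after each submit() waits for that call to
    # finish before the next item is submitted, so the work runs serially.
    with ThreadPoolExecutor(max_workers=8) as pool:
        return [pool.submit(slow_double, x).result() for x in items]


if __name__ == "__main__":
    items = list(range(5))
    for fn in (submit_then_collect, collect_each_immediately):
        start = time.perf_counter()
        values = fn(items)
        elapsed = time.perf_counter() - start
        print(fn.__name__, values, f"{elapsed:.2f}s")
```

Both functions return the same values; only the timing differs, which is the cost of resolving each result too early.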
One way to work around this is to use Prefect's State objects to store the results of your tasks. You can then access these results later on in your flow without blocking. Here's an example:
```python
from prefect import flow, task, State

@task
def my_task(x):
    return x * 2

@task
def persist_results(state, x):
    state.update({x: my_task(x)})
    return state

@flow
def my_flow():
    state = State()
    for x in range(10):
        state = persist_results(state, x)
    return state

state = my_flow.run()
print(state.result())
```
In this example, the persist_results task takes in the current state and a value x, runs my_task on x, and stores the result in the state. The state is then passed along to the next iteration of the loop, allowing you to accumulate results without blocking.
Remember, though, that this is a workaround and may not be the best solution for all use cases. If you're running into issues with persisting results in Prefect, you might want to consider reaching out to the Prefect community for more specific advice.

Nate
08/18/2023, 3:47 PM
persist_results=True

Bill
08/18/2023, 3:50 PM
map operation in a function call? I'm currently seeing
result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`'))

Marvin
08/18/2023, 3:50 PM
map operation in Prefect, but you're getting an UnpersistedResult instead. This is because, by default, Prefect does not persist the results of tasks. An UnpersistedResult is a type of result that Prefect uses to represent results that have not been and will not be persisted beyond the current flow run.
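The persisted/unpersisted distinction can be pictured with a toy model. This is a sketch only, not how Prefect implements results: an "unpersisted" result holds its value inside the running process, while a "persisted" one writes the value to storage and keeps only a location that a later run could read back. The InMemoryResult and FileResult classes and their load() method are hypothetical.

```python
import pickle
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class InMemoryResult:
    # Toy "unpersisted" result: the value exists only inside this Python
    # process, so nothing outside the current run can retrieve it.
    value: object

    def load(self):
        return self.value


@dataclass
class FileResult:
    # Toy "persisted" result: only a storage path is kept; the value is
    # serialized to disk and read back on demand.
    path: Path

    @classmethod
    def write(cls, value, directory: Path) -> "FileResult":
        path = directory / "result.pickle"
        path.write_bytes(pickle.dumps(value))
        return cls(path)

    def load(self):
        return pickle.loads(self.path.read_bytes())


if __name__ == "__main__":
    print(InMemoryResult(42).load())  # 42
    with tempfile.TemporaryDirectory() as tmp:
        written = FileResult.write([2, 4, 6], Path(tmp))
        # A fresh object built from the path alone, as a later run might do,
        # can still recover the value.
        print(FileResult(written.path).load())  # [2, 4, 6]
```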
If you want to access the results of your tasks, you'll need to use the get() method on the result object. This will return the value of the result. Here's an example:
```python
from prefect import Flow, task

@task
def double(x):
    return x * 2

with Flow("My Flow") as flow:
    result = double.map([1, 2, 3])

state = flow.run()

# Access the results
results = [res.get() for res in state.result[result].result]
print(results)  # prints: [2, 4, 6]
```
In this example, the double task is mapped over the list [1, 2, 3], and the results are accessed using the get() method.
However, if you want to persist the results of your tasks for later use, you'll need to use a Result type that supports persistence, such as LocalResult or S3Result. You can specify the Result type to use by setting the result parameter on your task:
```python
from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(result=LocalResult())
def double(x):
    return x * 2

with Flow("My Flow") as flow:
    result = double.map([1, 2, 3])

state = flow.run()

# Access the results
results = [res.get() for res in state.result[result].result]
print(results)  # prints: [2, 4, 6]
```
In this example, the double task uses a LocalResult to persist its results. The results are then accessed in the same way as before.

Nate
08/18/2023, 3:51 PM
```python
from prefect import flow, task
from prefect.futures import PrefectFuture

@task
def process_item(item):
    print(item)
    return item

@flow
def parent(items: list):
    processed_items = process_item.map(items)
    assert all(isinstance(item, PrefectFuture) for item in processed_items)
    return [item.result() for item in processed_items]

if __name__ == "__main__":
    results = parent(items=["foo", 42, None])
    assert results == ["foo", 42, None]
```

Nate
08/18/2023, 3:52 PM
map calls Task.submit() for each item, which means the caller gets a future they have to resolve with result()
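Nate's point can be illustrated without Prefect. The sketch below uses the standard library's concurrent.futures as an analogy (it is not Prefect's implementation of Task.map); the map_via_submit helper is hypothetical and simply calls submit() once per item, which is why the caller ends up holding futures that must be resolved with result(), much like the PrefectFuture objects in the flow above.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def process_item(item):
    # Same body as the task in Nate's example: echo the item and return it.
    print(item)
    return item


def map_via_submit(pool: ThreadPoolExecutor, fn, items) -> List[Future]:
    # Hypothetical helper: "map" is just one submit() per item, so the
    # caller receives futures rather than finished values.
    return [pool.submit(fn, item) for item in items]


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        futures = map_via_submit(pool, process_item, ["foo", 42, None])
        assert all(isinstance(f, Future) for f in futures)
        # Resolve each future with result() to get the actual values.
        results = [f.result() for f in futures]
    print(results)  # ['foo', 42, None]
```

In Prefect itself you would keep calling process_item.map(items) and .result() as in Nate's flow; the helper only mirrors the shape of that call.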