jorwoods
06/11/2020, 1:15 PM
0.11.5+134.g5e4898dde
I am running on Windows 10 and have verified that the environment variable `PREFECT__FLOWS__CHECKPOINTING=true` is set.
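For reference, Prefect's configuration convention maps an environment variable named `PREFECT__SECTION__KEY` onto the nested config key `section.key`, so the variable above should surface inside Python as `prefect.config.flows.checkpointing`. The sketch below is a simplified, stdlib-only model of that naming rule, useful for sanity-checking what a given environment would produce. The helper `env_to_config` is hypothetical, not Prefect's loader, and it only coerces booleans and integers.

```python
import os
from typing import Any, Dict, Mapping

PREFIX = "PREFECT__"


def _coerce(value: str) -> Any:
    """Convert a raw env-var string into a bool or int where possible (simplified)."""
    lowered = value.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    try:
        return int(value)
    except ValueError:
        return value


def env_to_config(env: Mapping[str, str]) -> Dict[str, Any]:
    """Hypothetical helper: turn PREFECT__A__B=value into {"a": {"b": value}}."""
    config: Dict[str, Any] = {}
    for name, raw in env.items():
        # Windows reports environment variable names in upper case, so compare upper-cased.
        if not name.upper().startswith(PREFIX):
            continue
        # Double underscores separate nesting levels; keys are lower-cased.
        keys = name[len(PREFIX):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = _coerce(raw)
    return config


# Inspect the current process environment (prints None if the variable is not set).
print(env_to_config(os.environ).get("flows", {}).get("checkpointing"))
```

For example, `env_to_config({"PREFECT__FLOWS__CHECKPOINTING": "true"})` yields `{"flows": {"checkpointing": True}}`.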
```python
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.executors import LocalDaskExecutor
from prefect.engine.cache_validators import all_parameters

# Flow-level result used by tasks that don't set their own; location is a filename template
lr = LocalResult(location='{flow_name}-{task_name}-{x}-{y}.pkl',
                 validators=all_parameters
                 )

@task(log_stdout=True, checkpoint=True)
def add(x, y):
    print(f'add ran with {x} {y}')
    try:
        # Reduce step: x is the list of mapped results
        return sum(x) + y
    except TypeError:
        # Mapped step: x is a single int, so sum(x) raises TypeError
        return x + y

with Flow('iterated map', result=lr) as flow:
    y = unmapped(Parameter('y', default=7))
    x = Parameter('x', default=[1,2,3])
    mapped_result = add.map(x, y=y)  # one child run per element of x
    out = add(mapped_result, y)      # single reduce over the mapped results

flow.run(executor=LocalDaskExecutor())
```
Laura Lorenz (she/her)
06/11/2020, 1:51 PM
`target` does. Basically, to opt in to the behavior where a serialized result means the task should be cached, the task in question also needs `target` set, because that is an add-on behavior of `target`. See this section specifically: https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
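The caching behavior described above can be modeled in plain Python. This is a minimal sketch under stated assumptions, not Prefect's implementation: the hypothetical helper `run_with_target` formats a filename template from the task's inputs, returns the pickled value if that file already exists, and otherwise runs the function and writes the file. In Prefect itself, the corresponding knob is the task's `target` argument described in the linked docs section.

```python
import os
import pickle
import tempfile
from typing import Any, Callable


def run_with_target(fn: Callable[..., Any], target_template: str, directory: str, **inputs: Any) -> Any:
    """Hypothetical model of target-based caching.

    If the file the template resolves to already exists, load and return it
    without calling fn; otherwise call fn, write the result there, and return it.
    """
    path = os.path.join(directory, target_template.format(**inputs))
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)  # cache hit: the task body is skipped
    value = fn(**inputs)
    with open(path, "wb") as f:
        pickle.dump(value, f)  # checkpoint so a later run can reuse it
    return value


calls = []


def add(x, y):
    calls.append((x, y))  # record every real execution of the body
    return x + y


with tempfile.TemporaryDirectory() as tmp:
    first = run_with_target(add, "add-{x}-{y}.pkl", tmp, x=1, y=7)
    second = run_with_target(add, "add-{x}-{y}.pkl", tmp, x=1, y=7)

print(first, second, len(calls))  # 8 8 1
```

The second call returns the stored value without invoking `add` again. That "skip if the file exists" step is what `target` adds on top of simply writing results to a location.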
Regarding some of the tasks seeming to run twice: which tasks are you referring to? Some ‘rerunning’ of tasks in this flow is intended because of the map, so if you mean that one, it is the expected behavior of `map`. `add` is mapped once over 3 inputs, so we would expect to see logs about it running 4 times (once to establish the map, then once for each input), and then it is reduced once (so we would expect to see `add` run again with those parameters).
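To make the expected counts concrete, here is a plain-Python model of this flow with no Prefect involved (the function `simulate_map_then_reduce` is hypothetical). Each input should execute the body exactly once, followed by one reduce call that receives the list of mapped results: 1 + 7 = 8, 2 + 7 = 9, 3 + 7 = 10, and then (8 + 9 + 10) + 7 = 34.

```python
from typing import List, Tuple


def simulate_map_then_reduce(xs: List[int], y: int) -> Tuple[List[str], int]:
    """Model add.map(x, y=unmapped(y)) followed by add(mapped, y).

    Returns the "add ran with ..." lines the task body would print (one per
    execution) and the final reduced value.
    """
    printed: List[str] = []

    def add(x, y):
        printed.append(f"add ran with {x} {y}")
        try:
            return sum(x) + y  # reduce step: x is the list of mapped results
        except TypeError:
            return x + y       # mapped step: x is a single int

    mapped = [add(x, y) for x in xs]  # one child run per element: add[0], add[1], add[2]
    total = add(mapped, y)            # the single reduce run
    return printed, total


lines, total = simulate_map_then_reduce([1, 2, 3], 7)
for line in lines:
    print(line)
print(total)
```

The parent run that establishes the map ends in the `Mapped` state without executing the body, so a single pass should print four `add ran with ...` lines in total.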
Hope that helps!
jorwoods
06/11/2020, 2:11 PM
`location` is specified in the `LocalResult`, I would also have to specify that in the task's `target` as well?
```
[2020-06-11 13:20:24] INFO - prefect.FlowRunner | Beginning Flow run for 'iterated map'
[2020-06-11 13:20:24] INFO - prefect.FlowRunner | Starting flow run.
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add': finished task run for task with final state: 'Mapped'
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[2]': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 3 7
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[0]': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[1]': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 1 7
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 2 7
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with [8, 9, 10] 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 3 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 2 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 1 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': finished task run for task with final stat
```
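The duplication in this log can be tallied mechanically. A minimal sketch, assuming only the `<timestamp> INFO - prefect.TaskRunner | <message>` line shape shown in the log; `count_body_runs` is a hypothetical helper, and the sample lines are the `add ran with` entries copied verbatim from this log.

```python
from collections import Counter
from typing import Dict, Iterable


def count_body_runs(log_lines: Iterable[str], marker: str = "add ran with") -> Dict[str, int]:
    """Count how often each 'add ran with ...' message appears in the log."""
    counts = Counter()
    for line in log_lines:
        # Keep only the message part after the logger name separator " | ".
        message = line.split(" | ", 1)[-1].strip()
        if message.startswith(marker):
            counts[message] += 1
    return dict(counts)


log = [
    "[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 3 7",
    "[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 1 7",
    "[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 2 7",
    "[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with [8, 9, 10] 7",
    "[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 3 7",
    "[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 2 7",
    "[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 1 7",
]

# Messages that appear more than once indicate a task body executed more than once.
duplicates = {msg: n for msg, n in count_body_runs(log).items() if n > 1}
print(duplicates)
```

On this excerpt each mapped child (`1 7`, `2 7`, `3 7`) appears twice while the reduce line appears once, which matches the duplication jorwoods describes next.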
```
add ran with 1 7
add ran with 2 7
add ran with 3 7
```
, but instead I see two lines for each.
```python
self.target = target
if getattr(result, "location", None) and target:
    warnings.warn(
        "Both `result.location` and `target` set on task. Task result will use target as location."
    )
    self.result = result.copy()  # type: ignore
    self.result.location = target
```
and a `Result` object has a `check_target`, but it does not have `check_location`. I guess it is unclear when to specify a Result's `location` vs. a Task's `target`.
Laura Lorenz (she/her)
06/11/2020, 2:29 PM
`target` keyword on the tasks and not specify `location` on the result; the result object for the task will prefer using the `target` kwarg as the result’s location. Ah yes, you already found that 🙂 We want to support both someone who uses this just to write files, without using the `target` mechanism to introduce caching, and someone who wants the persistent caching that targets provide, so the two are separate. Open to ideas on how to make that clearer in the docs!
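The precedence rule in the `Task.__init__` snippet quoted earlier in the thread can be modeled in isolation. This is a sketch, not Prefect code: `FakeResult` and `resolve_result` are hypothetical stand-ins, and only the branch shown in that snippet (both `result.location` and `target` set) is modeled. The snippet calls `result.copy()`; the sketch uses `copy.copy` for the same shallow-copy effect, so the shared flow-level result is left untouched.

```python
import copy
import warnings
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in for a Prefect Result; only `location` matters here."""
    location: Optional[str] = None


def resolve_result(result: Optional[FakeResult], target: Optional[str]) -> Optional[FakeResult]:
    """If both result.location and target are set, warn and let target win."""
    if getattr(result, "location", None) and target:
        warnings.warn(
            "Both `result.location` and `target` set on task. "
            "Task result will use target as location."
        )
        result = copy.copy(result)  # avoid mutating the result shared by other tasks
        result.location = target
    return result


flow_level = FakeResult(location="{flow_name}-{task_name}-{x}-{y}.pkl")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    task_level = resolve_result(flow_level, target="add-{x}-{y}.pkl")

print(task_level.location)  # add-{x}-{y}.pkl
print(flow_level.location)  # {flow_name}-{task_name}-{x}-{y}.pkl
print(len(caught))          # 1
```

When only `target` is set, this sketch returns the result unchanged; per the reply above, Prefect still uses `target` as the location at run time, which the sketch does not attempt to model.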
Gotcha about the duplicates. Can you share more about your dask setup?
jorwoods
06/11/2020, 2:34 PM
Laura Lorenz (she/her)
06/11/2020, 2:39 PM