#prefect-community

jorwoods

06/11/2020, 1:15 PM
I have another question about checkpointing, results, and skipping task reruns. In my toy example below it runs each task again despite the presence of a LocalResult. Additionally, some of the tasks seem to run twice in a given flow run. Not sure if I have found a bug or if I am doing something wrong. The version I'm using is `0.11.5+134.g5e4898dde`. I am running on Win 10 and have verified I have the environment variable `PREFECT__FLOWS__CHECKPOINTING=true` set.
```python
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.executors import LocalDaskExecutor
from prefect.engine.cache_validators import all_parameters

lr = LocalResult(location='{flow_name}-{task_name}-{x}-{y}.pkl',
                 validators=all_parameters
)

@task(log_stdout=True, checkpoint=True)
def add(x, y):
    print(f'add ran with {x} {y}')
    try:
        return sum(x) + y
    except TypeError:
        return x + y

with Flow('iterated map', result=lr) as flow:
    y = unmapped(Parameter('y', default=7))
    x = Parameter('x', default=[1,2,3])
    mapped_result = add.map(x, y=y)
    out = add(mapped_result, y)

flow.run(executor=LocalDaskExecutor())
```

Laura Lorenz (she/her)

06/11/2020, 1:51 PM
Hi! Regarding ‘running each task again despite the presence of a LocalResult’: the presence of a result does not prevent execution, but the presence of a `target` does. Basically, to opt in to the behavior that a serialized result means the task should be cached, the task in question also needs `target` to be set, because that is an add-on behavior of `target`. See this section specifically: https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target

Regarding some of the tasks seeming to run twice: which tasks are you referring to? There is some intended ‘rerunning’ of tasks in this flow because of the map, so if it is about that one, that is the expected behavior of map. `add` is mapped once with 3 inputs, so we would expect to see logs about it running 4 times (once to establish the map, then once for each input), and then it is reduced once (so we would expect to see `add` run again with those parameters). Hope that helps!
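The difference between "a result was written" and "a target exists" can be illustrated with a small plain-Python sketch. This is not Prefect's implementation; `run_with_target`, the `calls` list, and the pickle-on-disk layout are hypothetical names and choices made only for illustration. It shows the general idea: the task body is skipped only when a file already exists at the rendered target path.

```python
import pickle
import tempfile
from pathlib import Path

calls = []  # records every time the task body actually executes


def add(x, y):
    calls.append((x, y))
    return x + y


def run_with_target(fn, target_template, directory, **kwargs):
    """Run fn(**kwargs) unless a file already exists at the rendered target.

    Hypothetical helper: it only mimics the idea of target-based caching.
    """
    name = target_template.format(task_name=fn.__name__, **kwargs)
    target = Path(directory) / name
    if target.exists():
        # Target present: skip execution and load the stored value instead.
        return pickle.loads(target.read_bytes())
    value = fn(**kwargs)
    target.write_bytes(pickle.dumps(value))
    return value


with tempfile.TemporaryDirectory() as tmp:
    template = "{task_name}-{x}-{y}.pkl"
    first = run_with_target(add, template, tmp, x=1, y=7)   # body runs
    second = run_with_target(add, template, tmp, x=1, y=7)  # loaded from file
```

In Prefect itself, the opt-in described above is the `target` keyword on the task; simply writing a result to a location does not by itself trigger this check.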

jorwoods

06/11/2020, 2:11 PM
So to be clear, even though a `location` is specified in the `LocalResult`, I would also have to specify that in the task's `target` as well?
@Laura Lorenz (she/her) this is what I mean when I say a task is running multiple times. You will see two instances of "add ran with x y" for each combination in the following.
```
[2020-06-11 13:20:24] INFO - prefect.FlowRunner | Beginning Flow run for 'iterated map'
[2020-06-11 13:20:24] INFO - prefect.FlowRunner | Starting flow run.
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add': finished task run for task with final state: 'Mapped'
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[2]': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 3 7
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[0]': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[1]': Starting task run...
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 1 7
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 2 7
[2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with [8, 9, 10] 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 3 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': Starting task run...
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 2 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 1 7
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': finished task run for task with final state: 'Success'
[2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': finished task run for task with final stat
```
In the above logs I posted, I would only expect to see one line each for `add ran with 1 7`, `add ran with 2 7`, and `add ran with 3 7`, but instead I see two lines for each.
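As a reference point for that expectation, here is a plain-Python sketch of the same arithmetic with no Prefect or Dask involved. The list comprehension stands in for `add.map(x, y=y)` and the final call stands in for the reduce step; the `ran` list is a hypothetical counter added only to record how often the task body executes.

```python
ran = []  # one entry per execution of the task body


def add(x, y):
    ran.append((x, y))
    try:
        return sum(x) + y   # reduce step: x is a list
    except TypeError:
        return x + y        # mapped step: x is a single number


xs, y = [1, 2, 3], 7
mapped_result = [add(x, y) for x in xs]  # stands in for add.map(x, y=y)
out = add(mapped_result, y)              # stands in for add(mapped_result, y)
```

Under this reading the body executes four times in total: once per mapped input and once for the reduce over `[8, 9, 10]`, which matches one `add ran with ...` line per combination.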
I think I see where some confusion on my end is coming from. In the class definition for Task, there is
```python
self.target = target

if getattr(result, "location", None) and target:
    warnings.warn(
        "Both `result.location` and `target` set on task. Task result will use target as location."
    )
    self.result = result.copy()  # type: ignore
    self.result.location = target
```
and a Result object has a `check_target`, but it does not have `check_location`. I guess it is unclear when to specify a Result's location vs a Task's target.
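The precedence in that excerpt can be exercised in isolation with a minimal sketch. `SketchResult` and `resolve_result` are hypothetical stand-ins, not Prefect classes or functions, and the sketch covers only the case the excerpt shows, where both a result location and a target are set.

```python
import copy
import warnings
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for a result object with a location template."""
    location: Optional[str] = None


def resolve_result(result, target):
    """Mirror the excerpt: when both are set, warn and let target win."""
    if getattr(result, "location", None) and target:
        warnings.warn(
            "Both `result.location` and `target` set; using target as location."
        )
        result = copy.copy(result)  # leave the caller's object untouched
        result.location = target
    return result


original = SketchResult(location="{flow_name}-{task_name}.pkl")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    resolved = resolve_result(original, target="{task_name}-{x}-{y}.pkl")
```

The copy matters when one result object is shared across tasks, as with a flow-level result: only the copy attached to the task with a target has its location overridden.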

Laura Lorenz (she/her)

06/11/2020, 2:29 PM
If you want to use the target behavior, you can just include the `target` keyword on the tasks and not specify location on the result; the result object for the task will prefer using the `target` kwarg as the result’s location. Ah yes, you already found that 🙂 We want to support both cases: someone using a result just to write files, without the target mechanism introducing caching, and someone who wants the persistent caching that targets provide, so the two are kept separate. Open to ideas on how to make that clearer in the docs! Gotcha about the duplicates. Can you share more about your dask setup?
Oo I see you have the LocalDaskExecutor

jorwoods

06/11/2020, 2:34 PM
dask==2.16.0
Yeah, while I am trying to build and test, I am just using a LocalDaskExecutor. I may move to a centralized one once I get this figured out

Laura Lorenz (she/her)

06/11/2020, 2:39 PM
The LocalDaskExecutor does have some funkiness with maps (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/executors/dask.py#L298-L299). According to my intel it may be even more specific to terminal mapped tasks (which I’m not sure will be your production case, but it is the case in your test flow). Tl;dr: for your case it might be worth moving off of LocalDaskExecutor sooner (or starting to test against the flow you actually plan to build, if it won’t have multi-level and/or terminal maps). In the meantime I will reproduce with your example flow on LocalDaskExecutor and see whether this is something we want to fix, raise better warnings about, or something else.