Thread
#prefect-community
    jorwoods

    2 years ago
    I have another question about checkpointing, results, and skipping task reruns. In my toy example below, each task runs again despite the presence of a LocalResult. Additionally, some of the tasks seem to run twice in a given flow run. Not sure if I have found a bug or if I am doing something wrong. The version I'm using is 0.11.5+134.g5e4898dde. I am running on Win 10 and have verified that the environment variable PREFECT__FLOWS__CHECKPOINTING=true is set.
    from prefect import Flow, task, unmapped, Parameter
    from prefect.engine.results import LocalResult
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.engine.cache_validators import all_parameters
    
    lr = LocalResult(location='{flow_name}-{task_name}-{x}-{y}.pkl',
                     validators=all_parameters
    )
    
    @task(log_stdout=True, checkpoint=True)
    def add(x, y):
        print(f'add ran with {x} {y}')
        try:
            return sum(x) + y
        except TypeError:
            return x + y
    
    with Flow('iterated map', result=lr) as flow:
        y = unmapped(Parameter('y', default=7))
        x = Parameter('x', default=[1,2,3])
        mapped_result = add.map(x, y=y)
        out = add(mapped_result, y)
    
    flow.run(executor=LocalDaskExecutor())
    Laura Lorenz (she/her)

    2 years ago
    Hi! Regarding 'running each task again despite the presence of a LocalResult': the presence of a result does not prevent execution, but the presence of a target does. Basically, to opt in to the behavior that a serialized result means the task should be cached, the task in question also needs target to be set, because that is an add-on behavior of target. See this section specifically: https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
    Regarding some of the tasks seeming to run twice: which tasks are you referring to? There is some intended 'rerunning' of tasks in this flow because of the map, so if it is about that one, that is the expected behavior of map. add is mapped once with 3 inputs, so we would expect to see logs about it running 4 times (once to establish the map, then once for each input), and then it is reduced once (so we would expect to see add run again with those parameters). Hope that helps!
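    The target idea described above can be sketched in plain Python. This is only an illustration under stated assumptions, not Prefect's implementation: run_with_target, its arguments, and the "ran"/"cached" labels are hypothetical names made up for this example. It shows a templated file path acting as the cache key, so that execution is skipped when the file already exists.

```python
import os
import pickle
import tempfile


def run_with_target(fn, target_template, directory, **kwargs):
    """Hypothetical helper: run fn(**kwargs) unless its target file exists."""
    # Template the target path from the call's inputs, e.g. "add-1-7.pkl".
    path = os.path.join(directory, target_template.format(**kwargs))
    if os.path.exists(path):
        # Target file present: load the stored value and skip execution.
        with open(path, "rb") as f:
            return pickle.load(f), "cached"
    # No target file yet: run the function and write its result to the target.
    value = fn(**kwargs)
    with open(path, "wb") as f:
        pickle.dump(value, f)
    return value, "ran"


calls = []  # records every real execution of add


def add(x, y):
    calls.append((x, y))
    return x + y


with tempfile.TemporaryDirectory() as tmp:
    first = run_with_target(add, "add-{x}-{y}.pkl", tmp, x=1, y=7)
    second = run_with_target(add, "add-{x}-{y}.pkl", tmp, x=1, y=7)
```

    In this sketch the second call finds the file written by the first and never invokes add. Merely writing the file (the "result" role) does not skip anything; the existence check on the path (the "target" role) does.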
    jorwoods

    2 years ago
    So to be clear, even though a location is specified in the LocalResult, I would also have to specify that in the task's target as well?
    @Laura Lorenz (she/her) this is what I mean when I say a task is running multiple times. You will see two instances of "add ran with x y" for each combination in the following.
    [2020-06-11 13:20:24] INFO - prefect.FlowRunner | Beginning Flow run for 'iterated map'
    [2020-06-11 13:20:24] INFO - prefect.FlowRunner | Starting flow run.
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'x': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add': finished task run for task with final state: 'Mapped'
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[2]': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 3 7
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[0]': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'add[1]': Starting task run...
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 1 7
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | add ran with 2 7
    [2020-06-11 13:20:25] INFO - prefect.TaskRunner | Task 'y': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'x': Starting task run...
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add': Starting task run...
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with [8, 9, 10] 7
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': Starting task run...
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': Starting task run...
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 3 7
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': Starting task run...
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 2 7
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | add ran with 1 7
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[2]': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[0]': finished task run for task with final state: 'Success'
    [2020-06-11 13:20:26] INFO - prefect.TaskRunner | Task 'add[1]': finished task run for task with final stat
    In the above logs I posted, I would only expect to see one line each for "add ran with 1 7", "add ran with 2 7", and "add ran with 3 7", but instead I see two lines for each.
    I think I see where some confusion on my end is coming from. In the class definition for Task, there is
    self.target = target

    if getattr(result, "location", None) and target:
        warnings.warn(
            "Both `result.location` and `target` set on task. Task result will use target as location."
        )
        self.result = result.copy()  # type: ignore
        self.result.location = target
    and a Result object has a check_target, but it does not have check_location. I guess it is unclear when to specify a Result's location vs a Task's target.
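    The precedence in the snippet quoted above can be tried out in isolation. The following is a minimal sketch that mirrors only those quoted lines; SketchResult and resolve_result are hypothetical stand-ins invented for illustration and are not part of Prefect.

```python
import copy
import warnings
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for a result object; not Prefect's Result class."""
    location: Optional[str] = None


def resolve_result(result: SketchResult, target: Optional[str]) -> SketchResult:
    """Mirror the quoted logic: when both are set, target replaces location."""
    if getattr(result, "location", None) and target:
        warnings.warn(
            "Both `result.location` and `target` set; using target as location."
        )
        # Copy so the shared result object is left untouched.
        resolved = copy.copy(result)
        resolved.location = target
        return resolved
    return result


original = SketchResult(location="{flow_name}-{task_name}.pkl")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    resolved = resolve_result(original, target="{task_name}-target.pkl")
```

    Under these assumptions, the copy carries the target as its location while the original result keeps its own, which matches the warning text in the quoted snippet.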
    Laura Lorenz (she/her)

    2 years ago
    If you want to use the target behavior, you can just include the target keyword on the tasks and not specify location on the result; the result object for the task will prefer using the target kwarg as the result's location. Ah yes, you already found that 🙂 We want to support both cases: someone using results just to write files without the target mechanism introducing caching, and someone who wants the persistent caching that targets provide, so the two are separate. Open to ideas on how to make that clearer in the docs! Gotcha about the duplicates. Can you share more about your dask setup?
    Oo I see you have the LocalDaskExecutor
    jorwoods

    2 years ago
    dask==2.16.0
    Yeah, while I am trying to build and test, I am just using a LocalDaskExecutor. I may move to a centralized one once I get this figured out
    Laura Lorenz (she/her)

    2 years ago
    The LocalDaskExecutor does have some funkiness with maps (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/executors/dask.py#L298-L299). According to my intel, it may be even more specific to terminal mapped tasks (I'm not sure that will be your production case, but it is in your test flow). Tl;dr: for your case it might be worth moving off of LocalDaskExecutor sooner (or starting to test against your real flow, if it won't have multi-level and/or terminal maps). In the meantime I will try to reproduce with your example flow and LocalDaskExecutor, and see whether this is something we want to fix, raise better warnings about, or something else.