#prefect-community

    Alex Goodman

    2 years ago
    Indeed, @Andrew Schechtman-Rook! The LocalResultHandler stores results without regard to which task produced them; however, the state object resulting from each task run does keep track of the path the result was written to. You can then persist the task-to-file mapping separately any way you'd like: you could persist all of the state returned by flow.run(), or persist it as each state becomes available with a state_handler. Here's an example of recording this mapping for the whole flow using the LocalResultHandler plus some state manipulation:
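    import os
    
    import prefect
    import slugify
    from prefect.engine.result_handlers import LocalResultHandler
    
    # e, y and z are @prefect.task functions defined elsewhere
    with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
        x = e()
        y = y(x)
        z(y)
    
    flow_state = flow.run()
    
    # write one tracking file per task, containing the path of its result file
    for task, task_state in flow_state.result.items():
        task_name = slugify.slugify(task.name)
        filepath = task_state._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "wb") as fh:
            fh.write(filepath)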
    This way, the .prefect directory would contain a result file for each task as well as a "tracking" file for each task, named after the task:
    tree .prefect
    .prefect
    ├── prefect-2g3sboyi
    ├── prefect-ixeqv82p
    ├── prefect-km5ufh9y
    ├── x
    ├── y
    └── z
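    To use those tracking files later, you have to follow each one back to its result. Below is a minimal, stdlib-only sketch of that round trip; it does not touch Prefect. write_result is a hypothetical stand-in that assumes the handler pickles each value into a mkstemp-style file with a "prefect-" prefix (the names in the tree above look like that, but the real LocalResultHandler may serialize differently, for example with cloudpickle), and load_tracked_results is likewise a hypothetical helper.

```python
import os
import pickle
import tempfile


def write_result(result_dir, value):
    """Hypothetical stand-in for a local result handler: pickle `value`
    into a uniquely named "prefect-" file and return that file's path."""
    fd, path = tempfile.mkstemp(prefix="prefect-", dir=result_dir)
    with os.fdopen(fd, "wb") as fh:
        pickle.dump(value, fh)
    return path


def load_tracked_results(result_dir, task_names):
    """Follow each tracking file (named after a task, containing the path
    of that task's result file) and unpickle the result it points to."""
    results = {}
    for name in task_names:
        tracking_file = os.path.join(result_dir, name)
        if not os.path.exists(tracking_file):
            continue  # no tracking file: this task never recorded a result
        with open(tracking_file) as fh:
            result_path = fh.read().strip()
        with open(result_path, "rb") as fh:
            results[name] = pickle.load(fh)
    return results


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as result_dir:
        # simulate a run of the e -> y flow discussed in this thread
        for name, value in [("e", 5), ("y", 15)]:
            path = write_result(result_dir, value)
            with open(os.path.join(result_dir, name), "w") as fh:
                fh.write(path)
        print(load_tracked_results(result_dir, ["e", "y", "z"]))
```

    Running it prints {'e': 5, 'y': 15}; "z" is skipped because no tracking file was written for it.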
    Note, I used slugify to encode the task name, but you have several options here. A good option if you have odd task names is to set task.slug and key off of that instead. If you wanted to do the same thing in a task state_handler:
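    import os
    
    import slugify
    
    def handler(task, old, new):
        # once a task succeeds, record the path its result was written to
        if new.is_successful():
            task_name = slugify.slugify(task.name)
            filepath = new._result.safe_value.value
            with open(os.path.join(".prefect", task_name), "w") as fh:
                fh.write(filepath)
        # state handlers are expected to return the new state
        return new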
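    Because both snippets above need Prefect, here is a way to exercise the same handler logic without it. This is a sketch under stated assumptions: slugify_name is a rough stdlib substitute for slugify.slugify (it will not match it on every input), fake_state builds a hypothetical SimpleNamespace stand-in exposing only is_successful() and _result.safe_value.value, and preferring task.slug over the slugified name follows the note above rather than anything Prefect does for you.

```python
import os
import re
import tempfile
from types import SimpleNamespace


def slugify_name(name):
    """Rough stand-in for slugify.slugify: lowercase the name and collapse
    every run of characters outside a-z/0-9 into a single hyphen."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


def fake_state(successful, path=None):
    """Hypothetical stand-in for a Prefect state, exposing only the pieces
    the handler touches."""
    return SimpleNamespace(
        is_successful=lambda: successful,
        _result=SimpleNamespace(safe_value=SimpleNamespace(value=path)),
    )


def make_tracking_handler(tracking_dir):
    """Return a (task, old, new) state handler that writes the result path
    of each successful task to a file named after the task."""

    def handler(task, old, new):
        if new.is_successful():
            # an explicit task.slug wins; otherwise slugify the task name
            key = getattr(task, "slug", None) or slugify_name(task.name)
            filepath = new._result.safe_value.value
            with open(os.path.join(tracking_dir, key), "w") as fh:
                fh.write(filepath)
        # hand the new state back to whatever called the handler
        return new

    return handler


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tracking_dir:
        handler = make_tracking_handler(tracking_dir)
        handler(SimpleNamespace(name="Manipulate Data 1", slug=None), None,
                fake_state(True, "/tmp/prefect-example"))
        handler(SimpleNamespace(name="z", slug=None), None, fake_state(False))
        print(os.listdir(tracking_dir))
```

    Running it leaves a single tracking file, manipulate-data-1; the unsuccessful "z" state writes nothing.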

    Andrew Schechtman-Rook

    2 years ago
    Thanks @Alex Goodman! I'm trying to run your example and I'm getting a TypeError: a bytes-like object is required, not 'NoResultType' on the write call. This is what I have (I had to make some slight modifications to deal with a couple of other errors):
    import prefect
    from prefect.engine.result_handlers import LocalResultHandler
    
    @prefect.task
    def e():
        return 5
    
    @prefect.task
    def y (input_val):
        return input_val * 3
    
    with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
        x = e()
        y = y(x)
    flow_state = flow.run()
    
    for task, task_state in flow_state.result.items():
        task_name = slugify.slugify(task.name)
        filepath = task_state._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "wb") as fh:
            fh.write(filepath)

    Alex Goodman

    2 years ago
    ahh, my mistake... change "wb" in the open clause to "w"

    Andrew Schechtman-Rook

    2 years ago
    TypeError: write() argument must be str, not NoResultType

    Alex Goodman

    2 years ago
    gotcha, in my example all tasks returned results, whereas in your flow some tasks have no returned result. Before writing anything we'll have to account for this

    Andrew Schechtman-Rook

    2 years ago
    which of my tasks don't return a result? both of my tasks have a return

    Alex Goodman

    2 years ago
    let me run it locally to see what's up

    Andrew Schechtman-Rook

    2 years ago
    thanks

    Alex Goodman

    2 years ago
    Interesting, I wasn't able to reproduce:
    ~/code/ in-script-cli-poc*
    venv374 ❯ python test.py
    [2019-12-23 21:49:46,282] INFO - prefect.FlowRunner | Beginning Flow run for 'example'
    ...<snip>...
    [2019-12-23 21:49:46,371] DEBUG - prefect.FlowRunner | Flow 'example': Handling state change from Running to Success
    
    ~/code/ in-script-cli-poc*
    venv374 ❯ ls -al .prefect
    total 32
    drwxr-xr-x   6 user  user  192 Dec 23 16:49 .
    drwxr-xr-x  16 user  user  512 Dec 23 16:48 ..
    -rw-r--r--   1 user  user   56 Dec 23 16:49 e
    -rw-------   1 user  user    5 Dec 23 16:49 prefect-qcf3c39e
    -rw-------   1 user  user    5 Dec 23 16:49 prefect-wd24pmtb
    -rw-r--r--   1 user  user   56 Dec 23 16:49 y
    
    ~/code/ in-script-cli-poc*
    venv374 ❯
    The only thing I changed was the "wb" I mentioned, plus a few imports (os and slugify)

    Andrew Schechtman-Rook

    2 years ago
    so that's interesting
    oh, looks like I'm on an older version of prefect, maybe that's my problem

    Alex Goodman

    2 years ago
    which version? I can try to reproduce it

    Andrew Schechtman-Rook

    2 years ago
    0.7.3
    I'm trying to pull the latest version from conda-forge now

    Alex Goodman

    2 years ago
    yup, that's the problem, I can update the example

    Andrew Schechtman-Rook

    2 years ago
    don't worry about it, I'd rather update 🙂

    Alex Goodman

    2 years ago
    all good, updating is usually best. However, if you find yourself unable to update, a workaround would be:
    for task, task_state in flow_state.result.items():
        task_name = slugify.slugify(task.name)
        # explicitly write the result through the result handler so that
        # safe_value is populated instead of being NoResult
        task_state._result.store_safe_value()
        filepath = task_state._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)
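    One way to picture why this workaround helps: if the safe value is only filled in when something explicitly asks for it, skipping that step leaves NoResult where a path was expected. The sketch below is a toy under that assumption, not a description of Prefect internals; NoResultType, ToyResult, write_to_dir and record_paths are hypothetical names, and the guard that skips results with nothing to persist is an addition that is not in the thread. Dropping both the store_safe_value() call and the guard would make fh.write receive NoResult and raise the same kind of TypeError (write() argument must be str, not NoResultType) reported above.

```python
import os
import tempfile


class NoResultType:
    """Hypothetical sentinel type standing in for Prefect's NoResult."""

    def __repr__(self):
        return "NoResult"


NoResult = NoResultType()


class ToyResult:
    """Toy model, not Prefect's Result class: the value lives in memory and
    safe_value (a file path here) stays NoResult until it is stored."""

    def __init__(self, value, write):
        self.value = value
        self._write = write  # callable: value -> path of the written file
        self.safe_value = NoResult

    def store_safe_value(self):
        # persist real values once; leave missing values as NoResult
        if self.safe_value is NoResult and self.value is not NoResult:
            self.safe_value = self._write(self.value)


def write_to_dir(directory):
    """Return a writer that saves repr(value) to a fresh "prefect-" file."""

    def write(value):
        fd, path = tempfile.mkstemp(prefix="prefect-", dir=directory)
        with os.fdopen(fd, "w") as fh:
            fh.write(repr(value))
        return path

    return write


def record_paths(results, tracking_dir):
    """Store each safe value, then write a tracking file for every result
    that now has a path; return the names that were recorded."""
    recorded = []
    for name, result in results.items():
        result.store_safe_value()
        if result.safe_value is NoResult:
            continue  # nothing was produced, so there is no path to record
        with open(os.path.join(tracking_dir, name), "w") as fh:
            fh.write(result.safe_value)
        recorded.append(name)
    return recorded


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        write = write_to_dir(d)
        results = {
            "e": ToyResult(5, write),
            "y": ToyResult(15, write),
            "empty": ToyResult(NoResult, write),
        }
        print(record_paths(results, d))
```

    Running it prints ['e', 'y']; the "empty" entry is skipped because it never had a value to write.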

    Andrew Schechtman-Rook

    2 years ago
    would you be able to paste in your working version of the test script? even after updating mine's still not working, and I'm starting to strongly suspect the problem is between my ears

    Alex Goodman

    2 years ago
    nope, looks like it was my bad... I may have been depending on features from master. This should do the trick:
    import os
    import prefect
    from prefect.engine.result_handlers import LocalResultHandler
    
    import slugify
    
    @prefect.task
    def e():
        return 5
    @prefect.task
    def y(input_val):
        return input_val * 3
    with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
        x = e()
        y = y(x)
    flow_state = flow.run()
    
    for task, task_state in flow_state.result.items():
        task_name = slugify.slugify(task.name)
        task_state._result.store_safe_value()
        filepath = task_state._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)

    Andrew Schechtman-Rook

    2 years ago
    yep, that's working now
    thanks! this at least gives me something to start playing around with
    In case anyone in the future is looking for this functionality, here's what I got working:
    import datetime
    import os
    import prefect
    from prefect.engine.result_handlers import LocalResultHandler
    
    @prefect.task
    def generate_data():
        return 5
    
    @prefect.task(
        cache_for=datetime.timedelta(minutes=1)
    )
    def manipulate_data_1(input_data):
        return input_data * 3
    
    @prefect.task
    def manipulate_data_2(input_data):
        print(input_data ** 2)
        return input_data ** 2
    
    def cache_to_result_handler(task, old_state, new_state):
        if old_state.is_successful() and new_state.is_cached():
            old_state._result.store_safe_value()
        if old_state.is_pending() and new_state.is_cached():
            new_state._result = new_state._result.safe_value.to_result()
        return new_state
    
    
    with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
        data_1 = generate_data()
        data_2 = manipulate_data_1(data_1)
        data_3 = manipulate_data_2(data_2)
    
    flow_state = flow.run(
            task_runner_state_handlers=[cache_to_result_handler]
    )
    for task_name, caches in prefect.context.caches.items():
        for cache in caches:
            cache._result.value = None
    
    flow_state_2 = flow.run(
            task_runner_state_handlers=[cache_to_result_handler]
    )
    First, there's a state handler that does 2 things: for state changes from successful to cached, it invokes the store_safe_value method to trigger the result_handler's write method; for state changes from pending to cached, it inflates the safe result by invoking the to_result method and uses that to replace the cached result. Second, after the first flow run, a quick loop replaces all cached result values with None so that the results themselves won't take up any space if the cache is persisted to disk. It seems to get the job done, although I'm not super psyched about it because manually editing the cache post-flow seems really kludgey.
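    The three steps described above (persist on the successful-to-cached transition, drop in-memory values after the run, reload on the pending-to-cached transition) can be modelled without Prefect. The following is a hypothetical stdlib sketch: CachedEntry, persist, drop_values and rehydrate are illustrative names, pickle stands in for whatever the real result handler uses, and nothing here touches prefect.context.caches.

```python
import os
import pickle
import tempfile


class CachedEntry:
    """Hypothetical cache entry: an in-memory value plus the path of its
    persisted copy (None until it has been written)."""

    def __init__(self, value):
        self.value = value
        self.path = None


def persist(entry, directory):
    # mirrors the successful-to-cached branch: write the value, keep the path
    fd, entry.path = tempfile.mkstemp(prefix="prefect-", dir=directory)
    with os.fdopen(fd, "wb") as fh:
        pickle.dump(entry.value, fh)


def drop_values(caches):
    # mirrors the post-run loop: keep only paths, release in-memory values
    for entries in caches.values():
        for entry in entries:
            entry.value = None


def rehydrate(entry):
    # mirrors the pending-to-cached branch: reload the value from disk
    with open(entry.path, "rb") as fh:
        entry.value = pickle.load(fh)
    return entry.value


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        caches = {"manipulate_data_1": [CachedEntry(15)]}
        for entries in caches.values():
            for entry in entries:
                persist(entry, d)
        drop_values(caches)
        entry = caches["manipulate_data_1"][0]
        print(entry.value)       # None: only the path is left
        print(rehydrate(entry))  # 15: reloaded from the pickled file
```

    Keeping the path next to the value is what makes the post-run clean-up safe: once drop_values has run, rehydrate is the only way back to the data, which mirrors how the handler-based version depends on to_result for the second run.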