# prefect-community
a
Indeed, @Andrew Schechtman-Rook! The `LocalResultHandler` stores results without regard to which task produced them; however, the state object returned by each task run does keep track of the path that was written to. You can then persist the task-to-file mapping separately any way you’d like: you could persist all of the state returned by `flow.run()`, or persist it as it becomes available with a `state_handler`. Here is an example of caching this for the whole flow using the `LocalResultHandler` plus some state manipulation:
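```python
import os

import prefect
import slugify
from prefect.engine.result_handlers import LocalResultHandler

# e, y and z are tasks defined elsewhere
with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)
    z(y)

flow_state = flow.run()

# write one tracking file per task, named after the task and containing its result path
for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "wb") as fh:
        fh.write(filepath)
```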
This way, the `.prefect` directory would contain a result file for each task as well as a “tracking” file for each task, named after the task:
```
tree .prefect
.prefect
├── prefect-2g3sboyi
├── prefect-ixeqv82p
├── prefect-km5ufh9y
├── x
├── y
└── z
```
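To make the layout above concrete without depending on Prefect itself, here is a minimal stand-in: each value is pickled into a `prefect-XXXXXXXX`-style temp file, and a tracking file named after the task stores that path. The helpers `write_result`, `write_tracking_file` and `read_tracked_value` are hypothetical, and the pickle plus `tempfile.mkstemp` details are an assumption about how a local handler might behave, not Prefect's actual implementation.

```python
import os
import pickle
import tempfile


def write_result(value, directory):
    """Pickle `value` into a new temp file in `directory` and return its path (hypothetical helper)."""
    os.makedirs(directory, exist_ok=True)
    fd, path = tempfile.mkstemp(prefix="prefect-", dir=directory)
    with os.fdopen(fd, "wb") as fh:
        pickle.dump(value, fh)
    return path


def write_tracking_file(directory, task_key, result_path):
    """Record which result file belongs to which task (hypothetical helper)."""
    with open(os.path.join(directory, task_key), "w") as fh:
        fh.write(result_path)


def read_tracked_value(directory, task_key):
    """Follow a tracking file back to the pickled value it points at."""
    with open(os.path.join(directory, task_key)) as fh:
        result_path = fh.read()
    with open(result_path, "rb") as fh:
        return pickle.load(fh)


if __name__ == "__main__":
    directory = tempfile.mkdtemp()
    write_tracking_file(directory, "x", write_result(5, directory))
    write_tracking_file(directory, "y", write_result(15, directory))
    print(read_tracked_value(directory, "y"))  # 15
```

The tracking file only holds a path, so it stays tiny no matter how large the result is; the trade-off is that deleting a `prefect-*` file silently breaks its tracking entry.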
Note: I used `slugify` to encode the task name, but you have several options here. If you have odd task names, a good option is to set `task.slug` and key off of that. If you wanted to do the same thing in a task `state_handler`:
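```python
import os

import slugify


def handler(task, old, new):
    # once a task succeeds, record where its result was written
    if new.is_successful():
        task_name = slugify.slugify(task.name)
        filepath = new._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)
    # state handlers hand back the (possibly modified) new state
    return new
```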
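The examples key the tracking files on `slugify.slugify(task.name)`. That matters because raw task names can contain characters such as `/` or spaces that are awkward in filenames, and two different names can end up with the same key. As a dependency-free illustration, here is a rough regex stand-in for `slugify` (the hypothetical `simple_slug`, not the `python-slugify` implementation) plus a check that flags keys that would overwrite each other; a unique `task.slug`, as suggested above, sidesteps the second problem.

```python
import re
from collections import Counter


def simple_slug(name):
    """Lowercase `name` and collapse every run of non-alphanumerics into one hyphen.

    A rough stand-in for slugify.slugify, for illustration only.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower())
    return slug.strip("-")


def find_key_collisions(names):
    """Return the slugs that more than one task name maps to."""
    counts = Counter(simple_slug(name) for name in names)
    return sorted(slug for slug, n in counts.items() if n > 1)


names = ["manipulate_data_1", "Load Data/v2", "load data v2"]
print([simple_slug(n) for n in names])
# ['manipulate-data-1', 'load-data-v2', 'load-data-v2']
print(find_key_collisions(names))
# ['load-data-v2']
```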
a
Thanks @Alex Goodman! I'm trying to run your example and I'm getting a `TypeError: a bytes-like object is required, not 'NoResultType'` on the `write` command. This is what I have (I had to make some slight modifications to deal with a couple of other errors):
```python
import prefect
from prefect.engine.result_handlers import LocalResultHandler

@prefect.task
def e():
    return 5

@prefect.task
def y(input_val):
    return input_val * 3

with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)
flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "wb") as fh:
        fh.write(filepath)
```
a
ahh, my mistake... change `"wb"` in the `open` clause to `"w"`
a
TypeError: write() argument must be str, not NoResultType
a
gotcha, in my example all tasks returned results, whereas in your flow some tasks have no returned results. Before writing anything we'll have to account for this
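One way to account for it is to check the value before writing and skip any task whose stored location is not a string (in the failure above it was a `NoResultType` object). A minimal sketch, assuming the location has already been pulled out of the task state; `write_if_location` and `NoResultStandIn` are hypothetical placeholders, not Prefect APIs.

```python
import os
import tempfile


class NoResultStandIn:
    """Placeholder for Prefect's NoResult sentinel (illustration only)."""


def write_if_location(directory, task_key, location):
    """Write `location` to a tracking file; skip and return False if it isn't a path string."""
    if not isinstance(location, str):
        return False
    with open(os.path.join(directory, task_key), "w") as fh:
        fh.write(location)
    return True


directory = tempfile.mkdtemp()
print(write_if_location(directory, "e", "/tmp/.prefect/prefect-abc"))  # True
print(write_if_location(directory, "y", NoResultStandIn()))  # False
print(os.listdir(directory))  # ['e']
```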
a
which of my tasks don't return a result? both of my tasks have a `return`
a
let me run it locally to see what's up
a
thanks
a
Interesting, I wasn't able to reproduce:
```
~/code/ in-script-cli-poc*
venv374 ❯ python test.py
[2019-12-23 21:49:46,282] INFO - prefect.FlowRunner | Beginning Flow run for 'example'
...<snip>...
[2019-12-23 21:49:46,371] DEBUG - prefect.FlowRunner | Flow 'example': Handling state change from Running to Success

~/code/ in-script-cli-poc*
venv374 ❯ ls -al .prefect
total 32
drwxr-xr-x   6 user  user  192 Dec 23 16:49 .
drwxr-xr-x  16 user  user  512 Dec 23 16:48 ..
-rw-r--r--   1 user  user   56 Dec 23 16:49 e
-rw-------   1 user  user    5 Dec 23 16:49 prefect-qcf3c39e
-rw-------   1 user  user    5 Dec 23 16:49 prefect-wd24pmtb
-rw-r--r--   1 user  user   56 Dec 23 16:49 y

~/code/ in-script-cli-poc*
venv374 ❯
```
The only things I changed were the `wb` I mentioned and a few imports (`os` and `slugify`).
a
so that's interesting
oh, looks like I'm on an older version of prefect, maybe that's my problem
a
which version? I can try to reproduce it
a
0.7.3
I'm trying to pull the latest version from `conda-forge` now
a
yup, that's the problem, I can update the example
a
don't worry about it, I'd rather update 🙂
a
all good, updating is usually best; however, if you find yourself unable to update, a holdover would be:
```python
for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    task_state._result.store_safe_value()
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "w") as fh:
        fh.write(filepath)
```
a
would you be able to paste in your working version of the test script? even after updating mine's still not working, and I'm starting to strongly suspect the problem is between my ears
a
nope, looks like it was my bad... I may have been depending on features from master; this should do the trick:
```python
import os

import prefect
import slugify
from prefect.engine.result_handlers import LocalResultHandler


@prefect.task
def e():
    return 5


@prefect.task
def y(input_val):
    return input_val * 3


with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)

flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    # on older Prefect versions (e.g. 0.7.3) the safe value must be stored explicitly
    task_state._result.store_safe_value()
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "w") as fh:
        fh.write(filepath)
```
a
yep, that's working now
thanks! this at least gives me something to start playing around with
In case anyone in the future is looking for this functionality, here's what I got working:
```python
import datetime
import os
import prefect
from prefect.engine.result_handlers import LocalResultHandler

@prefect.task
def generate_data():
    return 5

@prefect.task(
    cache_for=datetime.timedelta(minutes=1)
)
def manipulate_data_1(input_data):
    return input_data * 3

@prefect.task
def manipulate_data_2(input_data):
    print(input_data ** 2)
    return input_data ** 2

def cache_to_result_handler(task, old_state, new_state):
    if old_state.is_successful() and new_state.is_cached():
        old_state._result.store_safe_value()
    if old_state.is_pending() and new_state.is_cached():
        new_state._result = new_state._result.safe_value.to_result()
    return new_state


with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    data_1 = generate_data()
    data_2 = manipulate_data_1(data_1)
    data_3 = manipulate_data_2(data_2)

flow_state = flow.run(
    task_runner_state_handlers=[cache_to_result_handler]
)
for task_name, caches in prefect.context.caches.items():
    for cache in caches:
        cache._result.value = None

flow_state_2 = flow.run(
    task_runner_state_handlers=[cache_to_result_handler]
)
```
First, there's a state handler that does two things: for state changes from successful to cached, it invokes the `store_safe_value` method to trigger the `result_handler`'s `write` method; for state changes from pending to cached, it inflates the safe result by invoking the `to_result` method and uses that to replace the cached result. Second, at the end of the first flow run, a quick loop sets every cached result value to `None` so that the results won't take up any space if persisted to disk. It seems to get the job done, although I'm not super psyched about it because manually editing the cache post-flow seems really kludgey.
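The idea behind clearing the cached values is that a cached entry only needs to remember *where* its result lives; the value can be reloaded from disk on the next run, which is roughly the role `to_result` plays in the handler above. A dependency-free sketch of that pattern, with a hypothetical `CachedRef` class that is not part of Prefect:

```python
import os
import pickle
import tempfile


class CachedRef:
    """Holds a value plus the location it was persisted to (hypothetical, not a Prefect class)."""

    def __init__(self, value, location=None):
        self.value = value
        self.location = location

    def persist(self, directory):
        """Write the value to disk once and remember where it went."""
        if self.location is None:
            fd, self.location = tempfile.mkstemp(prefix="prefect-", dir=directory)
            with os.fdopen(fd, "wb") as fh:
                pickle.dump(self.value, fh)
        return self.location

    def drop_value(self):
        """Free the in-memory value; only the location is kept."""
        self.value = None

    def rehydrate(self):
        """Reload the value from its location."""
        with open(self.location, "rb") as fh:
            self.value = pickle.load(fh)
        return self.value


directory = tempfile.mkdtemp()
ref = CachedRef(15)
ref.persist(directory)
ref.drop_value()        # analogous to setting cache._result.value = None
print(ref.value)        # None
print(ref.rehydrate())  # 15
```

Keeping the write-once check in `persist` mirrors the concern in the handler: a value that is already on disk should not be written again just because it was served from cache.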