
Alex Goodman

12/23/2019, 9:27 PM
Indeed, @Andrew Schechtman-Rook! The `LocalResultHandler` does store results without regard to the task that produced them. However, the state object returned by each task run keeps track of the path that was written. You can then persist the task-to-file mapping separately any way you'd like: either persist all of the state returned by `flow.run()`, or persist each state as it becomes available with a state_handler. An example of caching this for the whole flow using the `LocalResultHandler` + some state manipulation:
with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)
    z(y)

flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "wb") as fh:
        fh.write(filepath)
This way, the `.prefect` directory would contain a result file for each task as well as a “tracking” file for each task, named after the task:
tree .prefect
.prefect
├── prefect-2g3sboyi
├── prefect-ixeqv82p
├── prefect-km5ufh9y
├── x
├── y
└── z
Note, I used slugify to encode the task name, but you have several options here. A good option if you have odd task names is to set `task.slug` and key off of that. If you wanted to do the same thing but in a task state_handler:
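def handler(task, old, new):
    # Only successful states have a stored result location to record.
    if new.is_successful():
        task_name = slugify.slugify(task.name)
        filepath = new._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)
    # Hand the state back so the runner keeps using it.
    return new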
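As one way to "persist the task-to-file mapping separately", the per-task tracking files could be collapsed into a single JSON manifest. This is only a Prefect-free sketch: it assumes you have already collected a dict of task name to result path (for example in the loop or handler above), and `write_manifest`, `read_manifest`, and the `manifest.json` filename are hypothetical names, not part of Prefect.

```python
import json
import os
import tempfile


def write_manifest(mapping, directory, filename="manifest.json"):
    """Write a {task_name: result_path} mapping to one JSON file.

    Hypothetical helper, not part of Prefect. The data goes to a temp file
    first and is then renamed, so a crash never leaves a half-written manifest.
    """
    os.makedirs(directory, exist_ok=True)
    target = os.path.join(directory, filename)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as fh:
        json.dump(mapping, fh, indent=2, sort_keys=True)
    os.replace(tmp_path, target)
    return target


def read_manifest(directory, filename="manifest.json"):
    """Load the mapping back; return an empty dict if no manifest exists."""
    try:
        with open(os.path.join(directory, filename)) as fh:
            return json.load(fh)
    except FileNotFoundError:
        return {}
```

A single manifest is easier to load in one go, while the one-file-per-task layout above is easier to update from concurrent task runs, so which one fits depends on how the flow is executed.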

Andrew Schechtman-Rook

12/23/2019, 9:44 PM
Thanks @Alex Goodman! I'm trying to run your example and I'm getting a `TypeError: a bytes-like object is required, not 'NoResultType'` on the `write` call. This is what I have (I had to make some slight modifications to deal with a couple of other errors):
import prefect
from prefect.engine.result_handlers import LocalResultHandler

@prefect.task
def e():
    return 5

@prefect.task
def y (input_val):
    return input_val * 3

with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)
flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "wb") as fh:
        fh.write(filepath)

Alex Goodman

12/23/2019, 9:45 PM
Ahh, my mistake... change `"wb"` in the `open` call to `"w"`.

Andrew Schechtman-Rook

12/23/2019, 9:45 PM
TypeError: write() argument must be str, not NoResultType

Alex Goodman

12/23/2019, 9:46 PM
Gotcha. In my example all tasks returned results, whereas in your flow there are some tasks with no returned results. We'll have to account for this before writing anything.
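A sketch of the kind of guard meant here, written without Prefect itself: skip the tracking file whenever the stored location is not a usable string. `NoResultStandIn` is a hypothetical placeholder for whatever "no result" marker your Prefect version hands back, and `write_tracking_file` is likewise just an illustrative name.

```python
import os


class NoResultStandIn:
    """Hypothetical stand-in for a 'no result' marker object."""


def write_tracking_file(directory, task_name, filepath):
    """Write filepath into directory/task_name; return True if written.

    Anything that is not a non-empty string (a 'no result' marker, None,
    and so on) is skipped instead of raising a TypeError from write().
    """
    if not isinstance(filepath, str) or not filepath:
        return False
    os.makedirs(directory, exist_ok=True)
    with open(os.path.join(directory, task_name), "w") as fh:
        fh.write(filepath)
    return True
```

The boolean return value lets the calling loop log or count the tasks that were skipped.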

Andrew Schechtman-Rook

12/23/2019, 9:47 PM
Which of my tasks don't return a result? Both of my tasks have a `return`.

Alex Goodman

12/23/2019, 9:48 PM
let me run it locally to see what's up
👍 1

Andrew Schechtman-Rook

12/23/2019, 9:48 PM
thanks

Alex Goodman

12/23/2019, 9:51 PM
Interesting, I wasn't able to reproduce:
~/code/ in-script-cli-poc* 
venv374 ❯ python test.py 
[2019-12-23 21:49:46,282] INFO - prefect.FlowRunner | Beginning Flow run for 'example'
...<snip>...
[2019-12-23 21:49:46,371] DEBUG - prefect.FlowRunner | Flow 'example': Handling state change from Running to Success
                                                                                                                                                                                                                                
~/code/ in-script-cli-poc* 
venv374 ❯ ls -al .prefect 
total 32
drwxr-xr-x   6 user  user  192 Dec 23 16:49 .
drwxr-xr-x  16 user  user  512 Dec 23 16:48 ..
-rw-r--r--   1 user  user   56 Dec 23 16:49 e
-rw-------   1 user  user    5 Dec 23 16:49 prefect-qcf3c39e
-rw-------   1 user  user    5 Dec 23 16:49 prefect-wd24pmtb
-rw-r--r--   1 user  user   56 Dec 23 16:49 y
                                                                                                                                                                                                                                
~/code/ in-script-cli-poc* 
venv374 ❯
The only things I changed were the `wb` I mentioned and a few imports (`os` and `slugify`).

Andrew Schechtman-Rook

12/23/2019, 9:53 PM
so that's interesting
oh, looks like I'm on an older version of prefect, maybe that's my problem

Alex Goodman

12/23/2019, 9:54 PM
which version? I can try to reproduce it

Andrew Schechtman-Rook

12/23/2019, 9:54 PM
0.7.3
I'm trying to pull the latest version from `conda-forge` now

Alex Goodman

12/23/2019, 9:56 PM
yup, that's the problem, I can update the example

Andrew Schechtman-Rook

12/23/2019, 9:56 PM
don't worry about it, I'd rather update 🙂

Alex Goodman

12/23/2019, 10:03 PM
All good, updating is usually best. However, if you find yourself unable to update, a holdover would be:
for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    task_state._result.store_safe_value()
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "w") as fh:
        fh.write(filepath)

Andrew Schechtman-Rook

12/23/2019, 10:03 PM
would you be able to paste in your working version of the test script? even after updating mine's still not working, and I'm starting to strongly suspect the problem is between my ears
👍 1

Alex Goodman

12/23/2019, 10:10 PM
Nope, looks like it was my bad... I may have been depending on features from master. This will do the trick:
import os
import prefect
from prefect.engine.result_handlers import LocalResultHandler

import slugify

@prefect.task
def e():
    return 5
@prefect.task
def y(input_val):
    return input_val * 3
with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)
flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    task_state._result.store_safe_value()
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "w") as fh:
        fh.write(filepath)

Andrew Schechtman-Rook

12/23/2019, 10:11 PM
yep, that's working now
thanks! this at least gives me something to start playing around with
👍 2
In case anyone in the future is looking for this functionality, here's what I got working:
import datetime
import os
import prefect
from prefect.engine.result_handlers import LocalResultHandler

@prefect.task
def generate_data():
    return 5

@prefect.task(
    cache_for=datetime.timedelta(minutes=1)
)
def manipulate_data_1(input_data):
    return input_data * 3

@prefect.task
def manipulate_data_2(input_data):
    print(input_data ** 2)
    return input_data ** 2

def cache_to_result_handler(task, old_state, new_state):
    if old_state.is_successful() and new_state.is_cached():
        old_state._result.store_safe_value()
    if old_state.is_pending() and new_state.is_cached():
        new_state._result = new_state._result.safe_value.to_result()
    return new_state


with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    data_1 = generate_data()
    data_2 = manipulate_data_1(data_1)
    data_3 = manipulate_data_2(data_2)

flow_state = flow.run(
        task_runner_state_handlers=[cache_to_result_handler]
)
for task_name, caches in prefect.context.caches.items():
    for cache in caches:
        cache._result.value = None

flow_state_2 = flow.run(
        task_runner_state_handlers=[cache_to_result_handler]
)
First, there's a state handler that does two things. For state changes from successful to cached, it invokes the `store_safe_value` method to trigger the `result_handler`'s `write` method. For state changes from pending to cached, it inflates the safe result by invoking the `to_result` method and uses that to replace the cached result. Second, after the first flow run, a quick loop replaces all cached result values with `None` so that the results won't take up any space if persisted to disk. It seems to get the job done, although I'm not super psyched about it because manually editing the cache post-flow seems really kludgy.