Alex Goodman
12/23/2019, 9:27 PM
LocalResultHandler does store information without regard to the task in question; however, the state object returned for each task run does keep track of the path written to. You can then persist the task-to-file mapping separately any way you'd like: either persist all of the state returned by flow.run(), or persist state as it becomes available with a state_handler.
An example of doing this for the whole flow using the LocalResultHandler + some state manipulation:
with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)
    z(y)

flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "wb") as fh:
        fh.write(filepath)
In this way, the .prefect directory would contain a result file for each task as well as a "tracking" file for each task, named by the task name:
tree .prefect
.prefect
├── prefect-2g3sboyi
├── prefect-ixeqv82p
├── prefect-km5ufh9y
├── x
├── y
└── z
Note: I used slugify to encode the task name, but you have several options here. A good option if you have odd task names is to set the task.slug and use that to key off of.
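A minimal sketch of that slug approach (this assumes Prefect 0.x's Task accepts a slug keyword; the generate-data slug is just illustrative):
# Give the task a stable, filesystem-friendly slug up front...
@prefect.task(slug="generate-data")
def generate_data():
    return 5

# ...and key the tracking file off of it instead of the slugified display name:
# key = task.slug or slugify.slugify(task.name)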
If you wanted to do the same thing but in a task state_handler:
def handler(task, old, new):
    if new.is_successful():
        task_name = slugify.slugify(task.name)
        filepath = new._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)
    # state handlers should return the (possibly modified) new state
    return new
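A sketch of wiring that handler up, assuming Prefect 0.x's state_handlers keyword on tasks (the task_runner_state_handlers hook is the same one used further down in this thread):
# Attach the handler to an individual task...
@prefect.task(state_handlers=[handler])
def e():
    return 5

# ...or apply it to every task in the run at once:
flow_state = flow.run(task_runner_state_handlers=[handler])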
Andrew Schechtman-Rook
12/23/2019, 9:44 PM
I'm getting TypeError: a bytes-like object is required, not 'NoResultType' on the write command. This is what I have (I had to make some slight modifications to deal with a couple of other errors):
import prefect
from prefect.engine.result_handlers import LocalResultHandler

@prefect.task
def e():
    return 5

@prefect.task
def y(input_val):
    return input_val * 3

with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)

flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "wb") as fh:
        fh.write(filepath)
Alex Goodman
12/23/2019, 9:45 PM
Try changing the "wb" in the open clause to "w"
Andrew Schechtman-Rook
12/23/2019, 9:45 PM
Now I get TypeError: write() argument must be str, not NoResultType
Alex Goodman
12/23/2019, 9:46 PM
Andrew Schechtman-Rook
12/23/2019, 9:47 PM
return
Alex Goodman
12/23/2019, 9:48 PM
Andrew Schechtman-Rook
12/23/2019, 9:48 PM
Alex Goodman
12/23/2019, 9:51 PM
~/code/ in-script-cli-poc*
venv374 ❯ python test.py
[2019-12-23 21:49:46,282] INFO - prefect.FlowRunner | Beginning Flow run for 'example'
...<snip>...
[2019-12-23 21:49:46,371] DEBUG - prefect.FlowRunner | Flow 'example': Handling state change from Running to Success
~/code/ in-script-cli-poc*
venv374 ❯ ls -al .prefect
total 32
drwxr-xr-x 6 user user 192 Dec 23 16:49 .
drwxr-xr-x 16 user user 512 Dec 23 16:48 ..
-rw-r--r-- 1 user user 56 Dec 23 16:49 e
-rw------- 1 user user 5 Dec 23 16:49 prefect-qcf3c39e
-rw------- 1 user user 5 Dec 23 16:49 prefect-wd24pmtb
-rw-r--r-- 1 user user 56 Dec 23 16:49 y
~/code/ in-script-cli-poc*
venv374 ❯
The only thing I changed was the wb I mentioned, and added a few imports (os and slugify).
Andrew Schechtman-Rook
12/23/2019, 9:53 PM
Alex Goodman
12/23/2019, 9:54 PM
Andrew Schechtman-Rook
12/23/2019, 9:54 PM
conda-forge now
Alex Goodman
12/23/2019, 9:56 PM
Andrew Schechtman-Rook
12/23/2019, 9:56 PM
Alex Goodman
12/23/2019, 10:03 PM
for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    task_state._result.store_safe_value()
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "w") as fh:
        fh.write(filepath)
Andrew Schechtman-Rook
12/23/2019, 10:03 PM
Alex Goodman
12/23/2019, 10:10 PM
import os
import prefect
from prefect.engine.result_handlers import LocalResultHandler
import slugify

@prefect.task
def e():
    return 5

@prefect.task
def y(input_val):
    return input_val * 3

with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    x = e()
    y = y(x)

flow_state = flow.run()

for task, task_state in flow_state.result.items():
    task_name = slugify.slugify(task.name)
    task_state._result.store_safe_value()
    filepath = task_state._result.safe_value.value
    with open(os.path.join(".prefect", task_name), "w") as fh:
        fh.write(filepath)
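To later rehydrate a value from one of those tracking files, a quick sketch of the reverse direction (assuming LocalResultHandler.read accepts the file path that its write method produced):
# Look up the result file recorded for task "y", then read the pickled value
# back through the same handler.
handler = LocalResultHandler(dir=".prefect")
with open(os.path.join(".prefect", "y")) as fh:
    result_path = fh.read()
value = handler.read(result_path)  # should be 15 for this example flow
print(value)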
Andrew Schechtman-Rook
12/23/2019, 10:11 PM
import datetime
import os
import prefect
from prefect.engine.result_handlers import LocalResultHandler

@prefect.task
def generate_data():
    return 5

@prefect.task(
    cache_for=datetime.timedelta(minutes=1)
)
def manipulate_data_1(input_data):
    return input_data * 3

@prefect.task
def manipulate_data_2(input_data):
    print(input_data ** 2)
    return input_data ** 2

def cache_to_result_handler(task, old_state, new_state):
    if old_state.is_successful() and new_state.is_cached():
        old_state._result.store_safe_value()
    if old_state.is_pending() and new_state.is_cached():
        new_state._result = new_state._result.safe_value.to_result()
    return new_state

with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
    data_1 = generate_data()
    data_2 = manipulate_data_1(data_1)
    data_3 = manipulate_data_2(data_2)

flow_state = flow.run(
    task_runner_state_handlers=[cache_to_result_handler]
)

for task_name, caches in prefect.context.caches.items():
    for cache in caches:
        cache._result.value = None

flow_state_2 = flow.run(
    task_runner_state_handlers=[cache_to_result_handler]
)
First, there's a state handler that does two things: for state changes from successful to cached, it invokes the store_safe_value method to trigger the `result_handler`'s write method; for state changes from pending to cached, it inflates the safe result by invoking the to_result method and uses that to replace the cached result. Second, at the end of the first flow run, a quick loop replaces all cached result values with None so that the results won't take up any space if persisted to disk.
It seems to get the job done, although I'm not super psyched about it because manually editing the cache post-flow seems really kludgey.
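A quick sanity check on that second run could be to print each task's final state and result and confirm manipulate_data_1 resolved to its on-disk value even though its in-memory cache value was nulled out; a sketch under the same assumptions:
for task, task_state in flow_state_2.result.items():
    # e.g. "manipulate_data_1 Cached 15" if the safe value was inflated from disk
    print(task.name, type(task_state).__name__, task_state.result)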