Lior
11/25/2020, 11:26 AMJim Crist-Harif
11/25/2020, 4:29 PMLior
11/26/2020, 8:20 AMCedricT
02/16/2021, 8:07 AMfrom prefect import task, Flow
from prefect.engine.results import LocalResult
import prefect
print(prefect.config.flows.checkpointing)
prefect.config.flows.checkpointing = True
prefect.config.tasks.checkpointing = True
print(prefect.config.flows.checkpointing)
@task(checkpoint=True, result=LocalResult(location="initial_data.prefect"))
def root_task():
return [1, 2, 3]
@task(checkpoint=True, result=LocalResult(location="{date:%A}/{task_name}.prefect"))
def downstream_task(x):
return [i * 10 for i in x]
with Flow("local-results") as flow:
downstream_task(root_task)
flow.run()
I tried this here which is basically from the tutorial plus adding the checkpointing but no successJim Crist-Harif
02/16/2021, 4:53 PMprefect.config
isn't really supported. You should set flows.checkpointing
in either your ~/.prefect/config.toml
file, or via an environment variable (PREFECT__FLOWS__CHECKPOINTING=true
). Also note that the locations as specified will be stored in your ~/.prefect/results
directory (so if you tried this before, you might have missed them), you need to pass in a dir
explicitly to use a different directory.
The following script runs fine for me, and writes results to ~/Code/prefect/demo_results/
(you may want to change this path, or omit it to go back to using ~/.prefect/results
).
from prefect import task, Flow
import prefect
from prefect.engine.results import LocalResult
print(prefect.config.flows.checkpointing)
@task(checkpoint=True, result=LocalResult(dir="~/Code/prefect/demo_results", location="initial_data.prefect"))
def root_task():
return [1, 2, 3]
@task(checkpoint=True, result=LocalResult(dir="~/Code/prefect/demo_results", location="{date:%A}/{task_name}.prefect"))
def downstream_task(x):
return [i * 10 for i in x]
with Flow("local-results") as flow:
downstream_task(root_task)
flow.run()
$ PREFECT__FLOWS__CHECKPOINTING=true python demo.py
True
[2021-02-16 10:53:15-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'local-results'
[2021-02-16 10:53:15-0600] INFO - prefect.TaskRunner | Task 'root_task': Starting task run...
[2021-02-16 10:53:15-0600] INFO - prefect.TaskRunner | Task 'root_task': Finished task run for task with final state: 'Success'
[2021-02-16 10:53:15-0600] INFO - prefect.TaskRunner | Task 'downstream_task': Starting task run...
[2021-02-16 10:53:15-0600] INFO - prefect.TaskRunner | Task 'downstream_task': Finished task run for task with final state: 'Success'
[2021-02-16 10:53:15-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
$ ls demo_results
Tuesday initial_data.prefect
CedricT
02/17/2021, 7:42 AMJim Crist-Harif
02/17/2021, 3:39 PMPREFECT__FLOWS__CHECKPOINTING=true
in your config.toml
, you need to use toml syntax (you might have done that already, just clarifying). In your `~/.prefect/config.toml`:
[flows]
checkpointing = true
If you did this already and it didn't work, the next thing I'd check is if your configuration is being picked up properly. If configured properly, you should see True
from the output of:
python -c "import prefect;print(prefect.config.flows.checkpointing)"
CedricT
02/18/2021, 1:33 PM