John Shearer
12/07/2021, 6:16 PM
PREFECT__FLOWS__CHECKPOINTING=false but with checkpoint data present in the prefect result directory would read from those results? - I would not expect this, but this is the current behaviour (on my machine ...)
Anna Geller
@task(checkpoint=False) is important
John Shearer
12/07/2021, 6:17 PM
John Shearer
12/07/2021, 6:18 PM
@task(checkpoint=True) for the tasks, but checkpointing has been disabled at the flow level (either config file or environment variable)
Kevin Kho
John Shearer
12/07/2021, 6:20 PM
@task(checkpoint=True)
they don't write out results when env var PREFECT__FLOWS__CHECKPOINTING=false, but they do appear to read results in that same case (if they are already present)
John Shearer
12/07/2021, 6:21 PM
Kevin Kho
John Shearer
12/07/2021, 6:23 PM
John Shearer
12/07/2021, 6:28 PM
@task(checkpoint=True) - with a result location based on the current date (no time component).
The results directory is initially empty.
1. I run my flow from pytest with environment variable PREFECT__FLOWS__CHECKPOINTING=false
   a. this creates no files in the results directory - YAY 🙂
2. I run my flow from pytest with environment variable PREFECT__FLOWS__CHECKPOINTING=true
   a. this creates some files in the results directory - YAY 🙂
3. I run my flow (again) from pytest with environment variable PREFECT__FLOWS__CHECKPOINTING=true
   a. this reads from the files in the results directory - YAY 🙂
4. I run my flow from pytest with environment variable PREFECT__FLOWS__CHECKPOINTING=false
   a. this reads from the files in the results directory - Unexpected 😟
Does that make sense?
Kevin Kho
John Shearer
12/07/2021, 6:35 PM
@task(result=pandas_result, target=parquet_location)
def some_task():
    ...
-
import pendulum

def pickle_location(**kwargs) -> str:
    return location_by_extension(suffix="pickle", **kwargs)

def parquet_location(**kwargs) -> str:
    return location_by_extension(suffix="parquet", **kwargs)

def location_by_extension(flow_name, scheduled_start_time, task_slug, suffix="parquet", **kwargs):
    date: str = pendulum.instance(scheduled_start_time).format("Y/M/D")
    # time: str = slugify(pendulum.instance(scheduled_start_time).time().isoformat())
    # return f"{date}/{time}__{flow_run_id}/{task_slug}-prefect_result.{suffix}"
    return f"{flow_name}/{date}/{task_slug}-prefect_result.{suffix}"
sure. Sorry, it's a little ugly
John Shearer
12/07/2021, 6:36 PM
result or target to be used with ``PREFECT__FLOWS__CHECKPOINTING=false``, though I likely have a misunderstanding somewhere
John Shearer
12/07/2021, 6:36 PM
Kevin Kho
target that is causing this behavior. Targets are a file-based caching mechanism, so if the file exists, it will load the file instead of executing the task. You can use the result.location instead without the target and I think this will work as intended
Kevin Kho
@task(result=Result(..,location="..."))
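To make the difference concrete, here is a toy model in plain Python (not Prefect itself, and not how Prefect is implemented). It assumes, as described above, that the target check runs whether or not checkpointing is on, and that checkpointing only gates writing. The names date_only_location, run_task and reproduce are made up for this sketch; the standard-library datetime stands in for pendulum and pickle stands in for parquet.

```python
import os
import pickle
import tempfile
from datetime import datetime


def date_only_location(flow_name: str, scheduled_start_time: datetime, task_slug: str) -> str:
    # Same shape as location_by_extension in the thread: there is no time
    # component, so every run on the same date maps to the same file.
    d = scheduled_start_time
    return f"{flow_name}/{d.year}/{d.month}/{d.day}/{task_slug}-prefect_result.pickle"


def run_task(fn, path: str, checkpointing: bool, use_target: bool):
    """Toy model of a single task run; returns (value, source)."""
    # Assumption: the target check ignores the checkpointing switch.
    if use_target and os.path.exists(path):
        with open(path, "rb") as fh:
            return pickle.load(fh), "cache"
    value = fn()
    # Assumption: writing the result is the only step gated by checkpointing.
    if checkpointing:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as fh:
            pickle.dump(value, fh)
    return value, "executed"


def reproduce(use_target: bool):
    """Replay the four runs from the thread, starting with an empty results directory."""
    with tempfile.TemporaryDirectory() as root:
        relative = date_only_location("my_flow", datetime(2021, 12, 7, 18, 0), "some_task")
        path = os.path.join(root, relative)
        sources = []
        # Checkpointing off, on, on, off: the same order as steps 1 to 4.
        for checkpointing in (False, True, True, False):
            _, source = run_task(lambda: 42, path, checkpointing, use_target)
            sources.append(source)
        return sources


if __name__ == "__main__":
    print("with target:   ", reproduce(use_target=True))
    print("without target:", reproduce(use_target=False))
```

With use_target=True the fourth run reads the file left behind by the second run, which matches step 4 in the list earlier in the thread. With use_target=False every run executes the task, and files are only written while checkpointing is on.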
John Shearer
12/07/2021, 6:51 PM
John Shearer
12/07/2021, 6:51 PM