richard-social 812

03/14/2022, 8:35 AM
Hello, is this the correct way to ensure a task only runs once a week in a flow that is scheduled to run daily?
@task(result=LocalResult(dir=f'{os.getcwd()}/storage/pre_delinquency_models',
                         serializer=H2OModelSerializer()),
      checkpoint=True, target='{date:%Y}-Week {date:%U}', log_stdout=True)
def train_new_model(data: pd.DataFrame):
I have it running on prefect cloud with a run_config that looks like below:
run_config = LocalRun(env={'PREFECT__FLOWS__CHECKPOINTING':os.environ.get('PREFECT__FLOWS__CHECKPOINTING', 'true')})
However, looking at the results folder, I see that the task result file is created anew with each daily run. What am I missing?

emre

03/14/2022, 9:31 AM
Hi richard, I just made a similar setup as follows, and it works with both prefect core runs and server. Can't test with cloud atm.
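# Imports assumed for Prefect 1.x; they were not part of the original snippet
import os
import random

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.run_configs import LocalRun


@task(log_stdout=True)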
def out_sumthin():
    x = random.randint(0, 20)
    print(x)
    return x


@task(
    result=LocalResult(dir=f"{os.getcwd()}/storage/pre_delinquency_models"),
    checkpoint=True,
    target="{date:%Y}-Week {date:%U}",
    log_stdout=True,
)
def train_new_model(x):
    return 5


with Flow("Checkpoint") as f:
    out = out_sumthin()
    train_new_model(out)

f.run_config = LocalRun(
    env={
        "PREFECT__FLOWS__CHECKPOINTING": os.environ.get(
            "PREFECT__FLOWS__CHECKPOINTING", "true"
        )
    }
)

f.run()
So I couldn't find the error in your setup. Could you post the debug logs of your flow run? You could grab those by running on an agent with the following settings:
prefect agent local start --show-flow-logs --log-level DEBUG -e PREFECT__LOGGING__LEVEL=DEBUG
Finally, you could try using the cache_for and cache_validator kwargs for task definition, those might end up working.

Anna Geller

03/14/2022, 9:32 AM
The easiest way to tell Prefect to run this task only once per week would be to leverage caching rather than targets. Targets are file-based persistence and they entail more risk in your use case, e.g. someone could (accidentally) delete the file, resulting in your task being recomputed. With cache_for, Prefect will store that information in the backend and, once the task has run successfully, will prevent it from rerunning for the given time duration.
@task(cache_for=datetime.timedelta(days=7))
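To make the time window concrete, here is a minimal stdlib-only sketch of the idea behind a duration-based cache. It is a simplified model, not Prefect's implementation: the helper cache_is_valid and all timestamps are hypothetical and only illustrate that a result is reused until the duration has elapsed.

```python
from datetime import datetime, timedelta


def cache_is_valid(cached_at, cache_for, now):
    """Hypothetical helper: a cached result is reusable while `now` is
    still before `cached_at + cache_for`."""
    return now < cached_at + cache_for


week = timedelta(days=7)
# Made-up timestamp of the first successful task run.
first_run = datetime(2022, 3, 14, 8, 0)

# Eight following daily runs at the same time of day.
daily_runs = [first_run + timedelta(days=d) for d in range(1, 9)]

# True means "reuse the cached result", False means "run the task again".
reused = [cache_is_valid(first_run, week, run) for run in daily_runs]
print(reused)
```

In this model the six runs after the first one reuse the cached result, and the run that falls exactly seven days later is the first one that recomputes.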

richard-social 812

03/25/2022, 5:47 AM
Thanks @Anna Geller and @emre. Adding the cache_for parameter works. But I am curious why, since the docs say that specifying target and checkpoint should be enough to prevent the task from rerunning.

Anna Geller

03/25/2022, 8:29 AM
Target has a different purpose than caching - LMK if you need a deep dive about it
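For reference, the file-based behaviour of targets described earlier in this thread can be sketched roughly as follows. This is a stdlib-only illustration under stated assumptions, not Prefect code: the helpers render_target and should_run are hypothetical, and the only thing taken from the thread is the target template. It shows that the template renders to the same file name for every day of a week, so the decision to rerun depends purely on whether that file exists.

```python
import tempfile
from datetime import date
from pathlib import Path

# Target template taken from the task definition in this thread.
TARGET_TEMPLATE = "{date:%Y}-Week {date:%U}"


def render_target(run_date):
    """Fill the template with the run date (%U = week of year, Sunday first)."""
    return TARGET_TEMPLATE.format(date=run_date)


def should_run(result_dir, run_date):
    """Hypothetical helper: run only if no file exists at the rendered target."""
    return not (Path(result_dir) / render_target(run_date)).exists()


with tempfile.TemporaryDirectory() as result_dir:
    monday, friday, next_sunday = date(2022, 3, 14), date(2022, 3, 18), date(2022, 3, 20)

    first = should_run(result_dir, monday)            # no file yet
    target_file = Path(result_dir) / render_target(monday)
    target_file.write_text("model")                   # stand-in for the persisted result

    same_week = should_run(result_dir, friday)        # same file name, file exists
    new_week = should_run(result_dir, next_sunday)    # new week, new file name

    target_file.unlink()                              # someone deletes the file
    after_delete = should_run(result_dir, friday)     # task would be recomputed
```

The last step mirrors the risk mentioned above: once the file is gone, nothing else records that the task already ran that week.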