# ask-community
v
Hi all, I'm having trouble with checkpointing and could not find the answer in the docs or find a workaround. I have a
LocalAgent
running, which gets its orders from a private Prefect Server instance and executes flow runs locally, and I do not want to persist any results on disk. Following the docs, I have defined all my tasks with
@task(checkpoint=False)
, which does the job for custom tasks but does not give me a solution for Prefect's built-in tasks like
prefect.tasks.control_flow.merge
. I have tried setting task checkpointing to
false
globally:
[tasks]
    checkpointing = false
    [tasks.defaults]
        checkpointing = false
without success. Any idea? Thanks a lot in advance!
Example flow :
@task(checkpoint=False)
def do_A():
    return something

@task(checkpoint=False)
def do_B():
    return something_else

with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge(result_A, result_B)
-> tasks
do_A
and
do_B
will not checkpoint any result, but the
merge
task will checkpoint
result
, which I do not need and fills my storage pretty quickly.
a
@Vincent Chéry in your config.toml you have it set for tasks, perhaps you can add it on the flow level as well:
export PREFECT__FLOWS__CHECKPOINTING=false
I will try to reproduce
v
Hi Anna, thanks! I tried that already, but not in combination with the task settings. I'll try with both task and flow checkpointing set to false.
a
Unfortunately, I couldn’t reproduce the issue. I didn’t set anything in my config.toml - only set the checkpoint=False in the tasks and no results were stored. I was using this flow:
from prefect import task, Flow
from prefect.tasks.control_flow import merge, case


@task(checkpoint=False)
def check_condition():
    return True


@task(checkpoint=False)
def get_hello():
    return "hello"


@task(checkpoint=False)
def get_hi():
    return "hi"


with Flow("conditional") as flow:
    cond = check_condition()
    with case(cond, True):
        a = get_hello()
    with case(cond, False):
        b = get_hi()
    result = merge(a, b)
Then:
ls ~/.prefect/results/
shows nothing
Can you perhaps restart your agent (and in the worst case also restart Server) and try again?
Ok, I think I’ve found the issue. You probably register your flow using this?
flow.register()
When you register this way, your flow gets pickled and stored in ~/.prefect/flows/your-flow-name. Then, when you run it, you will see the result of this merge task persisted, as shown in the image, because the result is pickled the same way the flow was. However, when you register using the CLI, the flow is stored as a script, because the CLI gets the path of your flow file on the machine:
prefect register --project yourproject -p yourflow.py
When I registered with the CLI, the results of merge were not persisted.
v
With
export PREFECT__FLOWS__CHECKPOINTING=false
I also get nothing stored when running core only with
flow.run()
, but when running with server, the results are stored even though
PREFECT__FLOWS__CHECKPOINTING
is false
I'm afraid registering with CLI will not be very practical
I indeed register with
flow.register()
. I have ~20 flows, which are all registered by a script every time we push a new version to production
Maybe I could wrap
merge
in a custom task ? Like
from prefect.tasks.control_flow import merge

@task(checkpoint=False)
def merge_(A, B):
    return merge(A, B)

with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge_(result_A, result_B)
Nope 😕
It seems
merge
needs to be in a Flow context and cannot be used in a task
a
you really don’t need any batch script to register multiple flows when you use the CLI because you can automatically register all flows from a specific directory using:
prefect register --project yourprojectname -p path/to/flows/dir/
given that dir contains all your flows, the above command would register all of them in a batch process.
but even if you still wanna use your custom registration process: as long as you explicitly attach a Local storage with
stored_as_script=True
, this should have the same effect:
from prefect.storage import Local

flow.storage = Local(path="/path/to/flow.py", stored_as_script=True)
v
OK I'll try that 👍
Should
"/path/to/flow.py"
be the path where the source file is, or another location where the registration process will put a copy ?
a
this is the path to where your flow file is located
v
Thx
k
I think you can do:
@task(checkpoint=False)
def do_A():
    return something

@task(checkpoint=False)
def do_B():
    return something_else

merge.checkpoint=False

with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge(result_A, result_B)
v
Hi Kevin, thx, but that does not seem to do the trick, results are still persisted for the merge task even with
merge.checkpoint=False
k
Actually sorry
merge
doesn’t seem to be a task itself. It’s a function that adds the
Merge
task to your Flow. You can do
from prefect.tasks.control_flow.conditional import Merge

merge = Merge(checkpoint=False)

with Flow(...) as flow:
    ...
    merge(a,b)
🙏 1
v
I'm not sure what the additional code in the definition of
merge
does :
return Merge().bind(
        **{"task_{}".format(i + 1): t for i, t in enumerate(tasks)},
        flow=flow,
        mapped=mapped
    )
https://github.com/PrefectHQ/prefect/blob/cc9daa6ffc096f7c557a873df0a70d64571dd3f1/src/prefect/tasks/control_flow/conditional.py#L165
I don't need this ?
k
That is creating the Merge task and
bind
is from the imperative API to attach the Task to the Flow but the
with Flow(…)
does that for you
👍 1
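For anyone following along, here is a minimal plain-Python sketch of what that `bind` call receives and how a merge result could be chosen. It does not import Prefect. `name_inputs` and `pick_first_non_none` are hypothetical helper names, and the selection rule (first non-`None` value in sorted key order) is an assumption about how `Merge.run` behaves, not something confirmed in this thread.

```python
def name_inputs(*tasks):
    """Mirror the dict comprehension in merge(): task_1, task_2, ... keyword names."""
    return {"task_{}".format(i + 1): t for i, t in enumerate(tasks)}


def pick_first_non_none(**task_results):
    """Assumed selection rule: first non-None value in sorted key order."""
    return next((v for _, v in sorted(task_results.items()) if v is not None), None)


# A skipped branch is modelled here as None; only the second branch produced a value.
named = name_inputs(None, "hi")
print(named)
print(pick_first_non_none(**named))
```

The point is only that the inputs reach the task as keyword arguments named `task_1`, `task_2`, and so on, which matters for how a `Merge` instance can be called directly.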
v
Thanks a lot !
k
So I don’t think so…but if what I proposed doesn’t work, then I guess you have to go here and turn off checkpointing in the big Merge
v
I'm getting
TypeError: too many positional arguments
in
merge(A, B)
So I'll do what you said
It works 🍾
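A note on that `TypeError`: the message is the one Python's `inspect` module raises when positional arguments are matched against a signature that only accepts keyword arguments. The sketch below reproduces it without Prefect. `StandInMerge` is a hypothetical stand-in class, and it is an assumption here that Prefect matches call arguments against the task's `run()` signature in a similar way.

```python
import inspect


class StandInMerge:
    """Hypothetical stand-in for a task whose run() accepts keyword inputs only."""

    def run(self, **task_results):
        return task_results


run_signature = inspect.signature(StandInMerge().run)

# Positional inputs cannot be matched to a **kwargs-only signature.
try:
    run_signature.bind("A", "B")
    positional_error = None
except TypeError as exc:
    positional_error = str(exc)

# Keyword inputs bind without complaint.
keyword_binding = run_signature.bind(task_1="A", task_2="B")

print(positional_error)
print(keyword_binding.kwargs)
```

If that assumption holds, calling the instance with keyword inputs, for example `merge(task_1=a, task_2=b)`, might avoid the error, but that variant was not tried in this thread.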
Maybe we could add
**kwargs
to
merge
and pass them to
Merge
?
k
I personally think that is a good suggestion. I can open an issue for it if you want.
👍 1
k
Yes I think this should be good. Just not familiar with the unit tests of merge. Would you like to take a stab at a PR into the core repo?
I've never contributed to Prefect so I can give it a shot but if I struggle with unit tests I won't be able to spend too much time on it
k
Let me take a quick look
v
But I'll be happy to try anyway 🙂
k
I can’t immediately write the test, but I suggest you open a work-in-progress PR and then we can get some insight from the core team on how to test
Will write that issue real quick so you can link it
Chatted with an engineer here. Your test might just be as simple as:
@task(checkpoint=False)
def do_A():
    return something

@task(checkpoint=False)
def do_B():
    return something_else

with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge(result_A, result_B, checkpoint=False)

assert result.checkpoint is False
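To make the proposed change concrete, here is a rough plain-Python sketch of the forwarding idea that this assertion would exercise. It does not use Prefect: `StandInMerge` is a hypothetical stand-in that only records the `checkpoint` flag and its inputs, and the `merge` defined here is an illustration of passing `**kwargs` through, not the actual patch.

```python
class StandInMerge:
    """Hypothetical stand-in for the Merge task; records checkpoint and inputs only."""

    def __init__(self, checkpoint=None):
        self.checkpoint = checkpoint
        self.inputs = {}

    def bind(self, **inputs):
        # Remember the named inputs and return the task, as merge() expects.
        self.inputs = inputs
        return self


def merge(*tasks, **kwargs):
    """Forward extra keyword arguments to the task constructor, as proposed above."""
    return StandInMerge(**kwargs).bind(
        **{"task_{}".format(i + 1): t for i, t in enumerate(tasks)}
    )


result = merge("result_A", "result_B", checkpoint=False)
print(result.checkpoint)
print(result.inputs)
```

With this shape, callers who pass nothing extra keep the default behaviour, and anyone who needs `checkpoint=False` can say so at the call site.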
v
Yeah I've seen it, I was just about to submit my PR 😛
haha
k
Yeah he had the same problem i guess lol