Vincent Chéry
12/09/2021, 1:40 PM
I have a LocalAgent running that gets its orders from a private Prefect Server instance and executes flow runs locally, and I basically do not want to persist any results on disk. Following the docs, I have defined all my tasks with @task(checkpoint=False), which does the job for my custom tasks but does not solve the problem for Prefect's built-in tasks like prefect.tasks.control_flow.merge.
I have tried setting task checkpointing to false globally:
[tasks]
checkpointing = false
[tasks.defaults]
checkpointing = false
without success.
Any idea? Thanks a lot in advance!
Vincent Chéry
12/09/2021, 1:44 PM
@task(checkpoint=False)
def do_A():
    return something
@task(checkpoint=False)
def do_B():
    return something_else
with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge(result_A, result_B)
-> tasks do_A and do_B will not checkpoint any result, but the merge task will checkpoint its result, which I do not need and which fills my storage pretty quickly.
Anna Geller
export PREFECT__FLOWS__CHECKPOINTING=false
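Prefect 1.x maps environment variables onto its configuration by stripping the PREFECT__ prefix, splitting on double underscores and lower-casing each part, so PREFECT__FLOWS__CHECKPOINTING corresponds to the key flows.checkpointing, which lives under a [flows] table rather than the [tasks] tables tried above. The sketch below re-implements only that naming convention for illustration; env_to_config_key and to_toml are hypothetical helpers, not Prefect functions.

```python
# Sketch only (NOT Prefect's own code): how a Prefect 1.x environment
# variable name maps onto a dotted configuration key.
PREFIX = "PREFECT__"


def env_to_config_key(name: str) -> str:
    """Turn e.g. 'PREFECT__FLOWS__CHECKPOINTING' into 'flows.checkpointing'."""
    if not name.startswith(PREFIX):
        raise ValueError(f"{name!r} does not start with {PREFIX!r}")
    # A double underscore separates nesting levels; each part is lower-cased.
    sections = name[len(PREFIX):].split("__")
    return ".".join(section.lower() for section in sections)


def to_toml(name: str, value: str) -> str:
    """Render the matching config.toml fragment (needs at least SECTION__KEY)."""
    *tables, key = env_to_config_key(name).split(".")
    if not tables:
        raise ValueError("expected at least PREFECT__SECTION__KEY")
    return f"[{'.'.join(tables)}]\n{key} = {value}"


print(env_to_config_key("PREFECT__FLOWS__CHECKPOINTING"))  # flows.checkpointing
print(to_toml("PREFECT__FLOWS__CHECKPOINTING", "false"))
```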
I will try to reproduce.
Vincent Chéry
12/09/2021, 1:53 PM
Anna Geller
from prefect import task, Flow
from prefect.tasks.control_flow import merge, case
@task(checkpoint=False)
def check_condition():
    return True
@task(checkpoint=False)
def get_hello():
    return "hello"
@task(checkpoint=False)
def get_hi():
    return "hi"
with Flow("conditional") as flow:
    cond = check_condition()
    with case(cond, True):
        a = get_hello()
    with case(cond, False):
        b = get_hi()
    result = merge(a, b)
Then:
ls ~/.prefect/results/
shows nothing.
Anna Geller
Anna Geller
flow.register()
When you register this way, your flow gets pickled and stored in ~/.prefect/flows/your-flow-name
Then, when you run it, you will see the result of this merge task, as shown in the image, because the result is pickled the same way the flow was.
However, when you register using the CLI, the flow is stored as a script, because the CLI gets the path of your flow on the machine:
prefect register --project yourproject -p yourflow.py
When registering with the CLI, the results of merge were not persisted.
Vincent Chéry
12/09/2021, 2:27 PM
With export PREFECT__FLOWS__CHECKPOINTING=false I also get nothing stored when running Core only with flow.run(), but when running with Server, the results are stored even though PREFECT__FLOWS__CHECKPOINTING is false.
Vincent Chéry
12/09/2021, 2:28 PM
Vincent Chéry
12/09/2021, 2:29 PM
flow.register(). I have ~20 flows, all registered by a script every time we push a new version to production.
Vincent Chéry
12/09/2021, 2:31 PM
merge in a custom task? Like:
from prefect.tasks.control_flow import merge
@task(checkpoint=False)
def merge_(A, B):
    return merge(A, B)
with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge_(result_A, result_B)
Vincent Chéry
12/09/2021, 2:37 PM
Vincent Chéry
12/09/2021, 2:41 PM
merge needs to be in a Flow context and cannot be used in a task.
Anna Geller
prefect register --project yourprojectname -p path/to/flows/dir/
Given that the directory contains all your flows, the command above would register all of them in a single batch.
Anna Geller
If you set stored_as_script=True, this should have the same effect:
from prefect.storage import Local
flow.storage = Local(path="/path/to/flow.py", stored_as_script=True)
Vincent Chéry
12/09/2021, 2:44 PM
Vincent Chéry
12/09/2021, 3:00 PM
Should "/path/to/flow.py" be the path where the source file is, or another location where the registration process will put a copy?
Anna Geller
Vincent Chéry
12/09/2021, 3:03 PM
Kevin Kho
@task(checkpoint=False)
def do_A():
    return something
@task(checkpoint=False)
def do_B():
    return something_else
merge.checkpoint = False
with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge(result_A, result_B)
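Setting merge.checkpoint = False only attaches an attribute to the merge function object. If, as Kevin explains next, merge is a helper that builds a new Merge task on every call, that attribute never reaches the task it creates. A toy illustration, using hypothetical names (ToyMerge, toy_merge) rather than Prefect's classes:

```python
# Toy illustration, NOT Prefect's code: a helper *function* that creates a
# fresh task object every time it is called.
class ToyMerge:
    def __init__(self, checkpoint=None):
        # In this toy, None means "fall back to some global default".
        self.checkpoint = checkpoint


def toy_merge(*tasks):
    # A brand-new instance per call, built with default arguments only.
    return ToyMerge()


toy_merge.checkpoint = False  # sets an attribute on the function object
node = toy_merge("result_A", "result_B")
print(node.checkpoint)  # None: the instance never saw the function attribute
```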
Vincent Chéry
12/09/2021, 3:18 PM
merge.checkpoint=False
Kevin Kho
merge doesn't seem to be a task itself. It's a function that adds the Merge task to your Flow. You can do:
from prefect.tasks.control_flow.conditional import Merge
merge = Merge(checkpoint=False)
with Flow(...) as flow:
    ...
    merge(a, b)
Vincent Chéry
12/09/2021, 3:22 PM
merge does:
return Merge().bind(
    **{"task_{}".format(i + 1): t for i, t in enumerate(tasks)},
    flow=flow,
    mapped=mapped
)
https://github.com/PrefectHQ/prefect/blob/cc9daa6ffc096f7c557a873df0a70d64571dd3f1/src/prefect/tasks/control_flow/conditional.py#L165
Vincent Chéry
12/09/2021, 3:22 PM
Kevin Kho
bind is from the imperative API and attaches the Task to the Flow, but with Flow(…) does that for you.
Vincent Chéry
12/09/2021, 3:24 PM
Kevin Kho
Vincent Chéry
12/09/2021, 3:27 PM
TypeError: too many positional arguments
Vincent Chéry
12/09/2021, 3:27 PM
merge(A, B)
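Calling a Merge instance as merge(a, b) fails because its run method accepts keyword arguments only. One plausible source of that exact wording is Python's inspect.Signature.bind, which raises "too many positional arguments" when positional values meet a signature that only has **kwargs; the merge() helper quoted above sidesteps this by naming its inputs task_1, task_2, and so on. The sketch assumes a toy ToyMerge class whose run returns the first non-None input; it is not Prefect's code.

```python
import inspect


class ToyMerge:
    """Toy stand-in (NOT Prefect's class) whose run() takes keywords only."""

    def run(self, **task_results):
        # Return the first upstream result that is not None, in call order.
        for value in task_results.values():
            if value is not None:
                return value
        return None


sig = inspect.signature(ToyMerge().run)  # bound method: (**task_results)

message = ""
try:
    sig.bind("hello", "hi")  # positional values, like calling merge(a, b)
except TypeError as err:
    message = str(err)
print(message)

# Name the inputs task_1, task_2, ... the way the merge() helper does.
upstream = [None, "hi"]
kwargs = {"task_{}".format(i + 1): t for i, t in enumerate(upstream)}
bound = sig.bind(**kwargs)  # keyword values bind without error
print(ToyMerge().run(**bound.kwargs))  # hi
```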
Vincent Chéry
12/09/2021, 3:27 PM
Vincent Chéry
12/09/2021, 3:31 PM
Vincent Chéry
12/09/2021, 3:31 PM
Could we add **kwargs to merge and pass them on to Merge?
Kevin Kho
Vincent Chéry
12/09/2021, 4:51 PM
Kevin Kho
Vincent Chéry
12/09/2021, 4:54 PM
Vincent Chéry
12/09/2021, 4:55 PM
Kevin Kho
Vincent Chéry
12/09/2021, 4:57 PM
Kevin Kho
Kevin Kho
Kevin Kho
Kevin Kho
@task(checkpoint=False)
def do_A():
    return something
@task(checkpoint=False)
def do_B():
    return something_else
with Flow("My flow") as flow:
    with case(cond1):
        result_A = do_A()
    with case(cond2):
        result_B = do_B()
    result = merge(result_A, result_B, checkpoint=False)
assert result.checkpoint is False
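The final snippet passes checkpoint=False straight through merge(...), which presumably works because the helper forwards extra keyword arguments to the Merge constructor before binding the upstream tasks. That forwarding pattern can be sketched with hypothetical toy classes (ToyTask, ToyMerge, toy_merge); this is not Prefect's source.

```python
class ToyTask:
    """Hypothetical minimal task that just records its options and inputs."""

    def __init__(self, checkpoint=None, name=None):
        self.checkpoint = checkpoint
        self.name = name
        self.upstream = {}

    def bind(self, **upstream):
        # Record upstream inputs and return self so calls can be chained.
        self.upstream = upstream
        return self


class ToyMerge(ToyTask):
    pass


def toy_merge(*tasks, **kwargs):
    """Forward extra keyword arguments (e.g. checkpoint=False) to ToyMerge."""
    merge_task = ToyMerge(**kwargs)
    return merge_task.bind(
        **{"task_{}".format(i + 1): t for i, t in enumerate(tasks)}
    )


result = toy_merge("result_A", "result_B", checkpoint=False)
print(result.checkpoint)  # False
print(result.upstream)    # {'task_1': 'result_A', 'task_2': 'result_B'}
```

Keeping the upstream tasks as positional arguments and everything else as keywords means any option the task constructor accepts can be passed without the helper having to list it.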
Kevin Kho
Vincent Chéry
12/15/2021, 6:05 PM
Vincent Chéry
12/15/2021, 6:05 PM
Kevin Kho