Neil Natarajan
08/24/2022, 4:49 PMDylan
08/24/2022, 5:35 PMDylan
08/24/2022, 5:36 PMDylan
08/24/2022, 5:37 PMKrishnan Chandra
08/24/2022, 6:12 PMprefect deployment build
command, but this hasn’t worked:
prefect deployment build \
...
--override path=/app/
Is there a way to override the path setting, or is this something I’d need to modify in the YAML files after generation?Blake Hamm
08/24/2022, 6:12 PMTomás Emilio Silva Ebensperger
08/24/2022, 6:36 PMagent = LocalAgent()
agent.labels = ['custom_label'}
instead of passing the labels to the LocalAgent params upon instantiation. This way i have complete separation from agents even if they are running locally on the same machine
Now the issue with registering flows is the same, the labels assigned to the flow are the ones you provide + that default local label. I can't seem to remove the default label before registration and
the agents don't work, because their label must be a `superset`of the flow's labels.Tuoyi Zhao
08/24/2022, 8:00 PMMarc Lipoff
08/24/2022, 8:26 PMKeith Hickey
08/24/2022, 9:12 PMTuoyi Zhao
08/24/2022, 10:17 PMAaron Ash
08/25/2022, 12:14 AMAaron Ash
08/25/2022, 12:15 AMAaron Ash
08/25/2022, 12:18 AMValueError: Path /root/.prefect/storage/<GUID> does not exist
sometimes.
What I suspect is happening is a task is failing and a different ray worker node is attempting to retry it, seeing that the result was already serialized to disk and trying to read it from the cache.
Except that the cache was on a separate node and doesn't exist locally.
With Prefect 1.0 we were using S3Result() for all of our tasks which worked really nicely.
It looks like there's currently no equivalent for orion.
Are there any plans to add that functionality back in?
Is there a good work around we can use in the meantime?Jacob Blanco
08/25/2022, 1:50 AMJacob Blanco
08/25/2022, 1:53 AMAndreas Nigg
08/25/2022, 6:06 AMPriyank
08/25/2022, 6:44 AMTrine Bruun Brockhoff
08/25/2022, 7:06 AMSaman
08/25/2022, 10:32 AMJohn
08/25/2022, 11:38 AMAmol Shirke
08/25/2022, 12:07 PMStéphan Taljaard
08/25/2022, 1:52 PMtask._*run*()_
?
I want to add a pytest to check and prevent adding incorrect task uses like this.
import prefect
@prefect.task
def task1(par):
task2() # this is incorrect! it should be task2.run()
@prefect.task
def task2():
return 1
with prefect.Flow("Sample flow") as flow:
aa = prefect.Parameter("aa")
task1(aa)
flow.validate()
doesn't return errors/warnings, since it doesn't check what's happening inside each task.
Meanwhile, in the task, there's another task hiding and being used incorrectly. It will fail when running the flow.
ValueError: Could not infer an active Flow context while creating edge to <Task: task2>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `task2.run(...)`
I could do a flow.run()
in my test, but that can be cumbersome and require a lot of scaffolding in terms of setting up mocks, and even with mocks, the flow run might just be too slow.
A task.validate()
can be handly, but I'm finding some difficulty in writing such a function.Josh Paulin
08/25/2022, 2:29 PMMansour Zayer
08/25/2022, 3:02 PMdate.today()
and date.today() - timedelta(1 day)
. Now I want to make the flows resilient. I want the flow's date parameter to be the last success flow run, so that if one day the flow run fails, the next flow run automatically takes the last 2 days. Basically, I want the dates to be date.today()
and last_successful_flow_run_date
.
I was thinking of storing the flow run result and its parameters in an S3 bucket, then query it every time to recover the last date that the flow ran successfully. But I'm not sure if this is the best way.
I'd appreciate any help with this. Thank youNikhil Joseph
08/25/2022, 4:04 PMTuoyi Zhao
08/25/2022, 4:40 PMAlex Wilcoxson
08/25/2022, 4:42 PMLeon Kozlowski
08/25/2022, 4:54 PMSoren Daugaard
08/25/2022, 5:10 PMConcurrentTaskRunner
?Soren Daugaard
08/25/2022, 5:10 PMConcurrentTaskRunner
?