Tuoyi Zhao
08/24/2022, 10:17 PM

Aaron Ash
08/25/2022, 12:14 AM

Aaron Ash
08/25/2022, 12:15 AM

Aaron Ash
08/25/2022, 12:18 AM
Sometimes we get:
ValueError: Path /root/.prefect/storage/<GUID> does not exist
What I suspect is happening: a task fails, a different ray worker node attempts to retry it, sees that the result was already serialized to disk, and tries to read it from the cache.
Except the cache was written on a separate node and doesn't exist locally.
With Prefect 1.0 we were using S3Result() for all of our tasks which worked really nicely.
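For reference, the 1.0 pattern is just this (bucket name illustrative):
from prefect import task, Flow
from prefect.engine.results import S3Result

# Persist every task result to S3 so retries on any node can read it back.
result = S3Result(bucket="our-results-bucket")

@task(result=result, checkpoint=True)
def compute():
    return 42

with Flow("etl", result=result) as flow:
    compute()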
It looks like there's currently no equivalent for orion.
Are there any plans to add that functionality back in?
Is there a good workaround we can use in the meantime?
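The best workaround I can think of (untested sketch; the block name, key, and payload are all made up) is writing results by hand through the S3 filesystem block inside the task:
from prefect import flow, task
from prefect.filesystems import S3

@task
async def compute_and_store(key: str) -> str:
    # Load a hypothetical pre-saved S3 block and persist the payload manually,
    # so it lives somewhere every ray worker node can reach.
    s3 = await S3.load("results")
    await s3.write_path(f"cache/{key}", b"42")  # payload illustrative
    return key

@flow
async def my_flow():
    await compute_and_store("run-1")

Jacob Blanco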
08/25/2022, 1:50 AM

Jacob Blanco
08/25/2022, 1:53 AM

Andreas Nigg
08/25/2022, 6:06 AM

Priyank
08/25/2022, 6:44 AM

Trine Bruun Brockhoff
08/25/2022, 7:06 AM

Saman
08/25/2022, 10:32 AM

John
08/25/2022, 11:38 AM

Amol Shirke
08/25/2022, 12:07 PM

Stéphan Taljaard
08/25/2022, 1:52 PM
task.run()?
I want to add a pytest to catch and prevent incorrect task usage like this:
import prefect

@prefect.task
def task1(par):
    task2()  # this is incorrect! it should be task2.run()

@prefect.task
def task2():
    return 1

with prefect.Flow("Sample flow") as flow:
    aa = prefect.Parameter("aa")
    task1(aa)
flow.validate() doesn't return errors/warnings, since it doesn't check what's happening inside each task.
Meanwhile, inside the task there's another task hiding and being used incorrectly; the flow will fail at run time with:
ValueError: Could not infer an active Flow context while creating edge to <Task: task2>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `task2.run(...)`
I could do a flow.run() in my test, but that can be cumbersome and require a lot of scaffolding in terms of setting up mocks, and even with mocks, the flow run might just be too slow.
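The lightest check I've thought of (sketch only; assumes the tasks live in a my_flows module and that their side effects are safe to execute or already mocked) is to call each task's .run() directly in a test:
from my_flows import task1  # module path is illustrative

def test_task1_has_no_bare_task_calls():
    # .run() executes the task body outside any Flow context, so a hidden
    # bare task2() call raises the "Could not infer an active Flow context"
    # ValueError and fails this test.
    task1.run(par=1)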
A task.validate() would be handy, but I'm finding it difficult to write such a function.

Josh Paulin
08/25/2022, 2:29 PM

Mansour Zayer
08/25/2022, 3:02 PM
Currently my flow's date parameters are date.today() and date.today() - timedelta(days=1). Now I want to make the flows resilient: the flow's date parameter should be the date of the last successful flow run, so that if one day's flow run fails, the next run automatically covers the last 2 days. Basically, I want the dates to be date.today() and last_successful_flow_run_date.
I was thinking of storing the flow run result and its parameters in an S3 bucket, then querying it every time to recover the last date the flow ran successfully, but I'm not sure this is the best way.
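An alternative I'm considering (rough, untested sketch; assumes Prefect 2.x import paths) is asking the API for the most recent completed run instead of keeping my own record in S3:
from prefect import get_client
from prefect.orion.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.orion.schemas.sorting import FlowRunSort
from prefect.orion.schemas.states import StateType

async def last_successful_flow_run_date(flow_name: str):
    # Fetch the most recent COMPLETED run of this flow and return its end date.
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.COMPLETED])
                )
            ),
            sort=FlowRunSort.END_TIME_DESC,
            limit=1,
        )
        return runs[0].end_time.date() if runs else None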
I'd appreciate any help with this. Thank you

Nikhil Joseph
08/25/2022, 4:04 PM

Tuoyi Zhao
08/25/2022, 4:40 PM

Alex Wilcoxson
08/25/2022, 4:42 PM

Leon Kozlowski
08/25/2022, 4:54 PM

Soren Daugaard
08/25/2022, 5:10 PM
ConcurrentTaskRunner?

Lucas Brum
08/25/2022, 5:11 PM

Krishnan Chandra
08/25/2022, 5:45 PM

Blake Stefansen
08/25/2022, 6:50 PM
I have a deployment using the process infrastructure and the s3 block for filesystem storage. I build my deployment, the yaml is generated, and my flow code is uploaded to s3.
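For reference, the build step is roughly this (flow path and block names are placeholders, not my real ones):
prefect deployment build ./flows/my_flow.py:my_flow -n my-deployment -sb s3/my-s3-block -ib process/my-process-block
prefect deployment apply my_flow-deployment.yaml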
I have prefect orion running locally, and this deployment applied to my local prefect. I then start a local agent within my python virtual environment.
In orion, I click Run on my deployment. The flow run is moved to my local work queue, and my local agent picks up the flow run and builds the infrastructure.
The agent then runs the prefect engine, which begins pulling my flow code from s3. In doing so, I receive this flow run failure log:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\engine.py", line 254, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\deployments.py", line 55, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\filesystems.py", line 399, in get_directory
return await self.filesystem.get_directory(
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\filesystems.py", line 260, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\fsspec\spec.py", line 801, in get
self.get_file(rpath, lpath, **kwargs)
File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\fsspec\spec.py", line 769, in get_file
outfile = open(lpath, "wb")
FileNotFoundError: [Errno 2] No such file or directory: 'C:/Users/BSTEFA~1/AppData/Local/Temp/tmp5t7vj96aprefect/aurora\\aurora_constants.py'
Expectation
I would expect the prefect engine to use s3 file paths for pulling down the flow code, but it seems to be using local file paths. Is this an issue with my deployment/flow setup, or is this a bug?
Environment
• Windows 10
• Python 3.10.5
• Prefect 2.2.0

Blake Stefansen
08/25/2022, 7:23 PM
s3fs for DockerContainer infrastructure blocks, but this is also an issue for me with the process infrastructure when using prefect cloud. If I run a deployment in the cloud using s3 for filesystem storage and process for infrastructure, our flow runs error out saying
ModuleNotFoundError: No module named 's3fs'
I'm not sure if this is due to our agent being a kubernetes agent and our deployment infrastructure being process, or something else. If the process just runs a subprocess on a docker image made by the kubernetes agent, then I would assume it's an issue with the default image used by the kubernetes agent.
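The workaround I'm assuming for now (not confirmed to be the intended fix) is to make sure s3fs is installed wherever the flow actually executes, e.g.:
pip install s3fs
or, for the official prefecthq/prefect Docker images, set EXTRA_PIP_PACKAGES="s3fs" on the container so the entrypoint installs it at start-up.

Philip MacMenamin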
08/25/2022, 7:36 PM
Is there a way to set a key in context in a task such that the key would be available to other downstream tasks within that run?

Marc-Antoine Bélanger
08/25/2022, 7:36 PM

Edmondo Porcu
08/25/2022, 8:38 PM
pip install -e ".[dev]" does not install the packages
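For context, the [dev] extra is declared in the project metadata, e.g. a setup.cfg section like this (contents illustrative):
[options.extras_require]
dev =
    pytest
    black

Tuoyi Zhao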
08/25/2022, 9:01 PM

Kevin Grismore
08/25/2022, 9:02 PM
UserWarning: Block document has schema checksum sha256:28a9d16c2589998d5281b0496b25edd76c1a0b159095d0c7fe5eb5dc5ab62549 which does not match the schema checksum for class 'GCS'. This indicates the schema has changed and this block may not load.
Should I delete and recreate my GCS block?

Jared Robbins
08/25/2022, 9:08 PM