Aaron Ash 08/25/2022, 12:18 AM
sometimes. What I suspect is happening is that a task fails, a different Ray worker node attempts to retry it, sees that the result was already serialized to disk, and tries to read it from the cache. Except that the cache was on a separate node and doesn't exist locally. With Prefect 1.0 we were using S3Result() for all of our tasks, which worked really nicely. It looks like there's currently no equivalent for Orion. Are there any plans to add that functionality back in? Is there a good workaround we can use in the meantime?
ValueError: Path /root/.prefect/storage/<GUID> does not exist
Stéphan Taljaard 08/25/2022, 1:52 PM
? I want to add a pytest check to prevent incorrect task usage like this:
import prefect

@prefect.task
def task1(par):
    task2()  # this is incorrect! it should be task2.run()

@prefect.task
def task2():
    return 1

with prefect.Flow("Sample flow") as flow:
    aa = prefect.Parameter("aa")
    task1(aa)
doesn't return errors/warnings, since it doesn't check what's happening inside each task. Meanwhile, in the task, there's another task hiding and being used incorrectly. It will fail when running the flow.
I could do a
ValueError: Could not infer an active Flow context while creating edge to <Task: task2>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `task2.run(...)`
in my test, but that can be cumbersome and require a lot of scaffolding in terms of setting up mocks, and even with mocks, the flow run might just be too slow. A
can be handy, but I'm finding it difficult to write such a function.
Mansour Zayer 08/25/2022, 3:02 PM
. Now I want to make the flows resilient. I want the flow's date parameter to be the date of the last successful flow run, so that if one day the flow run fails, the next flow run automatically takes the last 2 days. Basically, I want the dates to be
date.today() - timedelta(days=1)
. I was thinking of storing the flow run result and its parameters in an S3 bucket, then query it every time to recover the last date that the flow ran successfully. But I'm not sure if this is the best way. I'd appreciate any help with this. Thank you
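The "catch up from the last success" logic itself is small; here is a stdlib-only sketch (the function name is made up, and the last-success date could live in the S3 object suggested above, updated by the flow's final task):

```python
import datetime

def dates_to_process(last_processed: datetime.date, today: datetime.date):
    """Return every data date after the last successfully processed one,
    up to and including yesterday, so a failed day is automatically
    retried on the next scheduled run."""
    gap_days = (today - last_processed).days
    return [last_processed + datetime.timedelta(days=i) for i in range(1, gap_days)]
```

On a normal day this yields just yesterday; if one daily run failed, the next run yields the last 2 days, matching the behavior described above.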
Blake Stefansen 08/25/2022, 6:50 PM
infrastructure and the
block for filesystem storage. I build my deployment, the yaml is generated, and my flow code is uploaded to s3. I have prefect orion running locally, and this deployment applied to my local prefect. I then start a local agent within my python virtual environment. In orion, I click
on my deployment. The flow run is moved to my local work queue, my local agent picks up the flow run, and builds the infrastructure. The agent then runs the prefect engine, which begins pulling my flow code from s3. In doing so, I receive this flow run failure log:
Expectation
I would expect the prefect engine to use s3 file paths for pulling down the flow code, but it seems to be using local file paths. Is this an issue with my deployment/flow setup, or is this a bug?
Environment
• Windows 10
• Python 3.10.5
• Prefect 2.2.0
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\engine.py", line 254, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\deployments.py", line 55, in load_flow_from_flow_run
    await storage_block.get_directory(from_path=None, local_path=".")
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\filesystems.py", line 399, in get_directory
    return await self.filesystem.get_directory(
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\prefect\filesystems.py", line 260, in get_directory
    return self.filesystem.get(from_path, local_path, recursive=True)
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\fsspec\spec.py", line 801, in get
    self.get_file(rpath, lpath, **kwargs)
  File "C:\Users\BStefansen\.virtualenvs\lbx-broadband-pipeline-4k_4F-H8\lib\site-packages\fsspec\spec.py", line 769, in get_file
    outfile = open(lpath, "wb")
FileNotFoundError: [Errno 2] No such file or directory: 'C:/Users/BSTEFA~1/AppData/Local/Temp/tmp5t7vj96aprefect/aurora\\aurora_constants.py'
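For context on the final frame: open(lpath, "wb") cannot create missing parent directories, which is why a download into a path whose directory was never created fails this way. A minimal stdlib reproduction, unrelated to Prefect internals (the path names are just illustrations):

```python
import os
import pathlib
import tempfile

tmp = tempfile.mkdtemp()
# The parent directory "aurora" is never created, mirroring the log above.
lpath = os.path.join(tmp, "aurora", "aurora_constants.py")

try:
    open(lpath, "wb")  # raises FileNotFoundError: parent dir is missing
    raised = False
except FileNotFoundError:
    raised = True

# Creating the parent directory first makes the same open() succeed.
pathlib.Path(lpath).parent.mkdir(parents=True, exist_ok=True)
with open(lpath, "wb"):
    pass
```

So the failure points at the download path being constructed (or its directories not being created) incorrectly, consistent with the mixed / and \\ separators in the error.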
Blake Stefansen 08/25/2022, 7:23 PM
infrastructure blocks, but this is also an issue for me with the
infrastructure when using prefect cloud. If I run a deployment in the cloud using
for filesystem storage and
for infrastructure, our flow runs error out saying
ModuleNotFoundError: No module named 's3fs'
I'm not sure if this is due to our agent being a
agent and our deployment infrastructure being
, or something else. If the
just runs a subprocess on a docker image made by the kubernetes agent, then I would assume it's an issue with the default image used by the kubernetes agent
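If the default image really is the cause, one common fix is running flows on a custom image with s3fs preinstalled. A sketch of such an image (the base tag is an assumption — pin it to the Prefect and Python versions you actually deploy with):

```dockerfile
# Hypothetical tag; match your Prefect/Python versions
FROM prefecthq/prefect:2.2.0-python3.10
RUN pip install s3fs
```

The deployment's infrastructure block would then point at this image instead of the default.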
Philip MacMenamin 08/25/2022, 7:36 PM
in a task such that the key would be available to other downstream tasks within that run?
Edmondo Porcu 08/25/2022, 8:38 PM
pip install -e ".[dev]"
does not install the packages
Kevin Grismore 08/25/2022, 9:02 PM
Should I delete and recreate my GCS block?
UserWarning: Block document has schema checksum sha256:28a9d16c2589998d5281b0496b25edd76c1a0b159095d0c7fe5eb5dc5ab62549 which does not match the schema checksum for class 'GCS'. This indicates the schema has changed and this block may not load.
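As the warning text says, the installed library's GCS block class no longer matches the schema stored when the block document was created. Conceptually the check works like the sketch below (this is an illustration, not Prefect's actual implementation): any change to the block's fields changes the digest, so old documents stop matching.

```python
import hashlib
import json

def schema_checksum(schema: dict) -> str:
    """Illustrative stand-in: a stable digest over a block's field
    schema, so any change to the fields yields a different checksum."""
    blob = json.dumps(schema, sort_keys=True).encode()
    return "sha256:" + hashlib.sha256(blob).hexdigest()

old = schema_checksum({"properties": {"bucket": {"type": "string"}}})
new = schema_checksum({"properties": {"bucket_path": {"type": "string"}}})
```

Recreating the block with the current library version stores a document whose checksum matches the new schema, which is typically why deleting and recreating resolves the warning.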