Vincenzo
03/14/2023, 4:36 PM
Vincenzo
03/19/2023, 3:05 PM
1. prefect agent start -q 'default'
2. I run my flow: python my_flow_file.py
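For step (1), the agent only picks up scheduled runs while its process is alive, which is likely why the Saturday run sat in the Late state. One low-tech way to keep it alive on the VM is a restart loop like the sketch below. It assumes the prefect CLI is on the VM's PATH and that 'default' is still the work queue; supervise and AGENT_CMD are hypothetical names, and this is not an official Prefect recipe. A process manager such as systemd is usually the sturdier choice.

```python
import subprocess
import sys
import time
from typing import List, Optional, Sequence

# Agent command from step (1); the work queue name 'default' comes from the message.
AGENT_CMD = ["prefect", "agent", "start", "-q", "default"]


def supervise(cmd: Sequence[str], max_restarts: Optional[int] = None,
              backoff_seconds: float = 5.0) -> List[int]:
    """Run cmd in the foreground and start it again each time it exits.

    Returns the exit codes seen. With max_restarts=None it loops forever,
    which is what a long-lived agent wants.
    """
    exit_codes: List[int] = []
    while True:
        proc = subprocess.run(list(cmd))  # blocks until the agent process exits
        exit_codes.append(proc.returncode)
        if max_restarts is not None and len(exit_codes) > max_restarts:
            return exit_codes
        print(f"{cmd[0]} exited with code {proc.returncode}; "
              f"restarting in {backoff_seconds}s", file=sys.stderr)
        time.sleep(backoff_seconds)  # avoid a tight crash loop
```

To use it, call supervise(AGENT_CMD) at the bottom of a script and launch that script so it outlives your VS Code session, for example with nohup python run_agent.py & (run_agent.py is a hypothetical file name).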
I would now like my flow to run on its own, without me having to do (1) and (2). I scheduled it to run every Saturday, but when I checked today, the run was in the Late state, waiting for an agent to pick it up. How do I get an agent running without having to start it myself from the CLI in VS Code? I assumed it would be started by Prefect Cloud or by my VM, which was running.
Vincenzo
03/21/2023, 9:05 AM
John Kang
04/07/2023, 2:45 PM
Bebeto Nyamwamu
04/11/2023, 6:37 AM
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/deployments.py", line 159, in load_flow_from_flow_run
await storage_block.get_directory(from_path=deployment.path, local_path=".")
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/filesystems.py", line 553, in get_directory
return await self.filesystem.get_directory(
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/prefect/filesystems.py", line 310, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 113, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 98, in sync
raise return_result
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 561, in _get
return await _run_coros_in_chunks(
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/fsspec/asyn.py", line 269, in _run_coros_in_chunks
await asyncio.gather(*chunk, return_exceptions=return_exceptions),
File "/usr/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
return await fut
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/gcsfs/core.py", line 1266, in _get_file
await self._get_file_request(u2, lpath, callback=callback, **kwargs)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/decorator.py", line 221, in fun
return await caller(func, *(extras + args), **kw)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/gcsfs/retry.py", line 115, in retry_request
return await func(*args, **kwargs)
File "/home/datasc/tags-scripts/venv/lib/python3.10/site-packages/gcsfs/core.py", line 1253, in _get_file_request
f2.write(data)
OSError: [Errno 28] No space left on device
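The last frames show the GCS storage block copying the deployment's files into the current working directory (get_directory(from_path=deployment.path, local_path=".")), and the write failing with Errno 28, so the filesystem holding that directory is full. A quick way to confirm this on the GCP machine is to check free space from the directory the agent starts in, as in this sketch; disk_report is a hypothetical helper. If deployment.path is empty or very broad, the download may also be pulling much more of the bucket than intended, which is an assumption worth checking rather than a diagnosis.

```python
import shutil


def disk_report(path: str = ".") -> dict:
    """Summarise space on the filesystem that holds `path`, in GiB."""
    usage = shutil.disk_usage(path)
    gib = 1024 ** 3
    return {
        "total_gib": round(usage.total / gib, 2),
        "used_gib": round(usage.used / gib, 2),
        "free_gib": round(usage.free / gib, 2),
    }


# Run this from the directory the agent is started in, since local_path="." is relative to it.
print(disk_report("."))
```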
It's deployed and running on GCP.
Aaron
04/11/2023, 2:15 PM
sundeep
04/29/2023, 4:46 PM
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
This is surprising, because I don't use that location to reference the flow. This is what my Dockerfile looks like:
FROM python:3.8-slim-buster
ARG PREFECT_API_KEY
ENV PREFECT_API_KEY=$PREFECT_API_KEY
ARG PREFECT_API_URL
ENV PREFECT_API_URL=$PREFECT_API_URL
ARG GCP_DATASET_NAME
ENV GCP_DATASET_NAME=$GCP_DATASET_NAME
ARG GCP_DATASET_TABLE_NAME
ENV GCP_DATASET_TABLE_NAME=$GCP_DATASET_TABLE_NAME
ARG GCP_PROJECT_ID
ENV GCP_PROJECT_ID=$GCP_PROJECT_ID
ARG GCP_REGION
ENV GCP_REGION=$GCP_REGION
COPY poetry.lock .
COPY pyproject.toml .
RUN pip install poetry --trusted-host pypi.python.org --no-cache-dir
RUN poetry config virtualenvs.create false
RUN poetry install --no-root --without dev
RUN mkdir scripts
COPY scripts/ scripts
RUN mkdir config
COPY config/ config
RUN mkdir -p dbt/xetra
COPY dbt/xetra dbt/xetra
Any idea why Prefect is looking for the flow in the /opt/prefect/flows directory?
I am running this via Prefect Cloud.
John Kang
05/10/2023, 1:11 PM
flapili
05/10/2023, 2:19 PM
juandavidlozano
05/10/2023, 11:15 PM
upload_from_path. In my code you will see that I am passing the same variable, path, as both from_path and to_path, but for some reason Prefect changes the structure of to_path. Here is the code that builds the path:
from pathlib import Path

import pandas as pd
from prefect import task
from prefect_gcp.cloud_storage import GcsBucket


@task()
def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path:
    """Write DataFrame out locally as parquet file"""
    Path(f"data/{color}").mkdir(parents=True, exist_ok=True)
    path = Path(f"data/{color}/{dataset_file}.parquet")
    df.to_parquet(path, compression="gzip")
    return path


@task
def write_gcs(path: Path) -> None:
    """Upload local parquet file to GCS"""
    gcs_block = GcsBucket.load("zoom-gcs")  # GCS bucket block saved as "zoom-gcs"
    gcs_block.upload_from_path(from_path=path, to_path=path)
    return
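The doubled backslashes reported below suggest the flow ran on Windows (an assumption; the operating system isn't stated). There, Path(...) is a WindowsPath, and turning it into a string joins the folders with backslashes, which GCS keeps literally in the object name instead of displaying them as folders. A minimal sketch of the usual workaround is to pass to_path as a forward-slash string via PurePath.as_posix(); to_gcs_key is a hypothetical helper name.

```python
from pathlib import PurePath, PureWindowsPath


def to_gcs_key(path: PurePath) -> str:
    """Return an object name that always uses "/" between folders.

    str(path) uses the OS separator (a backslash on Windows), while
    as_posix() always joins the parts with "/".
    """
    return path.as_posix()


# PureWindowsPath reproduces Windows path behaviour on any OS.
win_path = PureWindowsPath("data/yellow/yellow_tripdata_2021-01.parquet")
print(str(win_path))         # data\yellow\yellow_tripdata_2021-01.parquet
print(to_gcs_key(win_path))  # data/yellow/yellow_tripdata_2021-01.parquet

# Inside write_gcs this would become:
# gcs_block.upload_from_path(from_path=path, to_path=to_gcs_key(path))
```

If the backslashes persist even with a forward-slash string, the conversion is happening inside the block's own path handling, and the installed prefect-gcp version is worth checking.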
In the second task, write_gcs, both arguments are the same variable, path, which originally holds the value 'data/yellow/yellow_tripdata_2021-01.parquet'.
The flow runs, but afterwards the flow run details (the second picture I am attaching) show that the GCS path was changed to 'data\\yellow\\yellow_tripdata_2021-01.parquet'. I have no idea why this happens, and because of it the file is saved under that odd name (picture 1) instead of inside folders in GCS. Any help on why this is happening?
Matthieu Lhonneux
05/11/2023, 12:35 PM
Devin
05/30/2023, 2:37 PM
Austin Weisgrau
06/01/2023, 5:42 PM
Chandan Maruthi
06/05/2023, 8:45 AM
Adrian Brudaru
06/22/2023, 3:58 PM
Kohjunwei J
06/27/2023, 5:21 AM
Dan Cabrol
06/28/2023, 4:18 PM
Bebeto Nyamwamu
07/06/2023, 10:22 AM
prefect.yaml configurations and work pool and queue arrangement? Please share the steps and details.
GauravTalele
07/14/2023, 8:04 AM
owolabi akintan
08/04/2023, 11:24 PM
owolabi akintan
08/07/2023, 10:52 PM
eran
08/22/2023, 10:59 AM
Kacper Kwasnioch
09/19/2023, 1:32 PM
Derek
09/25/2023, 7:31 AM
Sunny Shah
10/17/2023, 5:06 AM
Turan
01/09/2024, 8:29 AM
Jeremy DiBattista
01/12/2024, 4:24 PM
Cormac
03/04/2024, 2:54 PM
import time

from sqlalchemy import text  # SQLAlchemy 2.x requires raw SQL strings to be wrapped in text()

query = f"SELECT * FROM `{table}`"
print("Starting DB fetch")
with conn_obj.get_engine().connect() as connection:
    print("Got DB connection")
    with connection.execution_options(yield_per=yield_per).execute(text(query)) as result:
        print("Ran DB Query")
        print("sending block partition")
        time.sleep(1)
        for block in result:
            p_in.send(block)
...
fails with the error
AttributeError: __enter__
This type of error suggests that the context manager protocol is not supported, but it is supported in SQLAlchemy 2.
But my pip freeze reports prefect-sqlalchemy==0.3.2.
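AttributeError: __enter__ means the object in the with statement does not implement the context-manager protocol, which points to the installed SQLAlchemy being older than 2.0 (an inference, since the pip freeze line shown only covers prefect-sqlalchemy). The sketch below checks the installed version with importlib.metadata and, for server-side cursors, uses SQLAlchemy's stream_results execution option with Result.partitions(), falling back to contextlib.closing when the result cannot be used in a with block. installed_version, managed and stream_partitions are hypothetical helpers; how they combine with prefect-sqlalchemy's own connector methods is not covered here.

```python
from contextlib import closing
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def managed(result):
    """Use the result as its own context manager when it supports one
    (SQLAlchemy 2.x); otherwise wrap it in contextlib.closing, which calls
    result.close() on exit (works with 1.4-style results)."""
    if hasattr(type(result), "__enter__") and hasattr(type(result), "__exit__"):
        return result
    return closing(result)


def stream_partitions(engine, sql: str, batch_size: int = 1000):
    """Yield batches of rows, using a server-side cursor where the backend has one."""
    from sqlalchemy import text  # imported here so the helpers above work without it

    with engine.connect() as connection:
        # stream_results asks the DBAPI driver for a server-side cursor;
        # dialects without one typically fall back to an ordinary cursor.
        result = connection.execution_options(stream_results=True).execute(text(sql))
        with managed(result):
            for partition in result.partitions(batch_size):
                yield partition


print("sqlalchemy:", installed_version("sqlalchemy"))
print("prefect-sqlalchemy:", installed_version("prefect-sqlalchemy"))
```

With the names from the snippet above, the loop would read: for rows in stream_partitions(conn_obj.get_engine(), query, batch_size=yield_per): p_in.send(rows). Each item sent is then a batch of rows rather than a single row.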
Can someone please assist?
1. Which version of SQLAlchemy is prefect-sqlalchemy built on?
2. How do I use server-side cursors with Prefect SQLAlchemy?
Gemma
03/11/2024, 7:28 PM
PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS on the worker to 30 minutes (basically, I only need the notification if a run is REALLY late); however, we get notified a couple of seconds after the run attempts to start. What am I missing?
Matthew Bell
03/12/2024, 2:45 AM
ENV PIPENV_VENV_IN_PROJECT 1