Austin Weisgrau
03/06/2023, 11:45 PM
prefect deployment build with an S3 upload block is hanging unless I use --skip-upload
$ prefect deployment build flows/helloworld/helloworld_flow.py:helloworld -sb s3/prod -n helloworld --skip-upload
Found flow 'Hello World'
Deployment YAML created at '/home/aradox/code/wfp/wfp-prefect/helloworld-deployment.yaml'.
$ prefect deployment build flows/helloworld/helloworld_flow.py:helloworld -sb s3/prod -n helloworld
Found flow 'Hello World'
Sebastian Gay
03/07/2023, 7:46 AM
from prefect.infrastructure.docker import DockerContainer
from prefect.filesystems import Azure
from prefect.deployments import Deployment
from prefect import flow
az_block = Azure.load("$AzureContainerBlock")
usecase_container = DockerContainer.load("$DockerBlock")
@flow
def abc_train():
    print("abc_train_internal")

docker_deploy = Deployment.build_from_flow(
    flow=abc_train,
    name="abc_train",
    work_queue_name="test",
    version="18",
    storage=az_block,
    path='abc',
    infrastructure=usecase_container,
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}}
)

if __name__ == "__main__":
    docker_deploy.apply()
When I deploy, the upload to azure blob storage happens as expected.
However, when I run the flow (agent running on local machine) I get the following logs:
---INFO---
Downloading flow code from storage at 'abc'
---ERROR---
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine.py", line 274, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.7/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/deployments.py", line 175, in load_flow_from_flow_run
await storage_block.get_directory(from_path=deployment.path, local_path=".")
File "/usr/local/lib/python3.7/site-packages/prefect/filesystems.py", line 708, in get_directory
from_path=from_path, local_path=local_path
File "/usr/local/lib/python3.7/site-packages/prefect/filesystems.py", line 322, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 293, in get
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 293, in <listcomp>
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
File "/usr/local/lib/python3.7/os.py", line 223, in makedirs
mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: ''
When going line by line through retrieve_flow_then_begin_flow_run in an interactive Python session, everything seems to be truly fine until the storage_block.get_directory() call. Interactively, deployment.path is 'az://testing-ib', which is the correct container name per the storage block.
Would anyone be able to help me debug this one?
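The fact that deployment.path comes back as the full az:// URL rather than a relative folder looks suspicious: fsspec then derives an empty local directory name, which matches the FileNotFoundError: ''. A minimal sketch of an alternative configuration, assuming the container name can live on the block's bucket_path while path stays a relative sub-folder (block name and credentials below are placeholders):

from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import Azure


@flow
def abc_train():
    print("abc_train_internal")


# Placeholder block: the container name goes on the block itself...
az_block = Azure(
    bucket_path="testing-ib",
    # ...plus the same credentials the existing $AzureContainerBlock already stores
)
az_block.save("testing-ib-storage", overwrite=True)

# ...so `path` can stay a plain relative folder instead of an az:// URL.
docker_deploy = Deployment.build_from_flow(
    flow=abc_train,
    name="abc_train",
    storage=Azure.load("testing-ib-storage"),
    path="abc",
)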
Amruth VVKP
03/07/2023, 10:03 AM
2023-03-06 20:19:39 value = await result
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 2092, in connect
2023-03-06 20:19:39 return await connect_utils._connect(
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/connect_utils.py", line 895, in _connect
2023-03-06 20:19:39 raise last_error
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/connect_utils.py", line 881, in _connect
2023-03-06 20:19:39 return await _connect_addr(
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/connect_utils.py", line 773, in _connect_addr
2023-03-06 20:19:39 return await __connect_addr(params, timeout, True, *args)
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/connect_utils.py", line 825, in __connect_addr
2023-03-06 20:19:39 tr, pr = await compat.wait_for(connector, timeout=timeout)
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/compat.py", line 56, in wait_for
2023-03-06 20:19:39 return await asyncio.wait_for(fut, timeout)
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
2023-03-06 20:19:39 return fut.result()
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/site-packages/asyncpg/connect_utils.py", line 684, in _create_ssl_connection
2023-03-06 20:19:39 tr, pr = await loop.create_connection(
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1036, in create_connection
2023-03-06 20:19:39 infos = await self._ensure_resolved(
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1418, in _ensure_resolved
2023-03-06 20:19:39 return await loop.getaddrinfo(host, port, family=family, type=type,
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
2023-03-06 20:19:39 return await self.run_in_executor(
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
2023-03-06 20:19:39 result = self.fn(*self.args, **self.kwargs)
2023-03-06 20:19:39 File "/usr/local/lib/python3.10/socket.py", line 955, in getaddrinfo
2023-03-06 20:19:39 for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
2023-03-06 20:19:39 socket.gaierror: [Errno -2] Name or service not known
2023-03-06 20:19:39
2023-03-06 20:19:39 Application startup failed. Exiting.
Here's my docker-compose -
version: '3.8'

networks:
  prefect:
    name: prefect

services:
  # ------------------------------------------------------------- #
  # PostgreSQL DB for Prefect DB                                   #
  # ------------------------------------------------------------- #
  postgres:
    image: postgres:latest
    restart: always
    command:
      - postgres
      - -c
      - max_connections=150
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    healthcheck:
      interval: 10s
      retries: 60
      start_period: 2s
      test: pg_isready -q -d $${POSTGRES_DB} -U $${POSTGRES_USER} | grep "accepting connections" || exit 1
      timeout: 2s
    ports:
      - 5432:5432
    expose:
      - 5432

  # -------------------------------------- #
  # Prefect Server                         #
  # -------------------------------------- #
  prefect-server:
    image: prefecthq/prefect:2.8.4-python3.10
    command:
      - prefect
      - server
      - start
    ports:
      - 4200:4200
    expose:
      - 4200
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
    networks:
      - prefect
    restart: always
    depends_on:
      postgres:
        condition: service_started
Anish Giri
03/07/2023, 10:44 AM
SequentialTaskRunner for each customer, but ConcurrentTaskRunner across all customers.
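One pattern that might fit, sketched under the assumption that each customer's work can live in its own subflow (all names here are made up): run a customer's steps in order inside a SequentialTaskRunner subflow, and fan those subflows out concurrently from the parent flow.

import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@task
async def step(customer: str, n: int) -> None:
    print(f"{customer}: step {n}")


# Each customer's steps run strictly in order inside their own subflow.
@flow(task_runner=SequentialTaskRunner())
async def process_customer(customer: str) -> None:
    for n in range(3):
        await step(customer, n)


# The parent flow launches the per-customer subflows concurrently.
@flow
async def process_all_customers(customers: list[str]) -> None:
    await asyncio.gather(*(process_customer(c) for c in customers))


if __name__ == "__main__":
    asyncio.run(process_all_customers(["customer-a", "customer-b"]))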
Ishan Anilbhai Koradiya
03/07/2023, 10:56 AM
Jeff Hale
03/07/2023, 12:57 PM
Andreas Nord
03/07/2023, 2:03 PM
Dave Chater
03/07/2023, 4:04 PM
10:09:10.168 | ERROR | prefect.agent - An error occured while monitoring flow run '7dde2a66-9429-45c8-b93f-8cbf0ee195cd'. The flow run will not be marked as failed, but an issue may have occurred.
... see thread for full stack ...
request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '1e197494-22ca-42a4-a2ae-6a576f50024c', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '56847d98-65de-47d9-86ba-67cef57766df', 'X-Kubernetes-Pf-Prioritylevel-Uid': '53f2ea6d-3bf3-4231-a47a-e164ef55cc2c', 'Date': 'Mon, 06 Mar 2023 10:09:10 GMT', 'Content-Length': '179'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"strconv.ParseInt: parsing \\"31.160724690009374\\": invalid syntax","reason":"BadRequest","code":400}\n'
Suspect it may have been introduced by this change, which calculates remaining_time as a float rather than an int:
https://github.com/PrefectHQ/prefect/commit/4766c2eb764a7317d81eb32ccfa457ebe1b199c0
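If that reading is right, the failure is simply the Kubernetes API rejecting a fractional value for an integer field. A made-up illustration of the suspected shape of the bug:

from datetime import datetime, timedelta, timezone

# Hypothetical numbers: a remaining time computed as a float...
deadline = datetime.now(timezone.utc) + timedelta(seconds=60)
remaining_time = (deadline - datetime.now(timezone.utc)).total_seconds()  # e.g. 31.160724690009374

# ...sent as-is to a field the API server parses with strconv.ParseInt fails with
# the 400 above; coercing to an int keeps the field integral.
grace_period_seconds = int(remaining_time)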
Vicente Bobadilla Riquelme
03/07/2023, 4:25 PM
kasteph
03/07/2023, 6:26 PM
etl-server-1 | raise mapped_exc(message) from exc
etl-server-1 | httpx.ConnectError: All connection attempts failed
etl-server-1 |
It does disappear after the prefect server starts up and there's no warning on the Prefect UI about the API being unreachable. I have set the following for the server container too:
PREFECT_API_URL = "http://0.0.0.0:4200/api"
PREFECT_UI_API_URL = "http://0.0.0.0:4200/api"
I'm just wondering if that's an actual warning or Prefect trying to connect to the API before the server even starts?
Ofir
03/07/2023, 7:36 PM
jack
03/07/2023, 9:15 PMDeployment.build_from_flow()
how do we specify a work pool? The docstring for Deployment
shows a work_queue
keyword argument but not one for work_pool
.
The docs for work queues say that work queues are an advanced topic. Are we to used work queues or work pools to get certain deployments to be run by certain agents?
Using Prefect 2.8.4.
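For what it's worth, a deployment snippet later in this log passes a work_pool_name keyword to Deployment.build_from_flow, so something along these lines may be what you're after (flow, pool, and queue names below are placeholders):

from prefect import flow
from prefect.deployments import Deployment


@flow
def my_flow() -> None:
    ...


if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="my-deployment",
        work_pool_name="my-pool",    # pool whose agents should pick this up
        work_queue_name="default",   # queue within that pool
    )
    deployment.apply()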
kasteph
03/07/2023, 10:31 PM
FileNotFoundError: [Errno 2] No such file or directory: '/etl/deployments'
I have a Local Storage block defined:
block_storage = LocalFileSystem(basepath=str(Path().parent.resolve()))
block_storage.save("local-storage", overwrite=True)
And that itself is being used by the deployment:
if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=trigger_that_job,
        name="trigger_that_job",
        work_queue_name="work",
        storage=LocalFileSystem.load("local-storage")
    )
    deployment.apply()
Doing docker exec compose server sh
and looking around, the directory and path are there.
This is the folder structure:
.
├── blocks/
├── deployments/
├── docker-compose.yml
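One thing that might be worth checking, purely a guess: Path().parent.resolve() is evaluated wherever the deployment script runs, so the basepath baked into the block may not be the path the agent container sees. A sketch that pins it explicitly (the /etl path is an assumption taken from the error above):

from prefect.filesystems import LocalFileSystem

# Hypothetical: pin the basepath to the absolute path that exists inside the
# container that runs the flow, rather than resolving it from the current
# working directory at deployment-build time.
block_storage = LocalFileSystem(basepath="/etl")
block_storage.save("local-storage", overwrite=True)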
John Horn
03/07/2023, 10:45 PM
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    persist_result=True,
    result_storage=GCS(
        bucket_path="foo-bucket-unique",
        service_account_info=json.dumps(GcpCredentials.load('foo-gcp-service-account').service_account_info))
)
error:
Path /root/.prefect/storage/XXXXXXXXXXXXXXXXXXXXXXX does not exist.
This will persist to the bucket but will still run both tasks w/o any utilization of the cache on either run
@task(
    persist_result=True,
    result_storage=GCS(
        bucket_path="foo-bucket-unique",
        service_account_info=json.dumps(GcpCredentials.load('foo-gcp-service-account').service_account_info))
)
This will persist to the bucket but will still run both tasks w/o any utilization of the cache on either run
@task(
    # cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    persist_result=True,
    result_storage=GCS(
        bucket_path="foo-bucket-unique",
        service_account_info=json.dumps(GcpCredentials.load('foo-gcp-service-account').service_account_info))
)
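A hypothetical variant, not verified against this setup: configure result storage once at the flow level so persisted and cached results live in the bucket rather than under the container-local ~/.prefect/storage path the error points at. Whether this resolves the cache misses is an open question.

import json
from datetime import timedelta

from prefect import flow, task
from prefect.filesystems import GCS
from prefect.tasks import task_input_hash
from prefect_gcp import GcpCredentials

# Same block expressions as above; flow and task names are hypothetical.
gcs_storage = GCS(
    bucket_path="foo-bucket-unique",
    service_account_info=json.dumps(GcpCredentials.load('foo-gcp-service-account').service_account_info),
)


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1), persist_result=True)
def expensive_step(x: int) -> int:
    return x * 2


@flow(result_storage=gcs_storage, persist_result=True)
def cached_flow() -> None:
    expensive_step(1)
    expensive_step(1)  # intended to hit the cache rather than re-run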
Russell Brooks
03/08/2023, 7:26 AM
Samuel Hinton
03/08/2023, 7:50 AM
yyyy-mm-dd as the objectively best datetime format
Samuel Hinton
03/08/2023, 8:52 AM
Deceivious
03/08/2023, 9:33 AM
Running. I've written a flow [code below] that:
1. runs every minute on a single concurrency queue
2. has a task that retries 5 times with 1 min of retry delay.
Any new flow run is immediately set to the Late state. This behavior kinda makes sense, but I would like the task retries to be non-blocking at the flow level.
I am guessing the easiest way of doing this would be to move the retries from the task level to the flow level and cache and persist the task results? But this would remove the individual retry-attempt counter at the task level.
Looking for the community's view on this approach. 👀
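A rough sketch of the approach described above, with the caveat already noted that per-task retry counts disappear (names and numbers are placeholders): persist and cache task results so a flow-level retry skips steps that already completed.

from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1), persist_result=True)
def fragile_step(x: int) -> int:
    # Completed results are persisted, so a flow retry reuses them instead of re-running.
    return x * 2


@flow(retries=5, retry_delay_seconds=60)
def every_minute_flow() -> None:
    fragile_step(1)
    fragile_step(2)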
Samuel Hinton
03/08/2023, 10:36 AM
I'm hitting an issue with the @wraps(fn) annotation. Full reproduction of the issue below, but TL;DR: if you keep the @wraps, you get an exception of TypeError: add() takes 2 positional arguments but 3 were given. If you remove the @wraps, the flow runs as expected. Happy to raise this on GitHub, just again trying to validate here before adding more issues to triage.
from prefect import flow, get_run_logger, task
from functools import wraps


def decorator(fn):
    @wraps(fn)  # With wraps, the flow does not work. Comment this line out and it will work again.
    def wrapper(a, b, **kwargs):
        return fn(a, b, **kwargs)

    return wrapper


@task
@decorator
def add(a, b, logger=None):
    result = a + b
    if logger is not None:
        logger.info(f"Adding {a} and {b} to get {result}")
    return result


@flow
def kwarg_flow():
    logger = get_run_logger()
    logger.info("Starting an ECS task")
    a, b = 1, 2
    add.submit(a, b, logger=logger)
    logger.info("Everything is done!")


if __name__ == "__main__":
    from prefect.deployments import Deployment
    from prefect.filesystems import S3
    from prefect.settings import PREFECT_API_URL, temporary_settings

    with temporary_settings({PREFECT_API_URL: "http://orion.orchestrator-ds-dat3.arenko:4200/api"}):
        s3_block = S3.load("flow-storage")
        print(f"Deploying flow")
        d = Deployment.build_from_flow(
            flow=kwarg_flow,
            name="default",
            storage=s3_block,
            work_pool_name="dask",
            load_existing=False,
            skip_upload=False,
            apply=True,
            path="/",
        )
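A guess at the mechanism, not verified against Prefect internals: functools.wraps sets __wrapped__ on the wrapper, so inspect.signature reports the undecorated add's three parameters, and a caller that binds arguments against that advertised signature and then invokes the wrapper positionally will over-supply it. A minimal illustration outside Prefect:

import inspect
from functools import wraps


def decorator(fn):
    @wraps(fn)  # copies __name__/__qualname__ and sets __wrapped__ on the wrapper
    def wrapper(a, b, **kwargs):
        return fn(a, b, **kwargs)

    return wrapper


@decorator
def add(a, b, logger=None):
    return a + b


# Because of __wrapped__, signature() advertises the original three parameters...
print(inspect.signature(add))  # (a, b, logger=None)

# ...but the callable that actually runs accepts only two positional arguments,
# so supplying the third one positionally fails much like the flow run does.
try:
    add(1, 2, None)
except TypeError as exc:
    print(exc)  # takes 2 positional arguments but 3 were given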
Federico Zambelli
03/08/2023, 10:44 AM
Vasco Leitão
03/08/2023, 11:23 AM
Submission failed. KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'.", though I have two ECS blocks registered (one with VPC definitions, and the other without them). I tried the solution described in this thread (installing prefect-aws in the agent), but to no avail. Does someone have other possible solutions to this issue? I'm adding the ECS task definition and Prefect versions in the thread. Thank you!
Joshua Greenhalgh
03/08/2023, 11:44 AM
Tolga Karahan
03/07/2023, 1:48 PM
Checking flow run state… is logged many times. Then it starts executing the first few steps in the flow again, and finally it logs this and gets stuck in the Running state for days: Flow run RUNNING: terminal tasks are incomplete. When I checked I also saw that the pod of the flow is gone. Any ideas? Thanks.
Steven Wilber
03/08/2023, 4:52 PM
Igor Kaluder
03/08/2023, 5:07 PM
kiran
03/08/2023, 6:43 PM
With psycopg2, I use a try/finally because leaving the context manager doesn't close the connection. So, I would do things like this:
sql = ...
conn = ...
try:
    with conn:
        with conn.cursor() as curs:
            curs.execute(sql)
            ...
finally:
    conn.close()
Since psycopg2 specifically notes that the connection doesn't get closed, I would like to confirm that the prefect-sqlalchemy SqlAlchemyConnector does close the connection automatically (as your docs say) when using the context manager like below, and that therefore I don't need the try/finally. I'm using the postgresql+psycopg2 driver. Thanks!
with SqlAlchemyConnector.load("MY_BLOCK") as database:
    database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
    database.execute_many(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        seq_of_parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)
    results = database.fetch_many("SELECT * FROM customers", size=2)
    print(results)
Prasanth Kothuri
03/08/2023, 6:45 PM
from myint.mal.platemap.processing.config import PlateMapConfig

@task(name='download_from_db')
def download_from_db():
    # variables
    config_path = "job_configs/processing.json"
    env = os.getenv("env")
    config = PlateMapConfig.read_from_json(env, config_path)
Error
File "platemap_flow.py", line 26, in download_from_db
config = PlateMapConfig.read_from_json(env, config_path)
NameError: name 'PlateMapConfig' is not defined
Of course the package is on the PYTHONPATH, both in the docker image and also added to the env of the docker block. Running the docker image with docker run -it {image} /bin/bash and importing that package works... kind of lost as to why this is not working.
Josh Paulin
03/08/2023, 7:46 PM
kasteph
03/08/2023, 10:37 PM
prefect block register -f blocks/gcp.py

for f in deployments/*.py; do
    python "$f";
done
Albert Wong
03/09/2023, 4:13 AM