Thomas Pedersen
03/09/2023, 6:33 AM
The query WITH RECURSIVE typeinfo_tree ... was by far the heaviest. In several places online this was reported to be related to JIT (e.g. https://github.com/MagicStack/asyncpg/issues/530), so I attempted to disable JIT (-c jit=false when starting the PostgreSQL docker container) ... and then all the peak loads disappeared from our server.
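A minimal compose sketch of that workaround (service and image names are assumed, not taken from the poster's setup; any arguments listed after "postgres" are handed to the server process):

```yaml
services:
  database:
    image: postgres:15.2-alpine
    # everything after "postgres" is passed to the server process,
    # so this starts PostgreSQL with JIT disabled
    command: ["postgres", "-c", "jit=false"]
```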
(Prefect 2.7.12)

Henning Holgersen
03/09/2023, 9:06 AM
We build from the python:3.9 image, and we have tried to keep our images lightweight by just pip-installing prefect. This has worked nicely enough with Kubernetes, although it has never worked when using Azure ACI.
I added a post on Discourse about building more sturdy but still lightweight images. I imagine this is our underlying issue, and some recipes might be valuable for others as well.
https://discourse.prefect.io/t/minimal-prefect-dockerfile-from-a-base-image/2502

Tim Wisniewski
03/09/2023, 9:31 AM
I'm using @task, but I wrote the models using SQLAlchemy v2, and it looks like Prefect depends on v1.4. Is it not possible to use the latest SQLAlchemy version for my models with Prefect?

Andreea Taylor
03/09/2023, 10:34 AM

Tibs
03/09/2023, 1:11 PM

Muhammad Husnain
03/09/2023, 1:44 PM
Is there a way to use the prefect logger inside a task runner?
I'm using the RayTaskRunner and I've defined a custom class using the following:
class AnyscaleTaskRunner(RayTaskRunner):
    def __init__(self, **init_kwargs: Any):
        # some code
I use this class with Prefect flows, and I have some information that is only available within the AnyscaleTaskRunner class.
Is there any way that I can access the prefect context or the prefect logger in the AnyscaleTaskRunner class that I created?

Andreas Nord
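One pattern that can help with the question above (a runnable sketch: RayTaskRunner and get_run_logger are stand-ins here for the real prefect_ray / prefect imports, so nothing depends on those packages being installed): fetch the logger lazily inside a method that executes during the flow run, rather than in __init__, where no run context exists yet.

```python
import logging

class RayTaskRunner:  # stand-in for prefect_ray.task_runners.RayTaskRunner
    def submit(self, call):
        return call()

def get_run_logger():  # stand-in for prefect.get_run_logger
    # the real helper raises outside an active flow/task run context
    return logging.getLogger("prefect")

class AnyscaleTaskRunner(RayTaskRunner):
    def submit(self, call):
        # grab the run logger lazily, at submission time, when a run
        # context is active -- not in __init__
        logger = get_run_logger()
        logger.info("submitting via AnyscaleTaskRunner")
        return super().submit(call)
```

With the real classes the same idea applies: any method invoked while a flow or task run is active can call get_run_logger(); calling it while the runner is being constructed fails, because the runner is built before the run starts.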
03/09/2023, 2:12 PM

Haotian Li
03/09/2023, 2:28 PM

Blake Stefansen
03/09/2023, 4:04 PM
Our deployments upload the codebase to s3 (prefect deployment apply ... --upload).
If a new file in our codebase doesn't exist in s3, that file will get uploaded. ✅
If a file in our codebase gets deleted, I would like it to also delete in s3. ❌
Is this possible?

Tim Wisniewski
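No flag for this appears in the thread, but the delete step itself is a set difference; a hypothetical helper (local_keys/remote_keys would come from walking the codebase and from an S3 listing call such as boto3's list_objects_v2):

```python
def keys_to_delete(local_keys, remote_keys):
    """Remote keys that no longer exist locally and should be removed."""
    return sorted(set(remote_keys) - set(local_keys))

# e.g. keys_to_delete(["flows/a.py"], ["flows/a.py", "flows/old.py"])
# -> ["flows/old.py"]
```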
03/09/2023, 4:28 PM

Tomás Emilio Silva Ebensperger
03/09/2023, 7:51 PM

Xavier Babu
03/09/2023, 8:39 PM

Zack
03/09/2023, 10:53 PM
def handler(event: dict, context: LambdaContext):
"""This function is the entry point for the Lambda function"""
<http://log.info|log.info>("starting lambda function for artist ranking")
<http://log.info|log.info>(f"event: {event},context: {context}")
start = time.time()
deployment = Deployment.build_from_flow(
flow=artist_ranker_etl,
name="artist-ranker-deployment"
)
deployment.apply()
run_deployment(name="artist_ranker_staging/artist-ranker-deployment",tags=["staging","ranker"])
end = time.time()
print(f"Time to execute: {end - start}")
Zach Jablons
03/09/2023, 11:22 PM

Zach Jablons
03/10/2023, 1:33 AM

Aleksandr Liadov
03/10/2023, 1:08 PM
socket.gaierror: [Errno -3] Temporary failure in name resolution
https://api.prefect.cloud/api/accounts/my_account/workspaces/my_workspace/task_runs/0b3215a1-c402-4116-8320-439ca875a59a/set_state failed
I use the latest version of prefect and my runner is RayTaskRunner.
The infrastructure block is k8s.
Any idea?
Stacktrace in threads!

Gintautas Jankus
03/10/2023, 1:22 PM

Yaron Levi
03/10/2023, 2:21 PM

Michael Hadorn
03/10/2023, 2:55 PM

Nimesh Kumar
03/10/2023, 5:45 PM
Can't connect to Orion API at http://0.0.0.0:4200/api.
Check that it's accessible from your machine.
This is my docker-compose which I am using to create the required prefect services:
version: "3.9"
services:
database:
image: postgres:15.2-alpine
container_name: database
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=orion
expose:
- 5432
volumes:
- db:/var/lib/postgresql/data
networks:
- backend
minio:
image: minio/minio:latest
container_name: minio
entrypoint: ["minio", "server", "--address", "0.0.0.0:9000", "--console-address", "0.0.0.0:9001", "/data"]
volumes:
- "minio:/data"
ports:
- 9000:9000
- 9001:9001
networks:
- backend
orion:
image: prefecthq/prefect:2.8.2-python3.11
container_name: orion
restart: always
volumes:
- prefect:/root/.prefect
entrypoint: ["prefect", "orion", "start"]
working_dir: "/root/flows"
volumes:
- "./flows:/root/flows"
- "./flows/tmp:/tmp"
- "./flows/tmp_folder:/tmp_folder"
working_dir: "/root/flows"
environment:
- PREFECT_UI_URL=<http://127.0.0.1:4200/api>
- PREFECT_ORION_API_HOST=0.0.0.0
- PREFECT_ORION_DATABASE_CONNECTION_URL=<postgresql+asyncpg://postgres:postgres@database:5432/orion>
ports:
- 4200:4200
depends_on:
- database
networks:
- backend
agent:
image: prefecthq/prefect:2.8.2-python3.11
container_name: agent
restart: always
entrypoint: ["prefect", "agent", "start", "-q", "queue_1", "-q", "queue_2", "-q", "queue_3"]
volumes:
- "./flows:/root/flows"
- "./flows/tmp:/tmp"
- "./flows/tmp_folder:/tmp_folder"
environment:
- PREFECT_API_URL=<http://127.0.0.1:4200/api>
networks:
- backend
volumes:
prefect:
db:
minio:
networks:
backend:
driver: "bridge"
Can anyone please check where I am going wrong ...
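One likely culprit worth checking (a sketch of the idea, not a confirmed fix): inside a compose network, 127.0.0.1 in the agent container refers to the agent container itself, and containers reach each other by service name, so the agent would normally point at the orion service:

```yaml
  agent:
    environment:
      # "orion" resolves to the orion service on the shared backend network;
      # 127.0.0.1 here would point back at the agent container itself
      - PREFECT_API_URL=http://orion:4200/api
```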
Thanks

Mansour Zayer
03/10/2023, 6:50 PM
I create my flows programmatically (using Flow instead of @flow):
def create_flow(flow_name, param):
    def general_func(param):
        do_something(param)
    return Flow(general_func, name=flow_name)

my_flow_1 = create_flow(flow_name="my_flow_1", param=ex_1)
my_flow_2 = create_flow(flow_name="my_flow_2", param=ex_2)
Now this works fine locally. But when I deploy to Prefect Cloud, I get this error when I run the flow (the deployment is fine):
raise MissingFlowError(prefect.exceptions.MissingFlowError: Flow function with name 'general_func' not found in {my directory}.
Why is it expecting a flow called general_func? How can I fix this?
P.S. I don't want my general_func to be a subflow.
Thanks

Albert Wong
03/10/2023, 8:14 PM
File "/home/ubuntu/apps/dataflows/venv/lib/python3.11/site-packages/prefect/infrastructure/docker.py", line 95, in _get_docker_client
    raise RuntimeError(f"Could not connect to Docker.") from exc
RuntimeError: Could not connect to Docker.
Charles Leung
03/10/2023, 10:56 PM
I'm getting a NotImplementedError when using the prefect_shell collection.
It was working previously, but for some reason when I tried doing it again it's not working. I've already used pip install -U prefect-shell and double-checked that it's installed correctly in the venv Lib.
But when running a simple test I get the following "Not Implemented" error:
The source code can be found below:

from prefect import flow
from prefect_shell import shell_run_command

@flow
def example_shell_run_command_flow():
    return shell_run_command(command="ls .", return_all=True)

if __name__ == "__main__":
    print("test")
    example_shell_run_command_flow()
The error message from the cloud UI is attached (Error_msg, 1569×931, 120 KB).
Jeff Wiens
03/10/2023, 11:20 PM

Daniel Alejandro Mesejo-León
03/11/2023, 6:46 PM
When I run pytest tests I get:
ImportError while loading conftest '/Users/daniel.mesejo/PycharmProjects/prefect/tests/conftest.py'.
tests/conftest.py:67: in <module>
    from prefect.testing.cli import *
src/prefect/testing/cli.py:7: in <module>
    from prefect.cli import app
src/prefect/cli/__init__.py:9: in <module>
    import prefect.cli.cloud
src/prefect/cli/cloud.py:58: in <module>
    login_api = FastAPI(on_startup=[set_login_api_ready_event])
../../opt/anaconda3/envs/prefect/lib/python3.8/site-packages/fastapi/applications.py:124: in __init__
    self.router: routing.APIRouter = routing.APIRouter(
../../opt/anaconda3/envs/prefect/lib/python3.8/site-packages/fastapi/routing.py:502: in __init__
    super().__init__(
../../opt/anaconda3/envs/prefect/lib/python3.8/site-packages/starlette/routing.py:592: in __init__
    warnings.warn(
E   DeprecationWarning: The on_startup and on_shutdown parameters are deprecated, and they will be removed on version 1.0. Use the lifespan parameter instead. See more about it on https://www.starlette.io/lifespan/.
This is because the latest version (0.94.0) of FastAPI uses version 0.26.0.post1 of starlette, which in turn introduces the DeprecationWarning. The last update to FastAPI was done yesterday, March 10th (see here).
In the short term, downgrading to 0.93.0 (pip install fastapi==0.93.0) solves the issue, but perhaps for the future it is worth creating a ticket to start using the lifespan parameter. What would be the best course of action here? In any case, I'll be happy to help 🙂

Ofir
03/12/2023, 4:52 PM

Samuel Hinton
03/13/2023, 1:17 AM
try:
    data = task.submit(start, end, logger=logger)
except DataWarning as e:
    logger.warning(e)
    send_incoming_webhook_message(
        slack_webhook,
        f"Warning raised for {task_name} for {start} to {end} due to {e}",
    )
But no dice; it doesn't play well with the future object. I'm reworking all of this to not use exceptions and just pass dictionaries around, but I thought knowing the best practice might be useful.

Jonathan Langlois
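The behaviour described above matches how Prefect 2 futures generally work: task.submit() returns a PrefectFuture immediately, and the task's exception only surfaces when the future is resolved, e.g. via .result() (whose raise_on_failure flag defaults to True). A runnable sketch with a stand-in future class:

```python
class Future:  # stand-in for prefect.futures.PrefectFuture
    def __init__(self, fn):
        self._fn = fn

    def result(self, raise_on_failure=True):
        # resolving the future is where the task's exception re-raises
        try:
            return self._fn()
        except Exception:
            if raise_on_failure:
                raise
            return None

def failing_task():
    raise ValueError("bad data")

future = Future(failing_task)  # "submitting" raises nothing by itself
try:
    future.result()  # the exception surfaces here, so except blocks work
    caught = None
except ValueError as e:
    caught = str(e)
```

So wrapping the submit() call in try/except catches nothing; wrapping the .result() call does.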
03/13/2023, 2:08 AM

Raviraj Dixit
03/13/2023, 4:31 AM
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'a78a7ce9-1b45-439f-824f-b5470e93dea2', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Sat, 11 Mar 2023 18:22:02 GMT', 'Content-Length': '179'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"strconv.ParseInt: parsing \\"59.911360159516335\\": invalid syntax","reason":"BadRequest","code":400}\n'
Faizul
03/13/2023, 5:50 AM