eli yosef
05/04/2023, 9:32 AMdarshan darshan
05/04/2023, 10:37 AMAditya N
05/04/2023, 11:25 AMOuail Bendidi
05/04/2023, 12:36 PMAdrian Mihalache
05/04/2023, 1:12 PMJohn-Craig Borman
05/04/2023, 1:58 PMAttributeError: 'State' object has no attribute 'services'
Matt Kizaric
05/04/2023, 3:32 PMDekel R
05/04/2023, 3:49 PMcurl --location --request POST '<https://api.prefect.cloud/api/accounts/0aceff1b-4fdb-4469-adde-197148a98672/workspaces/b7474c31-4033-449b-96b6-f3544e6f9408/flow_runs/>' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer api_key' \
--header 'X-PREFECT-API-VERSION: 0.8.4' \
--data-raw '{
"name": "testing_api_1",
"flow_id": "46a211a5-a943-4547-8b4d-8e3c09da68d2",
"deployment_id": "8507e60e-0a93-43c0-81ba-bd050f0ecd53", "infrastructure_document_id": "5ed25162-53fb-4df5-89bc-426afdc83f41", "state": {"type": "SCHEDULED"}
}'
I looked here https://app.prefect.cloud/api/docs#tag/Flow-Runs - create flow run - but I don’t see any queue and pool params.
Thanksmerlin
05/04/2023, 4:16 PMprefect_test_harness
fixture:
How to unit test with the fixture, when flows depend on blocks?
Do I have to register all my blocks for the tests? For secret key blocks this is very inconvenient.Shane Satterfield
05/04/2023, 4:36 PMTraceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/services.py", line 120, in _run
async with self._lifespan():
File "/usr/local/lib/python3.10/contextlib.py", line 199, in __aenter__
return await anext(self.gen)
File "/usr/local/lib/python3.10/site-packages/prefect/events/worker.py", line 30, in _lifespan
async with self._client:
File "/usr/local/lib/python3.10/site-packages/prefect/events/clients.py", line 118, in __aenter__
await self._reconnect()
File "/usr/local/lib/python3.10/site-packages/prefect/events/clients.py", line 136, in _reconnect
self._websocket = await self._connect.__aenter__()
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/client.py", line 637, in __aenter__
return await self
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/client.py", line 655, in __await_impl_timeout__
return await self.__await_impl__()
File "/usr/local/lib/python3.10/site-packages/websockets/legacy/client.py", line 659, in __await_impl__
_transport, _protocol = await self._create_connection()
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1036, in create_connection
infos = await self._ensure_resolved(
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1418, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
return await self.run_in_executor(
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 821, in run_in_executor
executor.submit(func, *args), loop=self)
File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Not sure where this error is coming from, but I would expect any error that terminates the Prefect agent should trigger a pod restart. I checked the Helm chart and I don't see a liveness probe there. Is this something that the Prefect team can add? I assume the blocker is that the Prefect agent may not have a good way to check that it's still alive.Ivan Yakovlev
05/04/2023, 6:27 PMgperrone
05/04/2023, 7:01 PMPaco Ibañez
05/04/2023, 7:43 PMMichael Duncan
05/04/2023, 8:25 PMsjammula
05/04/2023, 8:43 PMMatthew Dickinson
05/04/2023, 8:47 PMMatthew Dickinson
05/04/2023, 8:48 PMXavier Witdouck
05/04/2023, 8:53 PMRobert Banick
05/04/2023, 11:13 PMon_crashed
parameter just released (!). Is is possible to pass in additional arguments to function so invoked, using objects generated during the course of a flow? Like so:
def cleanup(path_name: str):
shutil.rmtree(path_name)
def crash_hook(path_name, flow, flow_run, state):
cleanup(path_name)
@flow(on_crashed=[crash_hook])
def my_flow():
# stuff
path_name = other_function()
My use case is wiping very large amounts of data downloaded for temporary processing as a part of my ETLs. Leaving this lying around on temporary storage is expensive and best avoided. The path name for the directory to wipe depends on the ETL so invoked and so a static function won’t cut it; I need to vary the path I apply shutil.rmtree
to.
Thanks for pushing this awesome feature, it directly responds to a problem I was just about to address. Very cool!John Horn
05/05/2023, 1:14 AMAbhinav Chordia
05/05/2023, 4:30 AMraise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<http://prefect.mls.gh.st:4200/api/deployments/762eab44-76ca-44ee-8334-f2a5d46d4883>'
Response: {'detail': 'Deployment not found'}
For more information check: <https://httpstatuses.com/404>
An exception occurred.
Mark NS
05/05/2023, 6:49 AMRecursionError: maximum recursion depth exceeded in comparison
very similar to this (unanswered) report from @Austin Weisgrau a few days ago. Full stack trace in thread. Could someone from the Prefect team take a look? (also, we're paying customers, not sure if that entitles us to raise support tickets beyond here and/or github somehow?)Malavika S Menon
05/05/2023, 10:54 AMTim-Oliver
05/05/2023, 12:05 PMA
which produces a result which I don't want to persist but just keep in memory. This result is consumed by multiple invocations of task B
. Once all invocations of task B
are done I don't need the result of task A
anymore and would like to remove it from the cache to free up some memory. How could I achieve this?
Thanks!Oscar Björhn
05/05/2023, 12:33 PMJean-Michel Provencher
05/05/2023, 12:44 PMTuomas Heiskanen
05/05/2023, 1:10 PMDeceivious
05/05/2023, 3:35 PMfrom prefect.packaging.docker import DockerPackager
docker = DockerPackager(
dockerfile=str(Path("Dockerfile").resolve()),
image_flow_location=flow_path,
registry_url=config.get("AZURE_REGISTRY", "DOCKER_REGISTRY_URL"),
)
docker_manifest = await docker.package(flow=flow)
to build and push images and use the manifest on kubernetes infra_overrides
.
However the package
method is now raising issue.
File "/home/dcvs/source_code/archimedes_etl_flows/.venv/lib/python3.10/site-packages/docker/api/client.py", line 221, in _retrieve_server_version
raise DockerException(
docker.errors.DockerException: Error while fetching server API version: HTTPConnection.request() got an unexpected keyword argument 'chunked'
Deceivious
05/05/2023, 4:46 PMAbhinav Chordia
05/05/2023, 4:47 PMraise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<http://prefect.mls.gh.st:4200/api/deployments/762eab44-76ca-44ee-8334-f2a5d46d4883>'
Response: {'detail': 'Deployment not found'}
For more information check: <https://httpstatuses.com/404>
An exception occurred.
That deployment doesn’t actually exist. Do I need to do some cleaning in the database?