Deceivious
04/05/2023, 2:09 PM
task concurrency limit tags are enforced even when the task has already been cached before [i.e. when fetching data off the cache]?
Patrik Fejda
04/05/2023, 2:12 PM
import time

from fastapi import APIRouter
from prefect import flow, task

# `router` was not defined in the original snippet; an APIRouter is assumed here.
router = APIRouter()


@task()
def save_to_db(a, b):
    time.sleep(100)  # stand-in for a slow database write
    print("Saving to db...")
    print("Saved to DB")


@flow()
def flow_my_endpoint(a, b):
    save_to_db(a, b)
    suma = a + b
    return {'suma': suma}


@router.post("/csv/url")
async def my_endpoint(a, b):
    return flow_my_endpoint(a, b)
John-Craig Borman
04/05/2023, 2:24 PM
13:29:44.526 | INFO | prefect.engine - Engine execution of flow run '0f87d3ab-658a-4fb8-bf9d-9daf903bcaf1' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.
13:29:43.035 | INFO | Flow run 'unnatural-leech' - Downloading flow code from storage at None
warn(RuntimeWarning(msg))
/usr/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
prefect version:
Version: 2.8.7
API version: 0.8.4
Python version: 3.10.10
Git commit: a6d6c6fc
Built: Thu, Mar 23, 2023 3:27 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: sqlite
SQLite version: 3.31.1
Jens
04/05/2023, 4:19 PM
Paco Ibañez
04/05/2023, 4:36 PM
Ouail Bendidi
04/05/2023, 4:44 PM
DockerContainer infra on a Prefect agent. So I'm trying to pass an instance of an object as an argument to my first task; this object happens to be a dataclass, but a more complex one.
Locally it works fine, but on the Prefect agent the engine tries to resolve my object and create a new instance of it before passing it to the task!?
I'm on Prefect 2.9.0 and Python 3.11 (details about the error in the thread ⬇️)
John Horn
04/05/2023, 4:48 PM
chocolate-cheetah-bwnpz-zlnxj 0/1 Completed 0 3h42m
The metadata in the sidebar of the Prefect Cloud UI matches that 3h42m:
Flow Run
Start Time: 2023/04/05 06:00:05 AM
Duration: 3h 44m
Run Count: 1
Created: 2023/04/05 03:00:09 AM
What would you recommend I do? Will this result in bloat over time, where Prefect shows lots of running processes even though they are not running?
At the deployment infra level, I have this in the KubernetesJob:
job_watch_timeout_seconds = 3600,
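One thing that can be checked from the numbers in this message is whether the pod ran longer than the configured watch timeout. The snippet below is only that arithmetic, as a minimal sketch; `parse_duration_seconds` is a hypothetical helper, and whether exceeding the timeout is what leaves the flow run shown as running is an assumption that this thread does not confirm.

```python
import re


def parse_duration_seconds(text: str) -> int:
    """Convert a kubectl-style age such as '3h42m' into seconds (hypothetical helper)."""
    units = {"d": 86400, "h": 3600, "m": 60, "s": 1}
    parts = re.findall(r"(\d+)([dhms])", text)
    if not parts:
        raise ValueError(f"unrecognised duration: {text!r}")
    return sum(int(number) * units[unit] for number, unit in parts)


# Values copied from the message above.
job_watch_timeout_seconds = 3600
pod_age_seconds = parse_duration_seconds("3h42m")

# True when the pod outlived the configured watch timeout.
exceeded = pod_age_seconds > job_watch_timeout_seconds
print(f"pod age: {pod_age_seconds}s, timeout: {job_watch_timeout_seconds}s, exceeded: {exceeded}")
```

If the comparison comes out true, the timeout setting may be worth reviewing alongside the agent logs for that run.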
Eric Coleman
04/05/2023, 4:53 PM
Slackbot
04/05/2023, 7:39 PM
Choenden Kyirong
04/05/2023, 8:13 PM
*.sql file in any prefect_sqlalchemy.SqlAlchemyConnector methods like fetch_all?
Matt Fysh
04/06/2023, 1:45 AM
JSON.load(state/{mykey}) in a try, and then in the except block do JSON(value={initvalue}) + json_block.save?
Matt Fysh
04/06/2023, 2:41 AM
Kirill Egorov
04/06/2023, 6:57 AM
Choenden Kyirong
04/06/2023, 7:15 AM
         --> task_b --
        |             |
task_a -               ----> task_d
        |             |
         --> task_c --
If so, will task_d wait for both task_b and task_c to finish before executing? Any guidance on this would be greatly appreciated. Thanks!
Vadym Dytyniak
04/06/2023, 8:09 AM
Emma Rizzi
04/06/2023, 8:10 AM
data = data[field]
KeyError: 'env'
I reduced the job template to practically nothing custom here, in case it was from my additional configuration, but I still have this error. What am I doing wrong?
Nathan Low
04/06/2023, 12:46 PM
Downloading flow code from storage at 'EzeCastle'
08:11:02 AM DEBUG Importing flow code from 'ezecastle_daily_p_and_l_loader.py:hourly_loader'
08:11:02 AM ERROR Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 846, in exec_module
File "<frozen importlib._bootstrap_external>", line 982, in get_code
File "<frozen importlib._bootstrap_external>", line 1039, in get_data
FileNotFoundError: [Errno 2] No such file or directory: 'ezecastle_daily_p_and_l_loader.py'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\engine.py", line 268, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\deployments.py", line 187, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "C:\PrefectAgent\.venv\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "C:\PrefectAgent\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "C:\PrefectAgent\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
result = context.run(func, *args)
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\flows.py", line 786, in load_flow_from_entrypoint
flow = import_object(entrypoint)
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\utilities\importtools.py", line 195, in import_object
module = load_script_as_module(script_path)
File "C:\PrefectAgent\.venv\lib\site-packages\prefect\utilities\importtools.py", line 158, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'ezecastle_daily_p_and_l_loader.py' encountered an exception: FileNotFoundError(2, 'No such file or directory')
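The traceback shows the script being opened by a relative path, and a relative path is resolved against the process's current working directory. A minimal sketch for checking that by hand on the agent machine follows; `check_entrypoint` is a hypothetical helper (not part of Prefect), and the directory the agent actually runs in, or downloads the code into, is an assumption to verify locally.

```python
from pathlib import Path


def check_entrypoint(entrypoint: str, base_dir: str = ".") -> Path:
    """Resolve 'script.py:flow_name' against base_dir and confirm the script exists.

    Hypothetical diagnostic helper; it only checks the file system.
    """
    # Split on the last colon so Windows drive letters (C:\...) stay in the path part.
    script, sep, flow_name = entrypoint.rpartition(":")
    if not sep or not flow_name:
        raise ValueError(f"expected 'path/to/script.py:flow_name', got {entrypoint!r}")
    path = (Path(base_dir) / script).resolve()
    if not path.is_file():
        raise FileNotFoundError(f"{script!r} not found under {Path(base_dir).resolve()}")
    return path


# Entrypoint copied from the log above; checked against the current working directory.
try:
    print(check_entrypoint("ezecastle_daily_p_and_l_loader.py:hourly_loader"))
except FileNotFoundError as exc:
    print(f"missing: {exc}")
```

Running this from the directory the agent starts in shows whether the script is reachable there by that relative name.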
Ouail Bendidi
04/06/2023, 4:08 PM
task.map on a large sequence of inputs (while defining a tag concurrency limit of 20), Prefect seems to crash a lot while trying to set_state of tasks, with a status code 500 (traceback in the thread ⬇️).
Do you have any recommendations on how to work around this? Can I define retries for the Prefect internal API? Is it something I'm doing wrong?
Wright Hilsman
04/06/2023, 5:00 PM
from prefect.context import get_run_context and then accessing the metadata from there. Just no deployment name within there, sadly.
Rudy García
04/06/2023, 9:45 PM
prefect-gcp library when configuring the vpc-connector. It appears that the library is not properly setting up the call to GCP's REST API.
Here's the error message I received:
HttpError 400 when requesting https://us-west2-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/ba-basic/jobs?alt=json returned "metadata.annotations[run.googleapis.com/vpc-access-connector]: Annotation 'run.googleapis.com/vpc-access-connector' is not supported on resources of kind 'Job'. Supported kinds are: Revision, Execution"
Could anyone please provide some guidance on how to resolve this issue, or point me in the right direction? Any help would be greatly appreciated!
Shaoyi Zhang
04/06/2023, 10:36 PM
Dominic Tarro
04/06/2023, 11:07 PM
Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1031, in create_block_schema
response = await self._client.post(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1533, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 269, in send
response.raise_for_status()
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 132, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '409 Conflict' for url 'https://api.prefect.cloud/api/accounts/e5e970ea-d0a6-4ac5-992d-3796d5308920/workspaces/dbe8a5b3-2746-4ebd-8735-afdcc6550e47/block_schemas/'
Response: {'detail': 'Data integrity conflict. This usually means a unique or foreign key constraint was violated. See server logs for details.'}
For more information check: https://httpstatuses.com/409
The above exception was the direct cause of the following exception:
ObjectAlreadyExists
Gabriel Santos
04/07/2023, 2:07 AM
prefect.exceptions.ScriptError: Script at 'test.py' encountered an exception: FileNotFoundError(2, 'No such file or directory')
What am I doing wrong?
My storage is on GitHub!
Faheem Khan
04/07/2023, 3:20 AM
Mohammad Kaif Rizvi
04/07/2023, 5:56 AM
Zachary Loertscher
04/07/2023, 4:10 PM
Zach Phillips
04/07/2023, 5:44 PM
jcozar
04/07/2023, 5:54 PM
with prefect.context({"date": datetime.datetime(2011, 1, 1)}): to set context variables and test my flows. Is there something similar in Prefect 2.0 to set the start_date variable from the flow run context (prefect.context.get_run_context())?
Thank you!
MT
04/07/2023, 10:05 PM
Mark NS
04/08/2023, 5:24 AM
[WARNING] Task run 'bcc56e8e-a1bd-4145-abdb-aad866d63c6f' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
then an error:
[ERROR] prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Previous advice has been that this is caused by an upstream problem, but I can't see anything in the logs (journalctl on ec2 instance) that would cause the enclosing flow to not be in a running state.
Our infra is a Prefect agent running on EC2, spawning Docker containers on the same box. I checked the monitoring on the box: no CPU/memory spikes.
I've added task retries, but I guess that's not going to fix it either, if the flow is already dead? What else should I look for?