Avinash Santhanagopalan
04/18/2024, 6:34 PM
batch_kwargs: Optional[Dict[str, Any]] in the flow, and we have deployments on that flow. But somehow it's showing up as null in the Custom Run UI, and it causes an error when I submit saying "None is not of type object". It only works if I give {}. Is this expected? It's quite hard to hard-code this every time we use the custom run.
Charles Leung
04/18/2024, 7:28 PM
Robert Banick
04/18/2024, 7:38 PM
prefect_aws
I see that the profile_name property is marked as optional, like so:
profile_name: Optional[str] = Field(
    default=None, description="The profile to use when creating your session."
)
Is this intended behavior?
Nimesh Kumar
04/19/2024, 7:02 AM
Alexander Dahl
04/19/2024, 12:10 PM
Kamal
04/19/2024, 2:48 PM
Constantino Schillebeeckx
04/19/2024, 7:07 PM
Michael Hoffmann
04/19/2024, 8:06 PM
Yaron Levi
04/21/2024, 6:38 PM
pg_dump to take a snapshot of a Postgres database, but we are having a hard time installing pg_dump.
We’ve tried this inside the flow’s code:
import subprocess
from prefect import flow
@flow(name="dump_postgres_to_s3", log_prints=True)
def dump_postgres_to_s3(connection_string: str):
    # Try to install pg_dump at run time and print the CompletedProcess
    result = subprocess.run(['apt-get', 'install', 'pg_dump'], stdout=subprocess.PIPE)
    print(result)
But the result is an error code 100:
CompletedProcess(args=['apt-get', '--fix-broken', 'install', 'pg_dump'], returncode=100, stdout=b'Reading package lists...\nBuilding dependency tree...\nReading state information...\n')
From research, it looks like error code 100 means the package is not available to be installed.
So any idea how we can get pg_dump installed and used inside a Flow?
Just to clarify: this runs inside an AWS ECS Task, which uses the default Prefect image docker.io/prefecthq/prefect:2.13.5-python3.10
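One likely reason for the failure: pg_dump is the name of a binary, not of an apt package. On Debian-based images it is normally provided by the postgresql-client package, and installing at run time also needs `apt-get update` first plus the `-y` flag. It is usually simpler to bake the client into a custom image and have the flow only check that the binary is present. A minimal sketch of that check, assuming a Debian-based image; find_pg_dump, build_pg_dump_command, and dump_database are hypothetical helper names, not Prefect APIs:

```python
import shutil
import subprocess
from typing import List, Optional


def find_pg_dump() -> Optional[str]:
    """Return the full path to pg_dump if it is on PATH, else None."""
    return shutil.which("pg_dump")


def build_pg_dump_command(connection_string: str, output_path: str) -> List[str]:
    """Build the argument list for a custom-format dump written to output_path."""
    return [
        "pg_dump",
        f"--dbname={connection_string}",  # pg_dump accepts a connection string here
        "--format=custom",
        f"--file={output_path}",
    ]


def dump_database(connection_string: str, output_path: str) -> None:
    """Run pg_dump, failing early with a clear message if it is not installed."""
    if find_pg_dump() is None:
        # Assumption: the image is Debian-based, where postgresql-client ships pg_dump
        raise RuntimeError("pg_dump not found on PATH; install postgresql-client in the image")
    subprocess.run(build_pg_dump_command(connection_string, output_path), check=True)
```

Calling dump_database(connection_string, "/tmp/db.dump") from inside the flow would then either write the dump file or fail with a message that points at the missing package.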
Thank you for any help on this 🙂
Shakib
04/22/2024, 1:35 AM
Shakib
04/22/2024, 2:20 AM
Himanshu
04/22/2024, 7:13 AM
Encountered exception during execution:
Traceback (most recent call last):
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
result = await flow_call.aresult()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
return await asyncio.wrap_future(self.future)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/var/www/apt/apt/prefect_flows/workflow.py", line 20, in execute_workflow
return state.result()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/client/schemas/objects.py", line 212, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/states.py", line 71, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 398, in __call__
return self.result()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
return self.future.result(timeout=timeout)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
result = await flow_call.aresult()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
return await asyncio.wrap_future(self.future)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/var/www/apt/apt/prefect_flows/workflow.py", line 28, in run_node_flow
res = run_workflow(workflow, tenant, payload_data)
File "/var/www/apt/apt/prefect_flows/workflow.py", line 48, in run_workflow
process_conditional_task(task, tenant, result_of_tasks)
File "/var/www/apt/apt/prefect_flows/workflow.py", line 95, in process_conditional_task
filtered_data = check_condition(task.get("condition"), data_to_filter)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/tasks.py", line 569, in __call__
return enter_task_run_engine(
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/engine.py", line 1382, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
return self.future.result(timeout=timeout)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/engine.py", line 1550, in get_task_call_return_value
return await future._result()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/task_runners.py", line 231, in submit
result = await call()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/engine.py", line 1780, in begin_task_run
state = await orchestrate_task_run(
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/engine.py", line 2023, in orchestrate_task_run
task_run = await client.read_task_run(task_run.id)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2031, in read_task_run
response = await self._client.get(f"/task_runs/{task_run_id}")
File "/home/azureuser/.local/lib/python3.10/site-packages/httpx/_client.py", line 1757, in get
return await self.request(
File "/home/azureuser/.local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/client/base.py", line 312, in send
response.raise_for_status()
File "/home/azureuser/.local/lib/python3.10/site-packages/prefect/client/base.py", line 164, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://prefect-server/api/task_runs/<task_id>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
The same happens for other API endpoints:
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<server>/api/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<server>/api/task_runs/<taskid>/set_state'
Response: {'exception_message': 'Internal Server Error'}
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<server>/api/flows/'
Response: {'exception_message': 'Internal Server Error'}
I am not able to figure out the root cause of this issue.
My infra setup:
- prefect-orion server: 16 GB RAM, 4 Core
work-pools:
- default-pool: same as prefect-orion server | 1 agent
- pool1: server2 16 GB RAM, 4 core | 5 agents
- pool2: same server as pool1 | 7 agents
Pool1 has a daily load of nearly 30k flows, of which ~5% fail due to these errors.
Pool2 has a daily load of nearly 10k flows, of which ~8% fail due to these errors.
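To put those percentages in absolute terms, here is a quick back-of-the-envelope calculation that uses only the approximate figures quoted above (expected_failures is a hypothetical helper name):

```python
def expected_failures(daily_runs: int, failure_rate: float) -> int:
    """Approximate number of failed runs per day for a given failure rate."""
    return round(daily_runs * failure_rate)


pool1_failures = expected_failures(30_000, 0.05)  # about 1,500 failed runs per day
pool2_failures = expected_failures(10_000, 0.08)  # about 800 failed runs per day

# Combined failure rate across both pools: 2,300 of 40,000 runs, roughly 5.75%
combined_rate = (pool1_failures + pool2_failures) / (30_000 + 10_000)

print(pool1_failures, pool2_failures, f"{combined_rate:.2%}")
```

So the 500 errors account for roughly 2,300 failed flow runs per day across the two pools.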
There are around 350k total flow runs so far.
All the machines are on the same VPC. I am using a PostgreSQL DB connected to my prefect orion server, with some tables (like log, flow_run_state) nearly reaching 700k records.
Earlier I thought it was due to the huge amount of data in the DB causing timeouts, but I have removed nearly 30% of my old data from the DB and the error still exists.
Does anyone have any idea what might be the cause of it?
Jon
04/22/2024, 10:23 AM
Nick Hoffmann
04/22/2024, 3:16 PM
- name: PREFECT_WORKER_PREFETCH_SECONDS
  value: "30"
- name: PREFECT_WORKER_QUERY_SECONDS
  value: "5"
This works perfectly fine for all auto-generated scheduled flow runs, but when we try to manually kick off a run starting "now", the worker never creates a Kubernetes Job for the run. The flow run is then permanently stuck in "Pending" and has no logs, when I would expect at least these worker logs:
Worker 'KubernetesWorker <id>' submitting flow run '<flow run id>'
Creating Kubernetes job...
Completed submission of flow run '<flow run id>'
Job '<job id>': Pod has status 'Pending'.
Manually triggered flow runs do work if I select start "later" and set it 1 minute in the future, rather than "now". Any ideas?
Steve Gee
04/22/2024, 5:33 PM
Derrell
04/23/2024, 12:17 AM
Matthew Bell
04/23/2024, 12:22 AM
result be None below?
import asyncio
from prefect import flow, task
@task
async def test() -> str:
    return "I'm an async function"
@flow
async def main() -> None:
    future = await test.submit()
    state = await future.get_state()
    result = state.result()
    print(result)
if __name__ == "__main__":
    asyncio.run(main())
Alex M
04/23/2024, 2:17 AM
gcloud run deploy my-worker \
  --image=prefecthq/prefect:2-latest \
  --set-env-vars PREFECT_API_URL=<PREFECT_API_URL> \
  --service-account <MY_SERVICE_ACCOUNT> \
  --no-cpu-throttling \
  --min-instances 1 \
  --args "prefect","worker","start","--install-policy","always","--with-healthcheck","-p","my-cloud-run-pool","-t","cloud-run" \
  --region us-central1 \
  --platform managed \
  --add-cloudsql-instances <DB_CONNECTION_NAME> \
  --update-secrets=DB_CONNECTION_NAME=<SECRET_NAME__VERSION>
However, it seems like both add-cloudsql-instances and update-secrets are not picked up when a flow is executed on Cloud Run. I don't see any of Cloud Run's environment variables, and I get a database connection error.
1. Is there a best practice for accessing Cloud SQL from a flow running on Cloud Run?
2. How do I access Cloud Run's variables from a flow?
I tried navigating Prefect's documentation on the topic, but it's rather confusing and seems to be more geared toward Cloud users. I'm on the OSS version for now.
Tim Galvin
04/23/2024, 5:47 AM
.map of a task-decorated function. I can easily batch up my large iterable, but I am more curious as to why the .map might be falling over.
flusflas
04/23/2024, 9:22 AM
sandeep kumar
04/23/2024, 10:27 AM
flow_result = await run_deployment(
    name="some_flow_1/dev",
    parameters={"body": query}
)
This returns flow run metadata. How do I get the actual data that some_flow_1 returns?
I was trying the persisted-result method, where I set persist_result=True on the task and the flow, but it throws a permission error:
flow_result = dict(flow_result)
result_data = flow_result['state'].result
persisted_result = result_data()
client = get_client()
actual_data = await persisted_result.get(client=client)
logger.info(f"Retrieved Data:{actual_data}")
Error:
actual_data = await persisted_result.get(client=client)
......
.......
......
File "/home/sandeep/.pyenv/versions/3.9.0/lib/python3.9/pathlib.py", line 1221, in stat
return self._accessor.stat(self)
PermissionError: [Errno 13] Permission denied: '/root/.prefect/storage/5198df6f8c0547a5b018a34ea1b9f6f2'
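The traceback shows a Python installed under /home/sandeep trying to stat a file under /root/.prefect/storage, which suggests the result was written by a different OS user (root) than the one reading it. A minimal diagnostic sketch using only the standard library; can_read_result and pick_shared_storage are hypothetical helper names, not Prefect APIs. It assumes the fix is to point both processes at a location they can both access, for example through the PREFECT_LOCAL_STORAGE_PATH setting or a remote storage block:

```python
import os
import tempfile
from typing import Iterable, Optional


def can_read_result(path: str) -> bool:
    """Return True if the current OS user can read the file at path.

    os.access returns False instead of raising when a parent directory
    such as /root is not accessible.
    """
    return os.access(path, os.R_OK)


def pick_shared_storage(candidates: Iterable[str]) -> Optional[str]:
    """Return the first directory the current user can read, write, and enter."""
    for directory in candidates:
        if os.path.isdir(directory) and os.access(directory, os.R_OK | os.W_OK | os.X_OK):
            return directory
    return None


# Check the path from the error, then fall back to a directory this user can use
print(can_read_result("/root/.prefect/storage/5198df6f8c0547a5b018a34ea1b9f6f2"))
print(pick_shared_storage(["/root/.prefect/storage", tempfile.gettempdir()]))
```

If the first check prints False for the user running the calling code, the persisted result cannot be fetched from that process regardless of how the state object is handled.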
Mohit Singhal
04/23/2024, 11:06 AM
arbitrary_types_allowed in Config
Mohit Singhal
04/23/2024, 11:07 AM
Mohit Singhal
04/23/2024, 11:10 AM
Nimesh Kumar
04/24/2024, 8:33 AM
import requests
import json
url = "https://IP:port/api/work_queues/"
payload = json.dumps({
    "name": "algo-402-4563",
    "description": "This work queue is for algorithm 402 and project ID 4563",
    "is_paused": False,
    "concurrency_limit": 10
})
headers = {
    'Authorization': 'Basic bmltZXNoLnBybmltZXNoLnByZWZlY3Q='
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Hi everyone, I have created a work queue using the above code. It is created successfully, but it is in an unhealthy state.
Is there a way to make the queue healthy from the code itself?
The goal is to create a queue only when it is required, so the idea was to create it through the API, but it ends up in an unhealthy state.
Yaron Levi
04/24/2024, 1:50 PM
John Mizerany
04/24/2024, 4:14 PM
Mao Li
04/24/2024, 6:36 PM
Mao Li
04/24/2024, 11:23 PM
Mao Li
04/24/2024, 11:24 PM