
Emil Ordoñez

over 2 years ago
Hey team, can you help me get around an issue I've been having for the past few weeks with a Prefect Agent that keeps stopping after this error:
Failed the last 3 attempts. Please check your environment and configuration.
Examples of recent errors:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 116, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 213, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 152, in handle_async_request
    raise LocalProtocolError(exc)  # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/services.py", line 46, in critical_service_loop
    await workload()
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 261, in check_for_cancelled_flow_runs
    async for work_queue in self.get_work_queues():
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 144, in get_work_queues
    work_queue = await self.client.read_work_queue_by_name(
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 850, in read_work_queue_by_name
    response = await self._client.get(f"/work_queues/name/{name}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1754, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 251, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 194, in _send_with_retry
    response = await request()
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/agent.py", line 189, in start
    async with anyio.create_task_group() as tg:
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/services.py", line 104, in critical_service_loop
    raise RuntimeError("Service exceeded error threshold.")
RuntimeError: Service exceeded error threshold.
An exception occurred.
I'm running the Agent as an ECS Service on Fargate, so it always gets restarted, but what happens afterwards is unpredictable: sometimes the flows stay frozen after this error. I think this started after I upgraded Prefect to version 2.10.6; before the upgrade, the agent didn't stop like this. Has anyone experienced something similar? I see a somewhat similar error in this thread, but I'm not sure whether running
prefect config set PREFECT_API_ENABLE_HTTP2=False
actually solved that issue. I'm also not sure how to specify that in the ECS Task Definition. Should I add this setting as an environment variable?
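On the last question: Prefect 2 reads each setting from an environment variable of the same name, and environment variables take precedence over the active profile, so putting PREFECT_API_ENABLE_HTTP2 in the container's environment should behave like `prefect config set` without relying on a profile file written inside a short-lived Fargate container. ECS container definitions take an `environment` list of `{"name": ..., "value": ...}` objects with string values. Below is a stdlib-only sketch that adds the variable to a container definition dict before you register it; `add_environment`, the container name and the image are hypothetical placeholders, and whether disabling HTTP/2 actually stops this particular agent crash is not verified here.

```python
import json


def add_environment(container_definition, env):
    """Return a copy of an ECS container definition with extra environment variables.

    Hypothetical helper: ECS expects "environment" to be a list of
    {"name": ..., "value": ...} objects, and every value must be a string.
    """
    merged = {item["name"]: item["value"] for item in container_definition.get("environment", [])}
    merged.update({name: str(value) for name, value in env.items()})
    updated = dict(container_definition)  # shallow copy; the input dict is not mutated
    updated["environment"] = [{"name": name, "value": value} for name, value in sorted(merged.items())]
    return updated


# Hypothetical agent container; replace name/image with your task definition's own values.
agent_container = {
    "name": "prefect-agent",
    "image": "my-registry/prefect-agent:2.10.6",
    "essential": True,
    "environment": [],
}

patched = add_environment(agent_container, {"PREFECT_API_ENABLE_HTTP2": "False"})
print(json.dumps(patched, indent=2))
```

The same key/value pair can also be typed directly into the container's environment variables when editing the task definition in the console.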

Raphaël Robidas

11 months ago
Hello everyone, I'm facing an issue using Prefect with a Dask task runner, although the issue may have more to do with how Prefect works and how I use it. See the minimal example of what I would like to do in the thread. I created the `AsCompleted` class based on `prefect.futures.as_completed` so that I can add new futures at any time (it works well as far as I can tell). This code works with the `ThreadPoolTaskRunner`:
Starting task with argument: 3
Starting task with argument: 2
Starting task with argument: 1
10:21:51.238 | INFO    | Task run 'run-d0d' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274aa0> finished with 1
10:21:52.237 | INFO    | Task run 'run-f40' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274a10> finished with 2
10:21:53.237 | INFO    | Task run 'run-9cb' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274980> finished with 3
Launching followup
Starting task with argument: 2
10:21:55.305 | INFO    | Task run 'run-d92' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17d7dd90> finished with 2
10:21:55.509 | INFO    | Flow run 'elastic-toucan' - Finished in state Completed()
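The original code lives in the thread and isn't reproduced here. As a rough, stdlib-only illustration of the idea behind `AsCompleted` (an as-completed loop that also picks up futures added while it is iterating), here is a sketch built on `concurrent.futures.wait` rather than Prefect futures. The class, `run` and `demo` are hypothetical, and the follow-up rule only mimics the shape of the log above.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


class AsCompleted:
    """Yield futures as they finish, including futures added during iteration.

    Sketch only: assumes add() is called from the thread that iterates.
    """

    def __init__(self, futures=()):
        self._pending = set(futures)

    def add(self, future):
        self._pending.add(future)

    def __iter__(self):
        while self._pending:
            # Block until at least one pending future has finished.
            done, _ = wait(self._pending, return_when=FIRST_COMPLETED)
            self._pending -= done
            for future in done:
                yield future  # the consumer may call add() here


def run(argument):
    # Hypothetical task: sleep in proportion to the argument, then return it.
    time.sleep(argument * 0.1)
    return argument


def demo():
    results = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        tracker = AsCompleted(pool.submit(run, n) for n in (3, 2, 1))
        followup_launched = False
        for future in tracker:
            result = future.result()
            results.append(result)
            print("Task finished with", result)
            if len(results) == 3 and not followup_launched:
                print("Launching followup")
                tracker.add(pool.submit(run, 2))  # picked up by the same loop
                followup_launched = True
    return results


if __name__ == "__main__":
    demo()
```

The loop re-checks the pending set on every pass, which is what lets a newly added future be yielded without restarting the iteration.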
The issue is with the `DaskTaskRunner`. `dask` needs to pickle the flow, and will thus pickle the `MainObj` instance, which is not picklable and would not be reasonable to pickle anyway; I represented this with `large_object`. `dask` tries `pickle` and `cloudpickle`, but both fail with `TypeError: cannot pickle '_thread.lock' object`, `_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run` and `TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')`. I tried to move the flow out of the object, but there is always something unpicklable in the flow that causes a similar error. Is there any way to handle this kind of problem? Any help would be a lifesaver! Thanks a lot in advance!

Some things that I can't change:
- The `DaskTaskRunner` is necessary; no other supported task runner can replace it (interfacing with SLURM and limiting tasks based on custom resources).
- `process_result` needs access to the `MainObj` instance, since it updates its fields based on the task result.
- Tasks need to be launched immediately based on the result of completing tasks, hence the need for `AsCompleted`.
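One common way around this kind of error (a sketch, not tested against Prefect or Dask) is to make sure what gets submitted only carries plain, picklable data, while `MainObj` and `process_result` stay where the futures are consumed. If a task is a bound method, pickling it drags the whole instance along with it. The stdlib-only sketch below uses `pickle` to show the difference; `MainObj`, `make_payload`, `run_task` and `can_pickle` are hypothetical stand-ins, and the `threading.Lock` plays the role of the `_thread.lock` in the first error.

```python
import pickle
import threading


class MainObj:
    """Hypothetical stand-in for MainObj: owns state that cannot (or should not) be pickled."""

    def __init__(self):
        self._lock = threading.Lock()          # same type as in "cannot pickle '_thread.lock' object"
        self.large_object = list(range(1000))  # stand-in for large_object
        self.total = 0

    def make_payload(self, argument):
        # Only plain data leaves this object; the task never receives `self`.
        return {"argument": argument}

    def process_result(self, result):
        # Runs where the futures are consumed, so it may mutate unpicklable state.
        with self._lock:
            self.total += result


def run_task(payload):
    # Hypothetical task body: depends only on its picklable payload.
    return payload["argument"] * 2


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError, AttributeError):
        return False
    return True


main = MainObj()
payload = main.make_payload(3)
print(can_pickle(main))                 # False: the lock travels with the instance
print(can_pickle(main.process_result))  # False: a bound method pickles its instance too
print(can_pickle(payload))              # True: a plain dict
main.process_result(run_task(payload))
print(main.total)                       # 6
```

In the real flow the equivalent would be submitting a task that takes only the payload, then calling `process_result` on each finished future's result in the `AsCompleted` loop; whether that fits the rest of the setup is an assumption.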

Nicholas Andonakis

over 1 year ago
Hey guys, I'd really appreciate some help. I've deployed Prefect server with what I believe is the simplest possible method. I just have a single Windows Server, and currently a single flow.

I start the server with:
prefect server start --host 0.0.0.0

And I run my Python script with:
python main.py

It contains:
if __name__ == "__main__":
    my_flow.serve(name="deploymentv1")

The deployment is scheduled on the server at 4am every day.

Firstly, is this the simplest and most robust method? I've found the documentation on this really lacking; both the docs and AI assistants get clouded with obsolete Prefect V1 content. I suspect I could put the scheduling in the serve command, but the current setup should work.

This had been running fine for 2 weeks, but today it completely failed to schedule the flow run! This is a complete showstopper if I can't rely on it to schedule. I'm not sure how to debug this. Looking through the server console output, I noticed a lot of these messages:
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked [SQL: DELETE FROM flow_run_notification_queue WHERE flow_run_notification_queue.id IN (SELECT 1 FROM (SELECT 1) WHERE 1!=1)]

I looked that up and it seems to be a known issue? I'm about to wrap the server start in the following batch file, to see if it helps and so that I have useful logs:

server.bat
@echo off
set PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED=false
prefect server start --host 0.0.0.0 >> server.txt 2>&1
rem email me that the server stopped

However, I can't proceed with Prefect if I can't rely on it to run my flow each day. A bad plan B is to just use Windows Task Scheduler to run the Python flow scripts, but I would lose a lot of the reason for using Prefect in the first place if I did that.
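For context on the `database is locked` message: SQLite allows only one writer at a time, so while one connection holds the write lock, another connection that tries to write gets `sqlite3.OperationalError: database is locked` once its busy timeout runs out. The stdlib-only sketch below reproduces that message with two connections and a zero timeout. It does not reproduce Prefect's server; `demonstrate_lock` and the `queue` table are made-up names. Prefect's server can also use PostgreSQL instead of SQLite (configured through PREFECT_API_DATABASE_CONNECTION_URL), and PostgreSQL allows concurrent writers.

```python
import os
import sqlite3
import tempfile


def demonstrate_lock(path):
    """Return the error a second writer gets while the first holds SQLite's write lock."""
    # isolation_level=None puts both connections in autocommit mode,
    # so transactions are controlled explicitly below.
    writer = sqlite3.connect(path, isolation_level=None)
    other = sqlite3.connect(path, isolation_level=None, timeout=0)  # fail instead of waiting
    try:
        writer.execute("CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY)")
        writer.execute("BEGIN IMMEDIATE")  # take the database's single write lock
        writer.execute("INSERT INTO queue DEFAULT VALUES")
        try:
            other.execute("DELETE FROM queue")  # a second writer while the lock is held
            return None
        except sqlite3.OperationalError as exc:
            return str(exc)
        finally:
            writer.execute("COMMIT")  # release the write lock
    finally:
        writer.close()
        other.close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(demonstrate_lock(os.path.join(tmp, "demo.db")))
```

With a non-zero timeout the second connection would retry until the lock is released or the timeout expires, which is why the error tends to show up under sustained concurrent writes rather than on every request.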