
Emil Ordoñez

over 2 years ago
Hey team, can you help me get around an issue I've been having for the past few weeks with a Prefect Agent that keeps stopping after this error:
Failed the last 3 attempts. Please check your environment and configuration.
Examples of recent errors:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 116, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 213, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/usr/local/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http2.py", line 152, in handle_async_request
    raise LocalProtocolError(exc)  # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/services.py", line 46, in critical_service_loop
    await workload()
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 261, in check_for_cancelled_flow_runs
    async for work_queue in self.get_work_queues():
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 144, in get_work_queues
    work_queue = await self.client.read_work_queue_by_name(
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 850, in read_work_queue_by_name
    response = await self._client.get(f"/work_queues/name/{name}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1754, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 251, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 194, in _send_with_retry
    response = await request()
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/agent.py", line 189, in start
    async with anyio.create_task_group() as tg:
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/services.py", line 104, in critical_service_loop
    raise RuntimeError("Service exceeded error threshold.")
RuntimeError: Service exceeded error threshold.
An exception occurred.
I'm running the Agent as an ECS Service on Fargate, so it always restarts, but with unpredictable behavior afterwards: sometimes the flows stay frozen once this happens. I think it started after I updated to Prefect 2.10.6; before the update the agent never showed this stopping behavior. Has anyone experienced something similar? I see a somewhat similar error in this thread, but I'm not sure whether running
prefect config set PREFECT_API_ENABLE_HTTP2=False
actually solved that issue, and I'm also not sure how to specify it in the ECS Task Definition. Should I add this config as an environment variable?
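For reference, Prefect settings like this can be passed to the agent container as environment variables with the same name, and in an ECS task definition that goes in the container definition's `environment` list. A minimal sketch of the relevant fragment, where the container name and image are placeholders for whatever the task definition already uses:

```json
{
  "containerDefinitions": [
    {
      "name": "prefect-agent",
      "image": "prefecthq/prefect:2-latest",
      "environment": [
        { "name": "PREFECT_API_ENABLE_HTTP2", "value": "False" }
      ]
    }
  ]
}
```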

Nicholas Andonakis

over 1 year ago
Hey guys, I'd really appreciate some help. I've deployed Prefect server with what I believe is the simplest possible method. I have a single Windows Server and, currently, a single flow. I start the server with:

prefect server start --host 0.0.0.0

And I run my Python script with:

python main.py

It contains:

if __name__ == "__main__":
    my_flow.serve(name="deploymentv1")

The deployment is scheduled on the server at 4am every day. Firstly, is this the simplest and most robust method? I've found the documentation on this really lacking; the docs and the AIs get clouded with obsolete Prefect v1 content. I suspect I could put the scheduling in the serve command, but the current setup should work.

This has been running fine for 2 weeks, but today it completely failed to schedule the flow run! That's a show stopper if I can't rely on it to schedule. I'm not sure how to debug this. I tried looking through the server console output and noticed a lot of these messages:

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: DELETE FROM flow_run_notification_queue WHERE flow_run_notification_queue.id IN (SELECT 1 FROM (SELECT 1) WHERE 1!=1)]

I looked that up and it's a known thing? I'm about to wrap the server start in the following batch file to see if it improves, and so I have useful logs:

server.bat
@echo off
set PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED=false
prefect server start --host 0.0.0.0 >> server.txt 2>&1
rem email me that the server stopped

However, I can't proceed with Prefect if I can't rely on it to run my flow each day. A bad Plan B is to just use Windows Task Scheduler to run the Python flow scripts, but I would lose a lot of the reason for using Prefect in the first place if I did that.
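On moving the schedule into the serve call: recent Prefect 2.x releases accept a cron schedule directly on `.serve()`, so the deployment and its 4am schedule live in code rather than being configured separately on the server. A minimal sketch, assuming `my_flow` is the flow defined in main.py:

```python
from prefect import flow


@flow
def my_flow():
    ...  # existing flow logic


if __name__ == "__main__":
    # Serve the flow and attach the daily 4am schedule in code,
    # instead of setting the schedule in the UI.
    my_flow.serve(name="deploymentv1", cron="0 4 * * *")
```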

Samuel Schlesinger

9 months ago
Hey folks, I'm looking into migrating a django + celery app to use Prefect for async task processing. We run X celery workers, which all listen to a queue broker (redis/rabbitmq/etc), and our API adds jobs to the queue as users trigger events. Pretty standard stuff. I'm trying to recreate a basic version of this in Prefect: I've got a Prefect server running and several workers running in docker instances, each joining a work pool. The workers have the application code baked into the image.

In celery-land, I'd just trigger jobs by calling `my_decorated_func.apply_async([args])`, i.e. `say_hello_world.apply_async(["Marvin"])`, and the workers would pick up the jobs, set up app internals (environment config et al), and then run the decorated function automatically. I'm not seeing an obvious way to do this with Prefect. I can call my `say_hello_world` flow directly, and it'll run locally, but I need it to run in the worker pool. Calling `.deploy()` tries to register it with the default worker pool, which is great, but it complains about needing an entrypoint or image. I saw some comments online about using 'local storage' to point to the specific file the flow is in, i.e. `/path/to/file/flow.py:say_hello_world`, but... there's no way that's the "right" way to queue a job, right?

I get that the Prefect control plane allows for total independence between the place that's queueing jobs and the place that's executing them, but in my case they're both the same docker image, just with different entrypoints (starting the API vs starting the Prefect workers). What's a clean way to just say "look for this exact same decorated function in the worker", essentially as if it were running locally but in a different container? CC @Marvin
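One pattern that gets fairly close to `apply_async` is to register a deployment once with an entrypoint the worker image can resolve, then have the API fire runs by deployment name. The sketch below uses placeholder names (work pool "default", code baked into the image at /opt/app, flow defined in flows.py) and assumes a pool whose workers already have the code available, so no image build or remote storage is involved:

```python
from prefect import flow
from prefect.deployments import run_deployment


def register() -> None:
    # One-off registration step, run from inside the worker image.
    # source/entrypoint point at the copy of the code already baked
    # into the image (paths below are placeholders).
    flow.from_source(
        source="/opt/app",                      # code directory inside the image
        entrypoint="flows.py:say_hello_world",  # file:function, relative to source
    ).deploy(
        name="say-hello-world",
        work_pool_name="default",               # placeholder work pool name
    )


def trigger() -> None:
    # Celery-style fire-and-forget from the API container.
    run_deployment(
        name="say-hello-world/say-hello-world",  # "<flow name>/<deployment name>"
        parameters={"name": "Marvin"},
        timeout=0,  # return immediately instead of waiting for the run
    )
```

`run_deployment(..., timeout=0)` is the closest analogue to `apply_async`: it creates a flow run for the deployment and returns immediately, leaving a worker in the pool to pick it up and execute the decorated function in its own container.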