
Sam Brownlow

over 3 years ago
tl;dr: How scalable is Prefect Server for scheduling concurrent runs of tens of thousands of flows?

I'm evaluating Prefect Server as a solution for managing a data pipeline. The way I envision this working is to use the Prefect framework as computationally minimal "glue": Prefect would be the central pipe through which the state of the data flows (but not the data itself). It would hand off all computation to microservices via (a)sync requests, and its only concern would be controlling how that state is pushed from service to service. Prefect seems to be a great tool for this job; I really like the API and how quickly a developer can get up and running. However, there are a couple of things I don't yet have total clarity on:
• This documentation describes Server as a "highly scalable scheduler," but it also says that performance may start degrading at "~10-20 tasks running concurrently against a typical Server." Does that degradation occur because the "typical Server" is deployed to a single node via docker-compose with a single agent, rather than via something like Helm, which horizontally scales scheduled flows across a cluster of agents?
• How quickly does the size of the PostgreSQL database generally grow, relative to the number of flows run? Is there any reason that regularly deleting old runs would be more complicated than as suggested here?
• Are there any case studies of Prefect Server being used to run tens of thousands of concurrent flows?
• I see that there used to be a Nomad Agent; are there any helpful resources for running Prefect on a Nomad cluster?
Thanks for any advice you are able to share. I have only been diving into Prefect for the past couple of days, so I greatly appreciate any pointers here.
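A minimal sketch of the "glue" pattern described above, using the Prefect 1.x API that Server targets (prefect.Flow / prefect.task). The service URLs, payload shape, and project name are hypothetical placeholders, not details from the question:

from datetime import timedelta

import requests
from prefect import Flow, task


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def call_service(url: str, record_id: str) -> str:
    # Hand all computation off to a microservice; only an identifier for the data's
    # state moves through Prefect, never the data itself.
    resp = requests.post(url, json={"record_id": record_id}, timeout=30)
    resp.raise_for_status()
    return resp.json()["record_id"]


with Flow("glue-pipeline") as flow:
    # Chain the services; each task only forwards the record ID returned by the
    # previous service.
    rid = call_service("http://extract-svc/run", "record-123")
    rid = call_service("http://transform-svc/run", rid)
    call_service("http://load-svc/run", rid)

# flow.register(project_name="pipelines")  # registration against Prefect Server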

Keith

over 2 years ago
Hi Prefect Community! We are running our infrastructure on GKE Autopilot and have been seeing an increase in the number of `Crashed` jobs recently. I am trying to do root-cause analysis, so I started by digging through the logs in Prefect Cloud. What I see is that at some point (it is random) the logs stop and nothing further is output to the UI. Digging through the logs in Google Logs Explorer I see the same behavior: the Prefect container logs stop at the same specific point in time. Inside Google Logs I am also able to see a lot of Kubernetes-related logs and am starting to see a pattern, but it is not clear how to fix it.
• Roughly 5-10 seconds after the last log, this shows up: ◦
INFO 2023-02-03T19:18:11Z [resource.labels.nodeName: gk3-prefect-autopilot-cl-nap-ji2s72nv-db29cac6-hxzc] marked the node as toBeDeleted/unschedulable
• Quickly followed by: ◦
INFO 2023-02-03T19:18:11Z [resource.labels.clusterName: prefect-autopilot-cluster-1] Scale-down: removing node gk3-prefect-autopilot-cl-nap-ji2s72nv-db29cac6-hxzc, utilization: {0.5538631957906397 0.1841863664058054 0 cpu 0.5538631957906397}, pods to reschedule: adorable-axolotl-d8k8c-6dx5c
INFO 2023-02-03T19:18:38Z [resource.labels.clusterName: prefect-autopilot-cluster-1] Scale-down: node gk3-prefect-autopilot-cl-nap-ji2s72nv-db29cac6-hxzc removed with drain
• GKE tries to reschedule the job, but it fails with the following, which is when Prefect alerts for the `Crashed` state: ◦
INFO 2023-02-03T19:18:11Z [resource.labels.podName: adorable-axolotl-d8k8c-6dx5c] deleting pod for node scale down
ERROR 2023-02-03T19:18:19.215934101Z [resource.labels.containerName: prefect-job] 19:18:19.214 | INFO | prefect.engine - Engine execution of flow run '8ca83100-dcc3-46d5-91be-f342b19b45a9' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.
This appears to be happening on random jobs, which leads me to believe that GKE thinks the cluster is overprovisioned, so it is trying to reduce the cluster size and move jobs around; but jobs can't be moved in the middle of execution, so they Crash/Fail. I am also curious whether this is due to resource sizing, but the jobs I have been troubleshooting do not show any insufficient-resource problems. They all typically report the following in the `containerStatuses` leaf of the JSON element:
state: {
  terminated: {
    containerID: "containerd://aac705"
    exitCode: 143
    finishedAt: "2023-02-03T19:18:19Z"
    reason: "Error"
    startedAt: "2023-02-03T19:16:52Z"
  }
}
Any insight would be greatly appreciated!
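For context on the failure mode described above: exit code 143 is SIGTERM, and the "deleting pod for node scale down" event indicates the cluster autoscaler drained the node out from under the running flow. One commonly suggested mitigation is to annotate the flow-run pods so the autoscaler will not evict them mid-run. A minimal sketch, assuming Prefect 2.x KubernetesJob infrastructure; the block name is a placeholder, and whether Autopilot honors the annotation for a given workload should be checked against the GKE docs:

from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    customizations=[
        {
            # JSON-patch op against the base Kubernetes Job manifest: annotate the
            # pod template so the cluster autoscaler will not scale the node down
            # while a flow run is still executing on it.
            "op": "add",
            "path": "/spec/template/metadata/annotations",
            "value": {"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"},
        }
    ],
)
k8s_job.save("autopilot-job", overwrite=True)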

Karanveer Mohan

about 3 years ago
Hi, I am evaluating Prefect Cloud 2.0 and ran into the following `PrefectHTTPStatusError` for one of my subflow runs. The Cloud UI is also in an inconsistent state, since it says the flow/subflow is still running. It works fine locally. Any help would be appreciated!
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 559, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/root/deployer/pipeline.py", line 21, in run_ro
    etl.run_ro()
  File "/usr/local/lib/python3.9/site-packages/prefect/flows.py", line 367, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 154, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 454, in create_and_begin_subflow_run
    terminal_state = await orchestrate_flow_run(
  File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.9/contextlib.py", line 199, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1181, in report_flow_run_crashes
    await client.set_flow_run_state(
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1505, in set_flow_run_state
    response = await self._client.post(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 277, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 223, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '503 Service Unavailable' for url 'https://api-beta.prefect.io/api/accounts/cf175afc-09c0-4bc3-bdba-2ad01653e8ea/workspaces/7eb80c34-5efe-4f01-9ee9-64715100b7e3/flow_runs/29563420-7c6f-4680-97bf-14d148d06ee7/set_state'
For more information check: https://httpstatuses.com/503
02:01:18 PM
Crash detected! Execution was interrupted by an unexpected exception.
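For readers of the traceback: the error is raised inside create_and_begin_subflow_run while the engine reports state for a subflow call, i.e. a flow invoked from inside another flow. A minimal, hypothetical sketch of that call shape (names are placeholders, not taken from the failing pipeline):

from prefect import flow


@flow
def child():
    # Stands in for etl.run_ro(), the subflow in the traceback above.
    ...


@flow
def parent():
    # Calling a flow from inside another flow creates a subflow run; the 503 above
    # was raised while the engine reported this subflow's state to the Cloud API.
    return child()


if __name__ == "__main__":
    parent()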

Samuel Schlesinger

7 months ago
Hey folks, I'm looking into migrating a Django + Celery app to use Prefect for async task processing. We run X Celery workers, which all listen to a queue broker (Redis/RabbitMQ/etc.), and our API adds jobs to the queue as users trigger events. Pretty standard stuff.

I'm trying to recreate a basic version of this in Prefect. I've got a Prefect server running, and several workers running in Docker instances, each joining a work pool. The workers have the application code baked into the image. In Celery-land, I'd just trigger jobs by calling `my_decorated_func.apply_async([args])`, i.e. `say_hello_world.apply_async(["Marvin"])`; the workers would pick up the jobs, set up app internals (environment config et al.), and then run the decorated function automatically.

I'm not seeing an obvious way to do this with Prefect. I can call my `say_hello_world` flow directly, and it'll run locally, but I need it to run in the work pool. Calling `.deploy()` tries to register it with the default work pool, which is great, but it complains about needing an entrypoint or image. I saw some comments online about using 'local storage' to point to the specific file the flow is in, i.e. `/path/to/file/flow.py:say_hello_world`, but... there's no way that's the "right" way to queue a job, right? I get that the Prefect control plane allows for total independence between the place that's queueing jobs and the place that's executing them, but in my case they're both the same Docker image, just with different entrypoints (starting the API vs. starting the Prefect workers). What's a clean way to just say "look for this exact same decorated function in the worker," essentially as if it were running locally but in a different container? CC @Marvin
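Not an answer from the thread, but a sketch of the pattern the question is circling around: register a deployment whose entrypoint points at the flow file already baked into the shared image (e.g. `flows.py:say_hello_world`), then have the API process submit runs with `run_deployment`, which is the closest analogue to `apply_async`. Assumes Prefect 2.x/3.x; the deployment name and flow-run parameters below are hypothetical placeholders:

from prefect.deployments import run_deployment

# API-side equivalent of say_hello_world.apply_async(["Marvin"]) in Celery: submit a
# run of an existing deployment to the work pool and return without waiting for it.
flow_run = run_deployment(
    name="say-hello-world/default",   # "<flow name>/<deployment name>"
    parameters={"name": "Marvin"},
    timeout=0,                        # 0 = do not block until the run finishes
)
print(flow_run.id)

The worker that picks the run up imports the same decorated function from the same module path inside its own container, which is effectively the "same decorated function, different container" behavior described above.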