Jon Mason
08/22/2025, 4:48 PM
Marvin
08/22/2025, 4:48 PM
Marvin
08/22/2025, 4:53 PM
@task create task runs. Pure Python functions (or Dask’s own client.submit) won’t show up.
- Subflows vs tasks: Calls to @flow create subflow runs, not task runs. Tasks inside subflows appear under the subflow’s run page.
- Using Dask directly: If you submit work with Dask directly (e.g., dask.distributed.Client().submit(...)), Prefect won’t track them as task runs.
- Dask workers can’t reach the Prefect API: If you’re connecting to an external Dask cluster from your ECS job, those workers need network access to the Prefect API and the same Prefect version installed.
- Task runner misconfiguration: If DaskTaskRunner isn’t actually being used by the flow (e.g., not set on the @flow decorator), tasks will run locally and might be mistaken for “missing”.
Minimal example you can compare against
This should produce two task runs in the UI. Try running this in your ECS job container; if this works, we know the basics are fine and the issue is in your flow’s code/config.
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def inc(x):
    return x + 1

@flow(name="dask-demo", task_runner=DaskTaskRunner())
def dask_demo():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_demo()
If this shows task runs but your flow does not:
- Ensure all functions you expect to see are decorated with @task and invoked via .submit(...) (for parallelism) or direct call (for sequential).
- If you’re calling subflows (@flow), look at the subflow run page for its task runs.
If this also shows no task runs:
- Temporarily swap to a synchronous runner to isolate Dask:
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
def dask_demo():
    ...
If task runs appear with SequentialTaskRunner, the problem is with the Dask cluster config.
- Confirm Dask workers can reach the Prefect API (network, DNS, outbound egress). If you’re pointing DaskTaskRunner at an external scheduler, those worker containers must have:
  - PREFECT_API_URL and PREFECT_API_KEY available
  - Prefect (same major/minor) and prefect-dask installed
- Double-check you’re using the Prefect 3 Dask runner import:
from prefect_dask.task_runners import DaskTaskRunner
A few clarifying questions to help me pinpoint it:
1) Could you share a minimal snippet of your flow and one @task function (how you call it)?
2) Are you connecting DaskTaskRunner to an external Dask cluster (scheduler address), or using the default local cluster inside the ECS job container?
3) What Prefect version is installed in your ECS job container (and Dask workers if external)?
4) Do you see any task-related log lines in the flow logs (e.g., “submitted task run …”)?
Docs for reference:
- Dask integration: Prefect Dask integration
- Task runners: Task runners in Prefect 3
Share a snippet and details and I’ll get you to a concrete fix quickly.
Jon Mason
08/22/2025, 4:58 PM
from prefect_dask import DaskTaskRunner instead of from prefect_dask.task_runners import DaskTaskRunner. I'll try that. In the interim, answers to your questions -
1. I'm running a simple example that looks more or less like the one you suggested.
2. I'm not connecting to an external dask cluster (the flow has task_runner=DaskTaskRunner()
Jon Mason
08/22/2025, 4:59 PM
Finished in state Completed()
Jon Mason
08/22/2025, 5:01 PM
from prefect_dask import DaskTaskRunner is identical to from prefect_dask.task_runners import DaskTaskRunner, anything else I can try?
Jon Mason
08/22/2025, 5:05 PM
SequentialTaskRunner @Marvin
Marvin
08/22/2025, 5:05 PM
Marvin
08/22/2025, 5:06 PM
SequentialTaskRunner. The local runners are:
- PrefectTaskRunner (default)
- ThreadPoolTaskRunner
- ProcessPoolTaskRunner
To isolate whether Dask is the issue, switch your flow to the default runner (or explicitly set PrefectTaskRunner) and confirm you see task runs:
from prefect import flow, task
from prefect.task_runners import PrefectTaskRunner
# from prefect_dask.task_runners import DaskTaskRunner  # your current config

@task
def inc(x):
    return x + 1

@flow(name="runner-check", task_runner=PrefectTaskRunner())  # or omit task_runner entirely
def runner_check():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    runner_check()
- If task runs appear with PrefectTaskRunner but not with DaskTaskRunner, it points to the Dask cluster/workers not having Prefect configured (e.g., `PREFECT_API_URL`/`PREFECT_API_KEY`), or using raw Dask submissions instead of Prefect `@task`s.
- If they don’t appear even with PrefectTaskRunner, then the issue is likely that the functions aren’t decorated with @task (or something is off with the runtime environment).
Docs:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
If you can share a minimal snippet of your flow (especially how you define and call tasks) and confirm whether you’re using a local Dask cluster vs an external scheduler, I can pinpoint the fix quickly.
Jon Mason
08/22/2025, 5:41 PM
@task
def inc(x):
    return x + 1

@flow(name='dask-test', task_runner=DaskTaskRunner())
def example_flow():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

And without dask:
@task
def inc(x):
    return x + 1

@flow(name='no-dask-test', task_runner=PrefectTaskRunner())
def no_dask_example_flow():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

The dask-test flow completes successfully but no tasks are displayed. This is the behavior I was seeing before.
The no-dask-test flow is more interesting - it shows two tasks scheduled, but neither completes! The flow is stuck as well.
Ideas on what could be going on here?
Marvin
08/22/2025, 5:41 PM
Marvin
08/22/2025, 5:43 PM
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        logger.info(f"Task run id: {ctx.task_run.id} | Flow run id: {ctx.flow_run.id}")
        # Attempt a simple API call to prove connectivity from the task
        from prefect.client.orchestration import get_client
        async def ping():
            async with get_client() as client:
                await client.read_flow_run(ctx.flow_run.id)
        import anyio
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"No task run context or API error: {e}")
    return x + 1
Then:
- Dask
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(name="dask-test", task_runner=DaskTaskRunner())
def dask_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_flow()
- PrefectTaskRunner
from prefect import flow
from prefect.task_runners import PrefectTaskRunner

@flow(name="no-dask-test", task_runner=PrefectTaskRunner())
def local_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    local_flow()
What to look for:
- If the Dask run logs “No task run context or API error”: your Dask workers are running the code but don’t have Prefect context/API access. That’s why you see no task runs for Dask.
- If the PrefectTaskRunner run shows the context and API reachable, but the UI still shows “Scheduled” forever, check the UTC timestamp line versus the time shown in the UI — large skew will keep tasks Scheduled.
2) Common causes and fixes
- Dask workers can’t reach Prefect API
- Ensure the worker processes/containers have PREFECT_API_URL and PREFECT_API_KEY set and can egress to your Prefect API (Cloud or OSS). In ECS, add these env vars to the task definition so child processes inherit them.
- If you’re using an external Dask cluster (address=...), that cluster’s worker image must have the same Prefect and prefect-dask versions and the API env vars.
- Turn on logs to see Dask worker details:
- Set env vars in the flow container: PREFECT_LOGGING_LEVEL=DEBUG and PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask
- Clock skew causing “Scheduled” to never run
- If the ECS container clock differs from your Prefect server/Cloud by a lot, task scheduled_time can be in the future. Make sure NTP/clock sync is good on the ECS host/Fargate environment. Compare the “UTC now” in the debug task logs to the time in the UI.
- Package/version mismatch
- Confirm versions inside the ECS flow container (and Dask workers if external):
- Prefect >= 3.x, prefect-dask matching the major/minor of Prefect
- Dask/distributed installed
- Incompatible versions can cause the task engine to run the function body but fail to register state.
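To put a number on the clock-skew check, one option is to compare the container's UTC time against the `Date` header returned by the API server. This is a minimal sketch using only the standard library; the function name and the sample timestamps are illustrative assumptions, not anything Prefect provides.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def clock_skew_seconds(local_now: datetime, http_date_header: str) -> float:
    """Return local time minus server time, in seconds.

    `http_date_header` is the value of an HTTP `Date` response header,
    e.g. "Fri, 22 Aug 2025 17:43:00 GMT".
    """
    server_now = parsedate_to_datetime(http_date_header)
    return (local_now - server_now).total_seconds()


# Hypothetical values for illustration: the container is 12 seconds ahead.
local = datetime(2025, 8, 22, 17, 43, 12, tzinfo=timezone.utc)
skew = clock_skew_seconds(local, "Fri, 22 Aug 2025 17:43:00 GMT")
print(f"skew: {skew:+.0f}s")
```

A skew of a few seconds is normal; a skew of minutes or more would be worth chasing before anything else.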
3) Quick checks to run now
- In the ECS task definition for your flow job image:
Marvin
08/22/2025, 5:43 PM
python -c "import prefect, prefect_dask; print(prefect.__version__, prefect_dask.__version__)"
)
With those, I can point to the exact fix. Docs for reference:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
Jon Mason
08/22/2025, 6:03 PM
Marvin
08/22/2025, 6:03 PM
Marvin
08/22/2025, 6:03 PM
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        logger.info(f"Task run id: {ctx.task_run.id} | Flow run id: {ctx.flow_run.id}")
        # Probe Prefect API from within the task
        from prefect.client.orchestration import get_client
        async def ping():
            async with get_client() as client:
                await client.read_flow_run(ctx.flow_run.id)
        import anyio
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"No task run context or API error: {e}")
    return x + 1
Dask runner flow:
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(name="dask-test", task_runner=DaskTaskRunner())
def dask_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_flow()
Local runner flow:
from prefect import flow
from prefect.task_runners import PrefectTaskRunner

@flow(name="no-dask-test", task_runner=PrefectTaskRunner())
def local_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    local_flow()
Run each and check the logs:
- If Dask logs “No task run context or API error”, the Dask workers aren’t reaching the Prefect API or don’t have Prefect context/env.
- If Local shows tasks stuck Scheduled, compare the UTC timestamp to the UI time (clock skew), and confirm API access from inside the container.
Jon Mason
08/22/2025, 6:05 PM
debug_task raises an attribute error because the TaskRunContext has no attribute flow_run - I don't think this indicates that we can't connect, but a mistake in debug task.
We get no logs from the no-dask-test because the tasks never run.
However, the system time in the dask tasks seems to be similar to the ui time (I'm looking at the timestamp in the flow run, not sure what you mean beyond that.)
I am running self-hosted in an ECS cluster. The PREFECT3_API_URL environment variable is set correctly in the tasks. I am using a local dask cluster (not passing an external url). I'm using prefect version=3.3.7, prefect-dask version=0.3.5. The prefect3 server is running in an elastic beanstalk app.
Marvin
08/22/2025, 6:12 PM
Marvin
08/22/2025, 6:15 PM
flow_run attribute. You can get the flow run id from ctx.task_run.flow_run_id. Here’s a corrected probe that works in both flow and task contexts:
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        # Try to extract ids from either context shape
        tr_id = getattr(getattr(ctx, "task_run", None), "id", None)
        fr_id = getattr(getattr(ctx, "task_run", None), "flow_run_id", None) or getattr(getattr(ctx, "flow_run", None), "id", None)
        logger.info(f"Context ids: task_run_id={tr_id} flow_run_id={fr_id}")
        # Probe Prefect API from within the task
        from prefect.client.orchestration import get_client
        import anyio
        async def ping():
            async with get_client() as client:
                if fr_id:
                    await client.read_flow_run(fr_id)
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"Context/API probe failed: {e}")
    return x + 1
Now, to your two symptoms:
1) Dask flow shows no tasks, but completes
- Most likely cause: the Dask worker processes don’t have Prefect API configuration, so they execute the code but cannot report task state. In that case, you’ll see no task runs in the UI even though the flow returns.
- Critical detail I noticed: you mentioned setting PREFECT3_API_URL. Prefect 3 still uses the setting name PREFECT_API_URL (no “3”). If the workers only have PREFECT3_API_URL, Prefect won’t see it.
- Action:
- Ensure the environment contains PREFECT_API_URL (and PREFECT_API_KEY if your API requires auth), not PREFECT3_API_URL.
- Since you’re using a local Dask cluster within the ECS task, the worker processes should inherit the container env; verify inside the task logs that PREFECT_API_URL is populated (the probe above logs it).
- Verify both Prefect and prefect-dask are installed in the flow container (and thus in worker processes): versions you shared (3.3.7 / 0.3.5) are fine.
2) PrefectTaskRunner shows two tasks Scheduled but they never start (and no task logs)
- This typically means the engine created the task runs in the API but the execution loop is not moving them to Running. Common causes:
- API URL misconfigured (e.g., missing “/api” path on self-hosted) so follow-up API calls 404/timeout
- Network or TLS issues between the container and your Elastic Beanstalk API (look for 401/403/404/5xx in server logs)
- Significant clock skew (less common if your timestamps look aligned)
- Actions to pinpoint:
- Make sure your PREFECT_API_URL includes the /api path for OSS (e.g., https://your-eb-domain/api) — this is a frequent gotcha.
- Run the probe above with PrefectTaskRunner and confirm:
  - PREFECT_API_URL is what you expect
  - The API “ping” succeeds from within the task
- Try an inline task call (bypasses the submit/scheduler path):
@task
def inc(x):
    return x + 1

@flow(task_runner=PrefectTaskRunner())
def inline_test():
    return inc(1) + inc(2)

if __name__ == "__main__":
    inline_test()
- If inline works and creates/finishes task runs, but .submit() stays Scheduled, the issue is with the runner’s execution loop communicating with your API.
- Turn up logging in the container:
- Set PREFECT_LOGGING_LEVEL=DEBUG
- Optionally PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask
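The environment checks above (the setting must be named PREFECT_API_URL, and a self-hosted URL should end in /api) can be scripted so they run at container start. The sketch below is only an illustration: `check_prefect_env` is a hypothetical helper, not part of Prefect, and the /api suffix rule is assumed to apply to self-hosted servers only.

```python
import os
from typing import Mapping


def check_prefect_env(env: Mapping[str, str]) -> list[str]:
    """Return human-readable problems found in the Prefect API settings."""
    problems = []
    url = env.get("PREFECT_API_URL")
    if not url:
        problems.append("PREFECT_API_URL is not set")
        # Flag look-alike names such as PREFECT3_API_URL, which Prefect ignores.
        lookalikes = sorted(
            name for name in env
            if name.startswith("PREFECT")
            and name.endswith("_API_URL")
            and name != "PREFECT_API_URL"
        )
        for name in lookalikes:
            problems.append(f"{name} is set, but Prefect reads PREFECT_API_URL")
    elif not url.rstrip("/").endswith("/api"):
        # Assumption: self-hosted server, where the API is served under /api.
        problems.append(f"PREFECT_API_URL does not end with /api: {url}")
    return problems


if __name__ == "__main__":
    for problem in check_prefect_env(os.environ):
        print("WARNING:", problem)
```

Running it inside the ECS container (and inside a task, if worker processes might see a different environment) gives a quick yes/no before re-running the flows.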
Marvin
08/22/2025, 6:15 PM
PREFECT_API_URL line and whether “Prefect API reachable from task” appears
- The same from the PrefectTaskRunner flow
- Your exact PREFECT_API_URL value (redact host if needed, but include whether it ends with /api)
Relevant docs:
- Runtime context: Runtime context
- Dask integration: Prefect Dask integration
- Settings reference: Prefect 3 settings
I strongly suspect renaming PREFECT3_API_URL to PREFECT_API_URL (and ensuring it includes /api) will resolve both issues; the probe will confirm quickly.
Jon Mason
08/22/2025, 6:17 PM
Marvin
08/22/2025, 6:17 PM
Jon Mason
08/22/2025, 6:18 PM
Marvin
08/22/2025, 6:18 PM
Marvin
08/22/2025, 6:19 PM
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        tr_id = getattr(getattr(ctx, "task_run", None), "id", None)
        fr_id = getattr(getattr(ctx, "task_run", None), "flow_run_id", None) or getattr(getattr(ctx, "flow_run", None), "id", None)
        logger.info(f"Context ids: task_run_id={tr_id} flow_run_id={fr_id}")
        from prefect.client.orchestration import get_client
        import anyio
        async def ping():
            async with get_client() as client:
                if fr_id:
                    await client.read_flow_run(fr_id)
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"Context/API probe failed: {e}")
    return x + 1
Then run both flows again:
- Dask runner (local cluster in ECS)
- PrefectTaskRunner
Expected outcome after fixing PREFECT_API_URL:
- Dask: you should now see task runs in the UI
- PrefectTaskRunner: tasks should move from Scheduled to Running/Completed, and you’ll see “Prefect API reachable from task” in logs
If anything still sticks:
- Please share the exact value you’re using for PREFECT_API_URL (confirm it ends with /api), and any error from the probe logs
- Verify your ECS security group/NACL allows outbound to your EB domain/port
- Check your EB server logs for 404s or 401s on /api paths
References:
- Settings reference: Prefect 3 settings
- Runtime context: Runtime context
- Dask integration: Prefect Dask integration
Marvin
08/22/2025, 6:20 PM
PREFECT_API_URL format.
Two concrete next steps to pinpoint what’s blocking execution/state updates:
1) Use the corrected debug task and confirm API access from inside the Dask workers
Please re-run the Dask flow with this updated probe (note the flow_run_id access is via ctx.task_run.flow_run_id):
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        tr_id = getattr(getattr(ctx, "task_run", None), "id", None)
        fr_id = getattr(getattr(ctx, "task_run", None), "flow_run_id", None) or getattr(getattr(ctx, "flow_run", None), "id", None)
        logger.info(f"Context ids: task_run_id={tr_id} flow_run_id={fr_id}")
        from prefect.client.orchestration import get_client
        import anyio
        async def ping():
            async with get_client() as client:
                if fr_id:
                    await client.read_flow_run(fr_id)
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"Context/API probe failed: {e}")
    return x + 1
Run:
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(name="dask-test", task_runner=DaskTaskRunner())
def dask_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_flow()
- If you see “Context ids…” and “Prefect API reachable from task” in the logs, Dask workers do have Prefect context and can reach your API. If you still don’t see task runs in the UI, it points to state update calls failing (see #2).
- If you don’t see those lines, the Dask workers aren’t talking to the API; we’ll need to ensure the env and networking are available inside worker processes.
2) Check for PATCH being blocked at your Elastic Beanstalk/ALB layer
Your “Scheduled but never runs” symptom with PrefectTaskRunner strongly suggests the API allows create/read (POST/GET) but blocks state transitions (PATCH). Prefect uses PATCH for moving runs to Running/Completed. If PATCH is blocked or altered by the proxy, tasks remain Scheduled in the UI and never show logs.
Actions:
- Check EB/ALB config and any WAF/reverse proxy in front of your /api to ensure HTTP methods GET, POST, PATCH, DELETE are allowed and forwarded.
- Look at your Prefect Server logs when a flow starts; if PATCH is blocked you’ll typically see 405/403/404/5xx errors on endpoints like /api/task_runs/{id}/state or /api/flow_runs/{id}/state.
- Turn up client logs in the ECS container:
- PREFECT_LOGGING_LEVEL=DEBUG
- Optionally PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask
- Quick flow-level API probe (runs even if tasks won’t start):
from prefect import flow, get_run_context
from prefect.logging import get_run_logger
from prefect.client.orchestration import get_client
import anyio

@flow(name="api-ping")
def api_ping():
    logger = get_run_logger()
    ctx = get_run_context()
    logger.info(f"Flow run id: {ctx.flow_run.id}")
    async def ping():
        async with get_client() as client:
            await client.read_flow_run(ctx.flow_run.id)
    anyio.run(ping)
    logger.info("Flow can reach API")

if __name__ == "__main__":
    api_ping()
If this succeeds but tasks still stick in Scheduled, it’s almost certainly method filtering on PATCH.
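Rather than eyeballing the proxy logs for blocked methods, you could tally method/status pairs on /api paths. This is a rough sketch that assumes nginx's default "combined" access log format; the sample lines, paths, and user agents are invented for illustration and are not taken from your server.

```python
import re
from collections import Counter

# Matches the quoted request and the status code that follows it, e.g.
#   "PATCH /api/flow_runs/abc HTTP/1.1" 204
REQUEST_RE = re.compile(r'"(?P<method>[A-Z]+) (?P<path>\S+) [^"]*" (?P<status>\d{3})')


def tally_api_requests(lines):
    """Count (method, status) pairs for requests whose path starts with /api/."""
    counts = Counter()
    for line in lines:
        match = REQUEST_RE.search(line)
        if match and match.group("path").startswith("/api/"):
            counts[(match.group("method"), int(match.group("status")))] += 1
    return counts


# Hypothetical log lines for illustration only.
sample = [
    '10.0.0.5 - - [22/Aug/2025:18:20:01 +0000] "POST /api/flow_runs/ HTTP/1.1" 201 512 "-" "client"',
    '10.0.0.5 - - [22/Aug/2025:18:20:02 +0000] "PATCH /api/flow_runs/abc HTTP/1.1" 204 0 "-" "client"',
    '10.0.0.5 - - [22/Aug/2025:18:20:03 +0000] "PATCH /api/task_runs/def HTTP/1.1" 405 0 "-" "client"',
    '10.0.0.5 - - [22/Aug/2025:18:20:04 +0000] "GET /favicon.ico HTTP/1.1" 200 318 "-" "browser"',
]
counts = tally_api_requests(sample)
failures = {key: n for key, n in counts.items() if key[1] >= 400}
print(failures)
```

If `failures` is empty for a window in which a flow ran, method filtering at the proxy becomes a much less likely explanation.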
A few more checks
- Versions: you’re on Prefect 3.3.7 and prefect-dask 0.3.5; consider upgrading server and containers to the latest 3.4.x to pick up recent runner fixes.
Marvin
08/22/2025, 6:20 PM
Jon Mason
08/22/2025, 8:19 PM
debug_task, and I see many successful PATCH requests in the nginx access logs on the EBS server. What now?
Marvin
08/22/2025, 8:19 PM
Marvin
08/22/2025, 8:22 PM
curl -s https://my-domain.com/api/version
Please paste the JSON. If it reports a 2.x server, upgrade to Prefect 3 OSS server and re-test. If it’s 3.x but older than your SDK, consider upgrading both server and SDK to the latest 3.4.x.
2) Verify you’re looking at the same backend the flows write to
Since debug_task logs show a concrete task_run_id, let’s confirm that record exists in your DB. Grab one task_run_id from the Dask flow logs and query your Prefect DB directly:
- If it’s there, the UI should be able to display it — this points to a UI/view/filtering or workspace mismatch.
- If it’s not there, your flow may be writing to a different API/database than the UI you’re checking.
3) Try the same flows outside ECS against the same API
Run the two small flows (PrefectTaskRunner and DaskTaskRunner) from your laptop but pointing at your EB API:
- Set PREFECT_API_URL=https://my-domain.com/api locally
- Run the flows
If tasks appear locally but not from ECS, it’s an infra/network/pool issue in ECS. If they fail locally too, it’s server-side config/version.
4) Turn on Prefect DEBUG logs for the ECS run and look for state update errors
Set these env vars on the ECS task and re-run:
- PREFECT_LOGGING_LEVEL=DEBUG
- PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask
Then share any lines that mention set_task_run_state, set_flow_run_state, or httpx errors. Even with nginx 200s, the client may be retrying or timing out at times (connection pool, keep-alive, proxy limits).
5) Try a different runner to triangulate (ThreadPool)
This will tell us if the issue is specific to Dask or more general in your ECS environment.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(log_prints=True)
def inc(x):
    print(f"inc({x})")
    return x + 1

@flow(name="threadpool-test", task_runner=ThreadPoolTaskRunner())
def threadpool_flow():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    threadpool_flow()
- If ThreadPool shows proper task runs and completes, the Dask integration/config is suspect.
- If ThreadPool also sticks in Scheduled, the issue is not Dask.
6) Upgrade to latest 3.4.x
You’re on Prefect 3.3.7 and prefect-dask 0.3.5. There have been runner and engine fixes since then. If feasible, upgrade both the server and ECS container images to 3.4.x and the latest prefect-dask, then re-test the two sample flows.
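For the version comparison in these steps, a small helper can make "same major/minor" concrete once you have the SDK version (from `prefect.__version__`) and the server version (from the /api/version response). This is a sketch with hypothetical helper names; it only compares the leading two numeric components and says nothing about which combinations Prefect actually supports.

```python
def major_minor(version: str) -> tuple[int, int]:
    """Extract (major, minor) from a version string such as '3.3.7'."""
    major, minor, *_ = version.split(".")
    return int(major), int(minor)


def describe_mismatch(sdk_version: str, server_version: str) -> str:
    """Classify how the SDK and server versions differ."""
    sdk, server = major_minor(sdk_version), major_minor(server_version)
    if sdk[0] != server[0]:
        return "major version mismatch"
    if sdk != server:
        return "minor version mismatch"
    return "same major/minor"


# Illustrative values: the SDK version from this thread against made-up server versions.
print(describe_mismatch("3.3.7", "3.3.7"))
print(describe_mismatch("3.3.7", "3.4.0"))
print(describe_mismatch("3.3.7", "2.20.1"))
```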
What would help me help you fastest
- The JSON output of GET https://my-domain.com/api/version
- Whether the task_run_id from debug_task appears in your DB
- The DEBUG logs from the PrefectTaskRunner flow showing where it gets stuck
- Results of the ThreadPool test flow
Useful docs
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
- Settings: Prefect 3 settings
Given what you’ve confirmed so far, my top suspicion is a version mismatch between SDK and server or a subtle infra difference between where your flows run and where your UI/DB is pointing. The checks above will pinpoint which.
Jon Mason
08/22/2025, 8:32 PM
prefect command with the PREFECT_API_URL pointed at my server, won't the flows run in ECS?
Marvin
08/22/2025, 8:33 PM
Marvin
08/22/2025, 8:34 PM
prefect deployment run. Just execute your Python script directly with PREFECT_API_URL set.
Key points
- The prefect CLI does not have a generic “run this flow locally” command in 3.x. If you use prefect deploy and prefect deployment run, your flow run will wait for a worker and, in your setup, execute in ECS.
- To run locally, invoke the flow function directly in a Python process. Prefect will still create/update runs in your remote server if PREFECT_API_URL (and PREFECT_API_KEY if required) are set in your environment.
How to run locally against your server
1) Save a small script, e.g. local_test.py:
from prefect import flow, task
from prefect.task_runners import PrefectTaskRunner
from prefect_dask.task_runners import DaskTaskRunner

@task
def inc(x):
    return x + 1

@flow(name="local-no-dask", task_runner=PrefectTaskRunner())
def local_no_dask():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

@flow(name="local-dask", task_runner=DaskTaskRunner())
def local_dask():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    # choose one to test
    local_no_dask()
    # local_dask()
2) Run it locally with your server configured:
- If OSS server with no auth:
PREFECT_API_URL=https://my-domain.com/api python local_test.py
- If auth enabled:
PREFECT_API_URL=https://my-domain.com/api PREFECT_API_KEY=**** python local_test.py
That will:
- Execute the flow on your local machine (not ECS)
- Report the flow/task runs to your self-hosted server so you can see them in the UI
Notes
- Do not use prefect deployment run for this test; that will route to your ECS worker.
- You can also test Dask locally by switching to `local_dask()`; this spins up a local Dask cluster inside your process.
- If you want to ensure the right extras are installed, you can run:
uv run --with 'prefect[dask]' python local_test.py
Docs for reference:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
Jon Mason
08/22/2025, 8:39 PM
prefect --version returns 3.3.7.
2. There is no row in the database task_run table with the id from the logs. The only rows in that table are in the scheduled or cancelled state, suggesting that they are from the no_dask_example flow where the tasks stalled.
3. Running locally as described produced no task runs, but did give an interesting warning:
15:37:09.992 | WARNING | prefect.events.clients - Unable to connect to 'wss://my-domain.com/api/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: server rejected WebSocket connection: HTTP 404. Set PREFECT_DEBUG_MODE=1 to see the full error.
Marvin
08/22/2025, 8:39 PM
Marvin
08/22/2025, 8:40 PM
# Proxy WebSocket for Prefect events ingestion
location /api/events/in {
    proxy_pass http://127.0.0.1:4200/api/events/in;  # or your internal upstream
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
    proxy_set_header Host $host;
    proxy_read_timeout 600s;
    proxy_send_timeout 600s;
}
- If you have an ALB in front, ALB supports WebSockets — just ensure the listener/target group forwards to your nginx, and nothing strips Upgrade/Connection headers.
- If there’s CloudFront in front, be sure the behavior allows WebSockets and doesn’t block the /api/events/in path.
How to verify the fix
- From your laptop: use a simple ws client (e.g. wscat) to check the handshake returns 101 Switching Protocols, not 404:
  - wscat -c wss://my-domain.com/api/events/in
(it may immediately close after connect, which is fine; we just want to see the 101).
- Re-run your small local flow with PREFECT_API_URL=https://my-domain.com/api and confirm that task runs appear.
- Re-run your ECS flows; Dask task runs should now be recorded as well.
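To know which WebSocket URL to test for a given PREFECT_API_URL, the mapping visible in the warning (https://my-domain.com/api became wss://my-domain.com/api/events/in) can be reproduced with a few lines. This sketch is inferred from that warning text and is not Prefect's own implementation; `events_ws_url` is a hypothetical helper.

```python
from urllib.parse import urlsplit, urlunsplit


def events_ws_url(api_url: str) -> str:
    """Derive the events-ingest WebSocket URL from an HTTP(S) API URL."""
    parts = urlsplit(api_url)
    schemes = {"https": "wss", "http": "ws"}
    if parts.scheme not in schemes:
        raise ValueError(f"unsupported scheme in API URL: {api_url}")
    # Drop any trailing slash before appending the events path.
    path = parts.path.rstrip("/") + "/events/in"
    return urlunsplit((schemes[parts.scheme], parts.netloc, path, "", ""))


print(events_ws_url("https://my-domain.com/api"))
```

The printed URL is the one to hand to a WebSocket client; a 101 response means the proxy forwards the upgrade, while a 404 matches the warning above.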
Also confirm server services are running
- Prefect server should start with services enabled (default). Ensure you are not using prefect server start --no-services.
- On server start, logs should mention starting services like “event persister” and “task run recorder”.
Why the PrefectTaskRunner tasks were stuck Scheduled
- When events aren’t reaching the server, the engine/DB can get out of sync on run lifecycle. Restoring the events WebSocket typically resolves Scheduled runs that never progress.
- If anything still sticks after fixing WebSockets, enable DEBUG logs and check for state transition errors:
- PREFECT_LOGGING_LEVEL=DEBUG
- Look for calls to set_task_run_state / set_flow_run_state in the flow logs and corresponding responses in server logs.
Once the WebSocket route is fixed, I expect:
- The local test to show task runs
- The Dask/ECS test to show task runs
- The PrefectTaskRunner case to move past Scheduled
Helpful references:
- Settings reference: Prefect 3 settings
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
If you can share your current EB/nginx config for the /api path (or confirm the change), I can sanity-check it.