# marvin-ai
j
@Marvin I'm not seeing any task runs in the ui (or the database) when running my flow. I'm using ecs as a flow runner and the DaskTaskRunner.
m
thought for 301.9 seconds
Got it. A few common things can cause “no task runs” in the UI when using DaskTaskRunner (including on ECS). Here’s a quick checklist and a way to isolate the issue.

Likely causes
- Tasks aren’t decorated: only functions decorated with `@task` create task runs. Plain Python functions (or Dask’s own `client.submit`) won’t show up.
- Subflows vs. tasks: calls to `@flow` functions create subflow runs, not task runs. Tasks inside subflows appear on the subflow’s run page.
- Using Dask directly: if you submit work with Dask directly (e.g., `dask.distributed.Client().submit(...)`), Prefect won’t track it as task runs.
- Dask workers can’t reach the Prefect API: if you’re connecting to an external Dask cluster from your ECS job, those workers need network access to the Prefect API and the same Prefect version installed.
- Task runner misconfiguration: if DaskTaskRunner isn’t actually being used by the flow (e.g., not set on the `@flow` decorator), tasks will run locally and might be mistaken for “missing”.

Minimal example you can compare against
This should produce two task runs in the UI. Try running it in your ECS job container; if it works, the basics are fine and the issue is in your flow’s code/config.
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def inc(x):
    return x + 1

@flow(name="dask-demo", task_runner=DaskTaskRunner())
def dask_demo():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_demo()
```
If this shows task runs but your flow does not:
- Ensure all functions you expect to see are decorated with `@task` and invoked via `.submit(...)` (for parallelism) or called directly (for sequential execution).
- If you’re calling subflows (`@flow`), look at the subflow run page for its task runs.

If this also shows no task runs:
- Temporarily swap to a synchronous runner to isolate Dask:
```python
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
def dask_demo():
    ...
```
If task runs appear with SequentialTaskRunner, the problem is with the Dask cluster config.
- Confirm Dask workers can reach the Prefect API (network, DNS, outbound egress). If you’re pointing DaskTaskRunner at an external scheduler, those worker containers must have:
  - `PREFECT_API_URL` and `PREFECT_API_KEY` available
  - Prefect (same major/minor) and prefect-dask installed
- Double-check you’re using the Prefect 3 Dask runner import:
```python
from prefect_dask.task_runners import DaskTaskRunner
```
A few clarifying questions to help me pinpoint it:
1) Could you share a minimal snippet of your flow and one `@task` function (and how you call it)?
2) Are you connecting DaskTaskRunner to an external Dask cluster (scheduler address), or using the default local cluster inside the ECS job container?
3) What Prefect version is installed in your ECS job container (and on the Dask workers, if external)?
4) Do you see any task-related log lines in the flow logs (e.g., “submitted task run …”)?

Docs for reference:
- Dask integration: Prefect Dask integration
- Task runners: Task runners in Prefect 3

Share a snippet and details and I’ll get you to a concrete fix quickly.
j
I did have `from prefect_dask import DaskTaskRunner` instead of `from prefect_dask.task_runners import DaskTaskRunner`. I'll try that. In the interim, answers to your questions:
1. I'm running a simple example that looks more or less like the one you suggested.
2. I'm not connecting to an external dask cluster (the flow has `task_runner=DaskTaskRunner()`).
3. I have version 3.3.7.
4. I see one task-related log: `Finished in state Completed()`

It seems like `from prefect_dask import DaskTaskRunner` is identical to `from prefect_dask.task_runners import DaskTaskRunner`; anything else I can try?
There's no such thing as a `SequentialTaskRunner` @Marvin
m
thought for 38.9 seconds
You’re right, my mistake. In Prefect 3.x there’s no `SequentialTaskRunner`. The local runners are:
- `ThreadPoolTaskRunner` (the default when a flow doesn’t set `task_runner`)
- `PrefectTaskRunner`
- `ProcessPoolTaskRunner`

To isolate whether Dask is the issue, switch your flow from `DaskTaskRunner` to a built-in runner (the example below sets `PrefectTaskRunner` explicitly) and confirm you see task runs:

```python
from prefect import flow, task
from prefect.task_runners import PrefectTaskRunner
# from prefect_dask.task_runners import DaskTaskRunner  # your current config

@task
def inc(x):
    return x + 1

# Omitting task_runner would use the default ThreadPoolTaskRunner instead
@flow(name="runner-check", task_runner=PrefectTaskRunner())
def runner_check():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    runner_check()
```
- If task runs appear with `PrefectTaskRunner` but not with `DaskTaskRunner`, it points to the Dask cluster/workers not having Prefect configured (e.g., `PREFECT_API_URL`/`PREFECT_API_KEY`), or to raw Dask submissions being used instead of Prefect `@task`s.
- If they don’t appear even with `PrefectTaskRunner`, the issue is likely that the functions aren’t decorated with `@task` (or something is off with the runtime environment).

Docs:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration

If you can share a minimal snippet of your flow (especially how you define and call tasks) and confirm whether you’re using a local Dask cluster or an external scheduler, I can pinpoint the fix quickly.
j
@Marvin so I've run the two example flows. With dask:
```python
@task
def inc(x):
    return x + 1

@flow(name='dask-test', task_runner=DaskTaskRunner())
def example_flow():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()
```
And without dask:
```python
@task
def inc(x):
    return x + 1


@flow(name='no-dask-test', task_runner=PrefectTaskRunner())
def no_dask_example_flow():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()
```
The `dask-test` flow completes successfully but no tasks are displayed. This is the behavior I was seeing before. The `no-dask-test` flow is more interesting: it shows two tasks scheduled, but neither completes! The flow is stuck as well. Ideas on what could be going on here?
m
thought for 123.2 seconds
Thanks for the detailed repro. The symptoms point to two likely issues:
- For Dask: tasks are executing, but the workers aren’t reporting state to the Prefect API (so no task runs appear).
- For PrefectTaskRunner: tasks are created but stuck in Scheduled (often due to clock skew or engine/API communication problems inside the container).

Here’s a focused debug plan that will quickly tell us which it is.

1) Add a “task context” probe
Run each flow with this task to see whether the task actually has a Prefect task-run context and can reach your API from where it executes.
```python
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        logger.info(f"Task run id: {ctx.task_run.id} | Flow run id: {ctx.flow_run.id}")
        # Attempt a simple API call to prove connectivity from the task
        from prefect.client.orchestration import get_client
        async def ping():
            async with get_client() as client:
                await client.read_flow_run(ctx.flow_run.id)
        import anyio
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"No task run context or API error: {e}")
    return x + 1
```
Then run it with Dask:
```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(name="dask-test", task_runner=DaskTaskRunner())
def dask_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_flow()
```
And with PrefectTaskRunner:
```python
from prefect import flow
from prefect.task_runners import PrefectTaskRunner

@flow(name="no-dask-test", task_runner=PrefectTaskRunner())
def local_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    local_flow()
```
What to look for:
- If the Dask run logs “No task run context or API error”: your Dask workers are running the code but don’t have Prefect context/API access. That’s why you see no task runs for Dask.
- If the PrefectTaskRunner run shows the context and a reachable API, but the UI still shows “Scheduled” forever, compare the “UTC now” log line with the time shown in the UI; large skew will keep tasks Scheduled.

2) Common causes and fixes
- Dask workers can’t reach the Prefect API
  - Ensure the worker processes/containers have `PREFECT_API_URL` and `PREFECT_API_KEY` set and can egress to your Prefect API (Cloud or OSS). In ECS, add these env vars to the task definition so child processes inherit them.
  - If you’re using an external Dask cluster (`address=...`), that cluster’s worker image must have the same Prefect and prefect-dask versions and the API env vars.
  - Turn on logs to see Dask worker details by setting these env vars in the flow container: `PREFECT_LOGGING_LEVEL=DEBUG` and `PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask`.
- Clock skew causing “Scheduled” to never run
  - If the ECS container clock differs a lot from your Prefect server/Cloud, a task’s scheduled time can be in the future. Make sure NTP/clock sync is good on the ECS host/Fargate environment, and compare the “UTC now” in the debug task logs to the time in the UI.
- Package/version mismatch
  - Confirm the versions inside the ECS flow container (and on the Dask workers, if external):
    - Prefect 3.x, with a prefect-dask release that supports that Prefect version (prefect-dask has its own version numbers, so they won’t match Prefect’s major/minor)
    - dask/distributed installed
  - Incompatible versions can cause the task engine to run the function body but fail to register state.

3) Quick checks to run now
- In the ECS task definition for your flow job image:
  - Ensure `PREFECT_API_URL` and `PREFECT_API_KEY` are set
  - Ensure the image includes prefect and prefect-dask
- If using an external Dask scheduler:
  - Verify the worker image also has prefect/prefect-dask and the same API env vars
  - Check that security groups / outbound access allow workers to reach the Prefect API host

If you can share:
- Are you on Prefect Cloud or self-hosted?
- Are you using DaskTaskRunner with the default local cluster, or passing an external address?
- The logs from running `debug_task` in both flows
- The Prefect/prefect-dask versions in your ECS image (output of `python -c "import prefect, prefect_dask; print(prefect.__version__, prefect_dask.__version__)"`)

With those, I can point to the exact fix.

Docs for reference:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
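The clock-skew comparison can also be done numerically instead of by eye. The sketch below is an illustration, not part of Prefect. It assumes two things: the server (or the nginx/ALB in front of it) returns a standard HTTP `Date` header, and `GET <PREFECT_API_URL>/health` is a cheap route on a self-hosted server. `skew_seconds` and `measure_skew` are hypothetical helper names.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from urllib.request import Request, urlopen


def skew_seconds(date_header: str, local_now: datetime) -> float:
    """Return (local clock - server clock) in seconds; positive means local is ahead."""
    server_now = parsedate_to_datetime(date_header)  # e.g. "Wed, 01 Jan 2025 12:00:00 GMT"
    return (local_now - server_now).total_seconds()


def measure_skew(api_url: str, timeout: float = 10.0) -> float:
    """GET <api_url>/health and compare the response's Date header with local UTC.

    Assumption: /health exists on the self-hosted API and a Date header is set
    by the server or proxy. The header only has 1-second resolution.
    """
    req = Request(api_url.rstrip("/") + "/health", method="GET")
    with urlopen(req, timeout=timeout) as resp:
        date_header = resp.headers.get("Date")
    if date_header is None:
        raise RuntimeError("response had no Date header; cannot estimate skew")
    return skew_seconds(date_header, datetime.now(timezone.utc))
```

From inside the ECS container, something like `print(measure_skew("https://my-domain.com/api"))` gives a number to compare against. A result within a second or two is ordinary noise. Anything much larger would be worth chasing.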
j
@Marvin do you have the whole thread as context? I.e., can you tell me what you suggested for `debug_task`? Or do I have to re-enter all of it?
m
thought for 21.8 seconds
No worries — you don’t need to re-enter anything. Here’s the debug_task and the two example flows I suggested:
```python
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        logger.info(f"Task run id: {ctx.task_run.id} | Flow run id: {ctx.flow_run.id}")
        # Probe Prefect API from within the task
        from prefect.client.orchestration import get_client
        async def ping():
            async with get_client() as client:
                await client.read_flow_run(ctx.flow_run.id)
        import anyio
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"No task run context or API error: {e}")
    return x + 1
```
Dask runner flow:
```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(name="dask-test", task_runner=DaskTaskRunner())
def dask_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_flow()
```
Local runner flow:
```python
from prefect import flow
from prefect.task_runners import PrefectTaskRunner

@flow(name="no-dask-test", task_runner=PrefectTaskRunner())
def local_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    local_flow()
```
Run each and check the logs:
- If the Dask run logs “No task run context or API error”, the Dask workers aren’t reaching the Prefect API or don’t have Prefect context/env.
- If the local run shows tasks stuck in Scheduled, compare the UTC timestamp to the UI time (clock skew), and confirm API access from inside the container.
j
@Marvin Great, so I tried this, and there are two problems. `debug_task` raises an attribute error because the `TaskRunContext` has no attribute `flow_run`. I don't think this indicates that we can't connect; it's a mistake in the debug task. We get no logs from `no-dask-test` because the tasks never run. However, the system time in the dask tasks seems to be similar to the UI time (I'm looking at the timestamp in the flow run; not sure what you mean beyond that). I am running self-hosted in an ECS cluster. The PREFECT3_API_URL environment variable is set correctly in the tasks. I am using a local dask cluster (not passing an external URL). I'm using `prefect version=3.3.7, prefect-dask version=0.3.5`. The prefect3 server is running in an Elastic Beanstalk app.
m
thought for 134.3 seconds
Thanks for the details, and great catch on the `debug_task` issue. You’re right: in a task context there’s no `flow_run` attribute. You can get the flow run id from `ctx.task_run.flow_run_id`. Here’s a corrected probe that works in both flow and task contexts:
```python
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        # Try to extract ids from either context shape
        tr_id = getattr(getattr(ctx, "task_run", None), "id", None)
        fr_id = getattr(getattr(ctx, "task_run", None), "flow_run_id", None) or getattr(getattr(ctx, "flow_run", None), "id", None)
        logger.info(f"Context ids: task_run_id={tr_id} flow_run_id={fr_id}")

        # Probe Prefect API from within the task
        from prefect.client.orchestration import get_client
        import anyio
        async def ping():
            async with get_client() as client:
                if fr_id:
                    await client.read_flow_run(fr_id)
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"Context/API probe failed: {e}")
    return x + 1
```
Now, to your two symptoms:

1) Dask flow shows no tasks, but completes
- Most likely cause: the Dask worker processes don’t have Prefect API configuration, so they execute the code but cannot report task state. In that case, you’ll see no task runs in the UI even though the flow returns.
- Critical detail I noticed: you mentioned setting `PREFECT3_API_URL`. Prefect 3 still uses the setting name `PREFECT_API_URL` (no “3”). If the workers only have `PREFECT3_API_URL`, Prefect won’t see it.
- Action:
  - Ensure the environment contains `PREFECT_API_URL` (and `PREFECT_API_KEY` if your API requires auth), not `PREFECT3_API_URL`.
  - Since you’re using a local Dask cluster within the ECS task, the worker processes should inherit the container env; verify in the task logs that `PREFECT_API_URL` is populated (the probe above logs it).
  - Verify both Prefect and prefect-dask are installed in the flow container (and thus in the worker processes); the versions you shared (3.3.7 / 0.3.5) are fine.

2) PrefectTaskRunner shows two tasks Scheduled but they never start (and no task logs)
- This typically means the engine created the task runs in the API but the execution loop is not moving them to Running. Common causes:
  - API URL misconfigured (e.g., missing the “/api” path on self-hosted), so follow-up API calls 404 or time out
  - Network or TLS issues between the container and your Elastic Beanstalk API (look for 401/403/404/5xx in server logs)
  - Significant clock skew (less likely if your timestamps look aligned)
- Actions to pinpoint:
  - Make sure your `PREFECT_API_URL` includes the `/api` path for OSS (e.g., `https://your-eb-domain/api`); this is a frequent gotcha.
  - Run the probe above with PrefectTaskRunner and confirm:
    - `PREFECT_API_URL` is what you expect
    - The API “ping” succeeds from within the task
  - Try an inline task call (bypasses the submit/scheduler path):
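```python
from prefect import flow, task
from prefect.task_runners import PrefectTaskRunner

@task
def inc(x):
    return x + 1

@flow(task_runner=PrefectTaskRunner())
def inline_test():
    # Direct calls (no .submit()) run the task inline, bypassing the task runner
    return inc(1) + inc(2)

if __name__ == "__main__":
    inline_test()
```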
- If inline works and creates/finishes task runs, but `.submit()` stays Scheduled, the issue is with the runner’s execution loop communicating with your API.
- Turn up logging in the container:
  - Set `PREFECT_LOGGING_LEVEL=DEBUG`
  - Optionally set `PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask`
- Check your Prefect server logs in EB for errors around task-run endpoints when you start the flow.

Next, could you share these from a rerun with the corrected probe?
- The debug logs from the Dask flow: specifically the `PREFECT_API_URL` line and whether “Prefect API reachable from task” appears
- The same from the PrefectTaskRunner flow
- Your exact `PREFECT_API_URL` value (redact the host if needed, but include whether it ends with `/api`)

Relevant docs:
- Runtime context: Runtime context
- Dask integration: Prefect Dask integration
- Settings reference: Prefect 3 settings

I strongly suspect renaming `PREFECT3_API_URL` to `PREFECT_API_URL` (and ensuring it includes `/api`) will resolve both issues; the probe will confirm quickly.
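Both checks above, the exact variable name and the `/api` suffix, can be run mechanically. Below is a minimal stdlib sketch. `check_prefect_env` is a hypothetical helper. The `/api` rule reflects the self-hosted URL shape discussed in this thread, since Prefect Cloud URLs look different.

```python
import os
from urllib.parse import urlparse


def check_prefect_env(env: dict) -> list:
    """Return human-readable problems found in an environment mapping (sketch)."""
    problems = []
    url = env.get("PREFECT_API_URL")
    if not url:
        problems.append("PREFECT_API_URL is not set")
        # Typo from this thread: a versioned name that Prefect does not read.
        if "PREFECT3_API_URL" in env:
            problems.append("PREFECT3_API_URL is set, but Prefect only reads PREFECT_API_URL")
        return problems
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        problems.append(f"unexpected scheme {parsed.scheme!r} in PREFECT_API_URL")
    # Self-hosted servers serve the API under /api (a trailing slash is tolerated).
    if not parsed.path.rstrip("/").endswith("/api"):
        problems.append("PREFECT_API_URL should end with /api for a self-hosted server")
    return problems


if __name__ == "__main__":
    for problem in check_prefect_env(dict(os.environ)) or ["no problems found"]:
        print(problem)
```

Running it inside the flow container checks what the flow process sees. Calling `check_prefect_env(dict(os.environ))` from inside a task would show what a Dask worker process sees.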
j
@Marvin sorry, I mistyped. I have in the logs: `01:10:34 PM prefect.task_runs PREFECT_API_URL=https://my-domain.com/api`
m
thought for 162.2 seconds
j
@Marvin my-domain.com is fake, just censoring the real address
m
thought for 73.3 seconds
All good on the redaction! Based on your latest details, I think we’ve found the core issue:
- Prefect 3 expects the env var name `PREFECT_API_URL` (not `PREFECT3_API_URL`), and for OSS it must include the `/api` suffix. If your containers only have `PREFECT3_API_URL` set, Prefect won’t pick it up. That explains:
  - Dask: tasks run but never create/report task runs (the workers don’t have a valid API target)
  - PrefectTaskRunner: task runs appear as Scheduled but never move to Running/Completed (the engine can’t update state)

Two fixes to apply now:
1) In your ECS task definition (flow container), set:
   - `PREFECT_API_URL=https://my-domain.com/api`
   - `PREFECT_API_KEY=...` (only if you’ve enabled auth; the OSS default is usually no key)
2) Re-run with the corrected probe task (fixes the context attribute use):
```python
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        tr_id = getattr(getattr(ctx, "task_run", None), "id", None)
        fr_id = getattr(getattr(ctx, "task_run", None), "flow_run_id", None) or getattr(getattr(ctx, "flow_run", None), "id", None)
        logger.info(f"Context ids: task_run_id={tr_id} flow_run_id={fr_id}")

        from prefect.client.orchestration import get_client
        import anyio
        async def ping():
            async with get_client() as client:
                if fr_id:
                    await client.read_flow_run(fr_id)
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"Context/API probe failed: {e}")
    return x + 1
```
Then run both flows again:
- Dask runner (local cluster in ECS)
- PrefectTaskRunner

Expected outcome after fixing `PREFECT_API_URL`:
- Dask: you should now see task runs in the UI
- PrefectTaskRunner: tasks should move from Scheduled to Running/Completed, and you’ll see “Prefect API reachable from task” in the logs

If anything still sticks:
- Please share the exact value you’re using for `PREFECT_API_URL` (confirm it ends with `/api`), and any error from the probe logs
- Verify your ECS security group/NACL allows outbound traffic to your EB domain/port
- Check your EB server logs for 404s or 401s on `/api` paths

References:
- Settings reference: Prefect 3 settings
- Runtime context: Runtime context
- Dask integration: Prefect Dask integration
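For fix 1, ECS task definitions take environment variables as a list of `name`/`value` objects under each container definition’s `environment` key. The sketch below builds that list. The helper name and the `debug` switch are illustrative, not part of any AWS or Prefect API. A real API key is better supplied through the container definition’s `secrets` (Secrets Manager / SSM) than as a plain value.

```python
import json


def ecs_environment(api_url, api_key=None, debug=False):
    """Return ECS-style environment entries for a Prefect flow container.

    Hypothetical helper: the output is meant to go under
    containerDefinitions[].environment in a task definition.
    """
    env = {"PREFECT_API_URL": api_url}
    if api_key:  # only needed when the API actually requires a key
        env["PREFECT_API_KEY"] = api_key
    if debug:  # verbose client logs, as suggested earlier in the thread
        env["PREFECT_LOGGING_LEVEL"] = "DEBUG"
        env["PREFECT_LOGGING_EXTRA_LOGGERS"] = "distributed,dask"
    return [{"name": name, "value": value} for name, value in env.items()]


if __name__ == "__main__":
    print(json.dumps(ecs_environment("https://my-domain.com/api"), indent=2))
```

Because the local Dask cluster runs inside the same container, these variables reach its worker processes too. No separate worker configuration is needed in that setup.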
Thanks, that log confirms you’re using the correct `PREFECT_API_URL` format. Two concrete next steps to pinpoint what’s blocking execution/state updates:

1) Use the corrected debug task and confirm API access from inside the Dask workers
Please re-run the Dask flow with this updated probe (note the `flow_run_id` access is via `ctx.task_run.flow_run_id`):
```python
from prefect import flow, task, get_run_context
from prefect.logging import get_run_logger
import os, sys, platform
from datetime import datetime, timezone

@task
def debug_task(x):
    logger = get_run_logger()
    logger.info(f"UTC now: {datetime.now(timezone.utc).isoformat()}")
    logger.info(f"PREFECT_API_URL={os.getenv('PREFECT_API_URL')}")
    logger.info(f"Python={sys.version} Host={platform.node()}")
    try:
        ctx = get_run_context()
        tr_id = getattr(getattr(ctx, "task_run", None), "id", None)
        fr_id = getattr(getattr(ctx, "task_run", None), "flow_run_id", None) or getattr(getattr(ctx, "flow_run", None), "id", None)
        logger.info(f"Context ids: task_run_id={tr_id} flow_run_id={fr_id}")
        from prefect.client.orchestration import get_client
        import anyio
        async def ping():
            async with get_client() as client:
                if fr_id:
                    await client.read_flow_run(fr_id)
        anyio.run(ping)
        logger.info("Prefect API reachable from task")
    except Exception as e:
        logger.error(f"Context/API probe failed: {e}")
    return x + 1
```
Run:
```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(name="dask-test", task_runner=DaskTaskRunner())
def dask_flow():
    a = debug_task.submit(1)
    b = debug_task.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    dask_flow()
```
- If you see “Context ids…” and “Prefect API reachable from task” in the logs, the Dask workers do have Prefect context and can reach your API. If you still don’t see task runs in the UI, it points to state update calls failing (see #2).
- If you don’t see those lines, the Dask workers aren’t talking to the API; we’ll need to ensure the env and networking are available inside the worker processes.

2) Check for PATCH being blocked at your Elastic Beanstalk/ALB layer
Your “Scheduled but never runs” symptom with `PrefectTaskRunner` strongly suggests the API allows create/read (POST/GET) but blocks state transitions (PATCH). Prefect uses PATCH for moving runs to Running/Completed. If PATCH is blocked or altered by the proxy, tasks remain Scheduled in the UI and never show logs.

Actions:
- Check the EB/ALB config and any WAF/reverse proxy in front of your `/api` to ensure the HTTP methods GET, POST, PATCH, and DELETE are allowed and forwarded.
- Look at your Prefect Server logs when a flow starts; if PATCH is blocked you’ll typically see 405/403/404/5xx errors on endpoints like `/api/task_runs/{id}/state` or `/api/flow_runs/{id}/state`.
- Turn up client logs in the ECS container:
  - `PREFECT_LOGGING_LEVEL=DEBUG`
  - Optionally `PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask`
- Quick flow-level API probe (runs even if tasks won’t start):
```python
from prefect import flow, get_run_context
from prefect.logging import get_run_logger
from prefect.client.orchestration import get_client
import anyio

@flow(name="api-ping")
def api_ping():
    logger = get_run_logger()
    ctx = get_run_context()
    logger.info(f"Flow run id: {ctx.flow_run.id}")
    async def ping():
        async with get_client() as client:
            await client.read_flow_run(ctx.flow_run.id)
    anyio.run(ping)
    logger.info("Flow can reach API")

if __name__ == "__main__":
    api_ping()
```
If this succeeds but tasks still stick in Scheduled, it’s almost certainly method filtering on PATCH.

A few more checks:
- Versions: you’re on Prefect 3.3.7 and prefect-dask 0.3.5; consider upgrading the server and containers to the latest 3.4.x to pick up recent runner fixes.
- Confirm your Prefect Server version (UI footer or `GET /api/version`) roughly matches the client minor version.
- Since you’re using a local Dask cluster in the same ECS task, worker processes should inherit the env. The debug task’s “PREFECT_API_URL=” line will confirm.

If you can share:
- The Dask-flow logs for “Context ids…” and “Prefect API reachable from task”
- Any error lines from your EB server logs around task state endpoints
- Your server version

…we can zero in on a precise fix. My bet is on a proxy blocking PATCH.
j
@Marvin the API is reachable from the dask endpoint (demonstrated by the `debug_task`), and I see many successful PATCH requests in the nginx access logs on the EBS server. What now?
m
thought for 121.5 seconds
Got it, thanks for confirming the API is reachable from the Dask workers and that nginx shows successful PATCH requests. Since the Dask tasks can reach the API but you still don’t see task runs, and your PrefectTaskRunner tasks remain “Scheduled”, let’s zero in on version compatibility and environment consistency, then isolate infra vs. code.

High‑signal checks to run next

1) Confirm your server version is Prefect 3.x
The Prefect 3 SDK is not compatible with a Prefect 2.x server. A common symptom of version mismatches is tasks stuck in Scheduled, or tasks not appearing for parallel runners.
- From your browser or ECS container:
```shell
curl -s https://my-domain.com/api/version
```
Please paste the JSON. If it reports a 2.x server, upgrade to the Prefect 3 OSS server and re-test. If it’s 3.x but older than your SDK, consider upgrading both server and SDK to the latest 3.4.x.

2) Verify you’re looking at the same backend the flows write to
Since the `debug_task` logs show a concrete `task_run_id`, let’s confirm that record exists in your DB. Grab one `task_run_id` from the Dask flow logs and query your Prefect DB directly:
- If it’s there, the UI should be able to display it; this points to a UI view/filtering or workspace mismatch.
- If it’s not there, your flow may be writing to a different API/database than the UI you’re checking.

3) Try the same flows outside ECS against the same API
Run the two small flows (PrefectTaskRunner and DaskTaskRunner) from your laptop, pointing at your EB API:
- Set `PREFECT_API_URL=https://my-domain.com/api` locally
- Run the flows

If tasks appear locally but not from ECS, it’s an infra/network/pool issue in ECS. If they fail locally too, it’s server-side config/version.

4) Turn on Prefect DEBUG logs for the ECS run and look for state update errors
Set these env vars on the ECS task and re-run:
- `PREFECT_LOGGING_LEVEL=DEBUG`
- `PREFECT_LOGGING_EXTRA_LOGGERS=distributed,dask`

Then share any lines that mention `set_task_run_state`, `set_flow_run_state`, or `httpx` errors. Even with nginx 200s, the client may be retrying or timing out at times (connection pool, keep-alive, proxy limits).

5) Try a different runner to triangulate (ThreadPool)
This will tell us whether the issue is specific to Dask or more general in your ECS environment.
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(log_prints=True)
def inc(x):
    print(f"inc({x})")
    return x + 1

@flow(name="threadpool-test", task_runner=ThreadPoolTaskRunner())
def threadpool_flow():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    threadpool_flow()
```
- If ThreadPool shows proper task runs and completes, the Dask integration/config is suspect.
- If ThreadPool also sticks in Scheduled, the issue is not Dask.

6) Upgrade to the latest 3.4.x
You’re on Prefect 3.3.7 and prefect-dask 0.3.5. There have been runner and engine fixes since then. If feasible, upgrade both the server and ECS container images to 3.4.x and the latest prefect-dask, then re-test the two sample flows.

What would help me help you fastest:
- The JSON output of `GET https://my-domain.com/api/version`
- Whether the `task_run_id` from `debug_task` appears in your DB
- The DEBUG logs from the PrefectTaskRunner flow showing where it gets stuck
- Results of the ThreadPool test flow

Useful docs:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
- Settings: Prefect 3 settings

Given what you’ve confirmed so far, my top suspicion is a version mismatch between SDK and server, or a subtle infra difference between where your flows run and where your UI/DB is pointing. The checks above will pinpoint which.
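Step 4 notes that nginx 200s don’t tell the whole story. The access logs already on the EB server can be tallied by method, path and status so that any non-2xx response on an `/api` route stands out. That includes failed WebSocket upgrade attempts, which are logged as `GET` requests. The sketch below uses only the standard library and assumes nginx’s default `combined` log format. The helper names are hypothetical.

```python
import re
import sys
from collections import Counter

# Request line and status from nginx's default "combined" format, e.g.
# ... "GET /api/health HTTP/1.1" 200 4 "-" "curl/8.5.0"
REQUEST_RE = re.compile(r'"(?P<method>[A-Z]+) (?P<path>\S+) HTTP/[\d.]+" (?P<status>\d{3}) ')
# Collapse run ids (UUIDs) so identical routes group together.
UUID_RE = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")


def tally_api_requests(lines):
    """Count (method, normalized path, status) for requests under /api."""
    counts = Counter()
    for line in lines:
        match = REQUEST_RE.search(line)
        if not match or not match.group("path").startswith("/api"):
            continue
        path = UUID_RE.sub("{id}", match.group("path").split("?", 1)[0])
        counts[(match.group("method"), path, int(match.group("status")))] += 1
    return counts


def failures(counts):
    """Keep entries that are neither 2xx nor 101 (a successful WebSocket upgrade)."""
    return {key: n for key, n in counts.items() if not (200 <= key[2] < 300 or key[2] == 101)}


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python tally_nginx.py /var/log/nginx/access.log
    with open(sys.argv[1]) as log:
        for (method, path, status), n in sorted(failures(tally_api_requests(log)).items()):
            print(f"{n:6d}  {status}  {method} {path}")
```

Grouping by route, rather than only grepping for one method, makes a failing endpoint visible even when it is outnumbered by thousands of successful requests to other routes.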
j
@Marvin could you elaborate on how to run the flows locally? If I use the `prefect` command with `PREFECT_API_URL` pointed at my server, won't the flows run in ECS?
m
thought for 66.2 seconds
Great question. Short answer: to run your flow locally while still reporting to your remote server, don’t use a deployment or `prefect deployment run`. Just execute your Python script directly with `PREFECT_API_URL` set.

Key points
- The `prefect` CLI does not have a generic “run this flow locally” command in 3.x. If you use `prefect deploy` and `prefect deployment run`, your flow run will wait for a worker and, in your setup, execute in ECS.
- To run locally, invoke the flow function directly in a Python process. Prefect will still create/update runs in your remote server if `PREFECT_API_URL` (and `PREFECT_API_KEY`, if required) are set in your environment.

How to run locally against your server
1) Save a small script, e.g. `local_test.py`:
```python
from prefect import flow, task
from prefect.task_runners import PrefectTaskRunner
from prefect_dask.task_runners import DaskTaskRunner

@task
def inc(x):
    return x + 1

@flow(name="local-no-dask", task_runner=PrefectTaskRunner())
def local_no_dask():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

@flow(name="local-dask", task_runner=DaskTaskRunner())
def local_dask():
    a = inc.submit(1)
    b = inc.submit(2)
    return a.result() + b.result()

if __name__ == "__main__":
    # choose one to test
    local_no_dask()
    # local_dask()
```
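2) Run it locally with your server configured:
- If OSS server with no auth:
```shell
PREFECT_API_URL=https://my-domain.com/api python local_test.py
```
- If auth is enabled (`PREFECT_API_KEY` is sent as a bearer token; a self-hosted server secured with basic auth expects `PREFECT_API_AUTH_STRING` instead):
```shell
PREFECT_API_URL=https://my-domain.com/api PREFECT_API_KEY=**** python local_test.py
```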
That will:
- Execute the flow on your local machine (not ECS)
- Report the flow/task runs to your self-hosted server so you can see them in the UI
Notes
- Do not use `prefect deployment run` for this test; that will route to your ECS worker.
- You can also test Dask locally by switching to `local_dask()`. With no cluster address configured, `DaskTaskRunner` starts a temporary local Dask cluster on your machine for the duration of the flow run.
- If you want to ensure the right extras are installed, you can run:
```shell
uv run --with 'prefect[dask]' python local_test.py
```
Docs for reference:
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
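Before running the script, a quick preflight can confirm that the shell you launch it from actually points at your server. The sketch below uses only the standard library and makes two assumptions: your server answers `GET /api/health`, and no auth checks are needed.

```python
import os
import urllib.request
from typing import List, Mapping


def preflight_problems(env: Mapping[str, str]) -> List[str]:
    """Return reasons a local run would not report to the remote server."""
    problems = []
    api_url = env.get("PREFECT_API_URL", "").strip()
    if not api_url:
        problems.append("PREFECT_API_URL is not set, so runs will not be sent to your server")
        return problems
    if not api_url.startswith(("http://", "https://")):
        # Catches pasted chat-style links such as <https://...>
        problems.append(f"PREFECT_API_URL should start with http:// or https://, got {api_url!r}")
    if not api_url.rstrip("/").endswith("/api"):
        problems.append("a self-hosted PREFECT_API_URL normally ends with /api")
    return problems


def server_healthy(api_url: str, timeout: float = 10.0) -> bool:
    """GET <api_url>/health and report whether it answered 200."""
    url = api_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses
        return False


if __name__ == "__main__":
    issues = preflight_problems(os.environ)
    for issue in issues:
        print("problem:", issue)
    if not issues:
        print("server healthy:", server_healthy(os.environ["PREFECT_API_URL"]))
```

If it reports no problems and a healthy server, any missing task runs come from the run itself, not from where the script is pointed.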
j
@Marvin:
1. The curl command returns 0.8.4. Perhaps that's not the prefect version, but the API version? In the EBS server, running `prefect --version` returns 3.3.7.
2. There is no row in the database `task_run` table with the id from the logs. The only rows in that table are in the scheduled or cancelled state, suggesting that they are from the `no_dask_example` flow where the tasks stalled.
3. Running locally as described produced no task runs, but did give an interesting warning:
```text
15:37:09.992 | WARNING | prefect.events.clients - Unable to connect to 'wss://my-domain.com/api/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: server rejected WebSocket connection: HTTP 404. Set PREFECT_DEBUG_MODE=1 to see the full error.
```
m
thought for 85.7 seconds
Thanks for the three data points; they’re exactly what we needed.
What this tells us
- 0.8.4 from /api/version is the API version. Your server Prefect version (3.3.7) is fine.
- The smoking gun is the combination of two facts: there is no row in task_run for the ID your Dask task emitted, and the local run warned that the WebSocket connection failed with a 404.
- In Prefect 3, task runs are orchestrated client-side. The client streams task run state changes as events over a WebSocket to /api/events/in, and the server’s “task run recorder” service consumes those events to create and update task_run rows. Flow runs are still created through regular REST calls, which is why your flow runs do appear. If that WebSocket cannot connect, you’ll see exactly what you’re seeing: flows run, but task runs never show up in the DB/UI.
So the root cause is that your reverse proxy (Elastic Beanstalk/nginx and/or ALB) is not routing WebSocket upgrades to /api/events/in, hence the 404 on wss://my-domain.com/api/events/in.
What to change
- Ensure WebSocket upgrade for the /api/events/in path is allowed and proxied to your Prefect server process.
- Typical nginx location block (adapt to your EB backend/upstream):
```nginx
# Proxy WebSocket for Prefect events ingestion
location /api/events/in {
    proxy_pass http://127.0.0.1:4200/api/events/in;  # or your internal upstream
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
    proxy_set_header Host $host;
    proxy_read_timeout 600s;
    proxy_send_timeout 600s;
}
```
- If you have an ALB in front: ALB supports WebSockets, so just ensure the listener/target group forwards to your nginx and that nothing strips the Upgrade/Connection headers.
- If there’s CloudFront in front, be sure the behavior allows WebSockets and doesn’t block the /api/events/in path.
How to verify the fix
- From your laptop, use a simple WebSocket client (e.g. wscat) to check that the handshake returns 101 Switching Protocols, not 404: `wscat -c wss://my-domain.com/api/events/in`. The connection may close immediately after connecting; that’s fine, since we only need the upgrade to succeed. A rejected handshake is reported as an error such as `Unexpected server response: 404`.
- Re-run your small local flow with PREFECT_API_URL=https://my-domain.com/api and confirm that task runs appear.
- Re-run your ECS flows; Dask task runs should now be recorded as well.
Also confirm server services are running
- Prefect server should start with services enabled (the default). Ensure you are not using `prefect server start --no-services`.
- On server start, logs should mention starting services like “event persister” and “task run recorder”.
Why the PrefectTaskRunner tasks were stuck Scheduled
- PrefectTaskRunner does not execute tasks in the flow’s own process. It submits them as background tasks for Prefect task workers (started with `prefect task serve`), so they stay Scheduled until a task worker picks them up. Task workers receive those runs over a WebSocket too (/api/task_runs/subscriptions/scheduled), so make sure the proxy allows upgrades on that path as well.
- If anything still sticks after fixing WebSockets, enable DEBUG logs with `PREFECT_LOGGING_LEVEL=DEBUG` and check for state transition errors:
- Look for calls to set_task_run_state / set_flow_run_state in the flow logs and corresponding responses in server logs.
Once the WebSocket route is fixed, I expect:
- The local test to show task runs
- The Dask/ECS test to show task runs
- The PrefectTaskRunner case to move past Scheduled once a task worker is serving those tasks
Helpful references:
- Settings reference: Prefect 3 settings
- Task runners: Task runners in Prefect 3
- Dask integration: Prefect Dask integration
If you can share your current EB/nginx config for the /api path (or confirm the change), I can sanity-check it.
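If wscat isn’t handy, you can run the same handshake check with the Python standard library. This is a minimal sketch. It assumes the client derives the events URL by swapping http(s) for ws(s) and appending /events/in, which matches the URL in your warning. It then sends an RFC 6455 opening handshake and prints the HTTP status. Expect 101 once the proxy is fixed, while 404 means the upgrade still isn’t reaching Prefect. The probe sends no credentials, so a proxy that requires auth may answer 401/403 instead.

```python
import base64
import os
import socket
import ssl
from urllib.parse import urlsplit


def events_ws_url(api_url: str) -> str:
    """Assumed mapping: http(s) -> ws(s), then append /events/in."""
    base = api_url.rstrip("/")
    if base.startswith("https://"):
        base = "wss://" + base[len("https://"):]
    elif base.startswith("http://"):
        base = "ws://" + base[len("http://"):]
    return base + "/events/in"


def handshake_request(host: str, path: str, key: str) -> bytes:
    """Build a minimal WebSocket opening handshake (RFC 6455)."""
    lines = [
        f"GET {path} HTTP/1.1",
        f"Host: {host}",
        "Upgrade: websocket",
        "Connection: Upgrade",
        f"Sec-WebSocket-Key: {key}",
        "Sec-WebSocket-Version: 13",
        "",
        "",
    ]
    return "\r\n".join(lines).encode("ascii")


def status_code(response_head: bytes) -> int:
    """Extract the code from a status line like 'HTTP/1.1 101 Switching Protocols'."""
    return int(response_head.split(b"\r\n", 1)[0].split()[1])


def probe(ws_url: str, timeout: float = 10.0) -> int:
    """Open a TCP (or TLS for wss) connection, send the handshake, return the status."""
    parts = urlsplit(ws_url)
    secure = parts.scheme == "wss"
    port = parts.port or (443 if secure else 80)
    key = base64.b64encode(os.urandom(16)).decode("ascii")
    raw = socket.create_connection((parts.hostname, port), timeout=timeout)
    sock = (
        ssl.create_default_context().wrap_socket(raw, server_hostname=parts.hostname)
        if secure
        else raw
    )
    with sock:
        sock.sendall(handshake_request(parts.netloc, parts.path or "/", key))
        return status_code(sock.recv(4096))


if __name__ == "__main__":
    api_url = os.environ.get("PREFECT_API_URL")
    if api_url:
        url = events_ws_url(api_url)
        print(url, "->", probe(url))  # 101 means the upgrade reached the server
    else:
        print("Set PREFECT_API_URL to probe the events WebSocket")
```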