Nick Torba
07/14/2025, 9:18 PM
I use set_row_count in many different flows. I have one particular flow where, when I call this task, it just hangs indefinitely; it never actually runs:
17:13:48.835 | INFO | Flow run 'sparkling-mosquito' - Created task run 'set_row_count-0' for task 'set_row_count'
17:13:48.836 | INFO | Flow run 'sparkling-mosquito' - Executing 'set_row_count-0' immediately...
It just sits here for a long time. I am not able to reproduce it anywhere except in my remote development environment; the same task run from the same flow works locally.
It even works when I run it on an ECS cluster that is configured with my "local" dev env (local in quotes because my Postgres DB runs remotely, but the rest of the app runs locally so I can test jobs in ECS).
My question is: has anyone else run into the problem of a task hanging indefinitely from certain places?
Nick Torba
07/15/2025, 11:42 AM
When I call my set_row_count task from just my dev environment, it just hangs; it never actually gets called.
The same task from other flows works.
Nick Torba
07/15/2025, 11:48 AM
One last thing: I don't actually see the task get created in the Prefect task graph. My flow just never stops.
Marvin
07/15/2025, 11:50 AM
a. Add logging statements inside your set_row_count task so you can see how far it gets:
python
from prefect import task, get_run_logger

@task
def set_row_count():
    logger = get_run_logger()
    logger.info("Starting set_row_count task")
    logger.info("Attempting database connection")
    # Your existing code
    logger.info("Database connection successful")
    # Rest of your code
b. Check if you can see these logs in your Prefect UI or logs when the task hangs
c. Monitor your Postgres connections when this occurs:
sql
SELECT * FROM pg_stat_activity
WHERE datname = 'your_database_name';
4. Task Worker Configuration
- Verify that your task worker configuration is consistent across environments
- Check if there are any resource constraints (memory, CPU) in your dev environment
5. Transaction Management
- If your task involves database transactions, ensure proper transaction management
- Consider adding explicit transaction handling if not already present
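For example, a minimal sketch of explicit transaction handling, assuming psycopg2 (the DSN, table name, and row_counts bookkeeping table are placeholders, not your actual schema):
python
import psycopg2  # assumed driver; swap in whatever client you actually use

def set_row_count_in_db(dsn, table):
    # The context managers commit the transaction on success and roll it
    # back on error, so a failed statement can't leave the session stuck
    # in an open transaction holding locks.
    with psycopg2.connect(dsn, connect_timeout=10) as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")  # table is a trusted placeholder
            (count,) = cur.fetchone()
            cur.execute(
                "UPDATE row_counts SET n = %s WHERE table_name = %s",
                (count, table),
            )
    return count
Closing the connection explicitly (or using a pool) afterward also helps avoid idle sessions piling up in pg_stat_activity.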
Since you mentioned the task doesn't appear in the task graph, this could indicate that the task is not being properly registered or there might be an issue with the flow's task dependencies.
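As an illustration of what an explicit dependency looks like (the task and flow names here are made up, not from this thread):
python
from prefect import flow, task

@task
def load_table():
    return "my_table"

@task
def set_row_count(table):
    ...

@flow
def example_flow():
    table_future = load_table.submit()
    # Passing the future (or listing it in wait_for=) records the edge in the
    # task graph, and set_row_count will not start until load_table finishes.
    set_row_count.submit(table_future, wait_for=[table_future])
If an upstream future like this never resolves, the downstream task never starts, which can look like a silent hang.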
Would you be able to share:
1. A simplified version of your set_row_count task code?
2. The flow configuration where it's failing?
3. Any error messages from your Postgres logs when this occurs?
This would help narrow down the exact cause of the hanging behavior.
Marvin
07/15/2025, 1:58 PM
- Try using task.submit() instead of calling the task directly if you're calling tasks within other tasks (see the short sketch after this list)
2. Task Runner Configuration
- Since this only happens in your remote dev environment, check your task runner configuration
- Verify that your task runner has enough capacity to handle concurrent tasks
- Check if there are any resource constraints in your dev environment
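For example, a minimal sketch of the submit pattern (the flow name here is illustrative):
python
from prefect import flow, task

@task
def set_row_count():
    ...  # your existing task body

@flow
def my_flow():
    # submit() hands the task to the task runner and returns a PrefectFuture
    future = set_row_count.submit()
    # .result() blocks until the task run finishes and re-raises any failure
    return future.result()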
To help diagnose this, could you share:
1. How are you calling this task in your flow? For example:
python
from prefect import flow

@flow
def your_flow():
    # How is set_row_count called here?
    result = set_row_count()
2. Are there any differences in how the task is called between the working flows and the non-working flow?
3. Can you check the Prefect UI for this specific flow run and verify:
- Are there any other tasks running concurrently?
- What is the state of any upstream tasks?
- Are there any error messages in the flow run logs?
Also, as a quick test, you could try:
1. Adding a simple logging statement at the very start of your task
2. Running the task with a different name in the same flow to see if it's task-specific or flow-specific
python
from prefect import task, get_run_logger

@task(name="test_row_count")
def test_row_count():
    logger = get_run_logger()
    logger.info("Starting test row count task")
    # Rest of your original task code
This would help determine if it's specific to the task implementation or something in the environment/configuration.
Marvin
07/15/2025, 2:39 PM
1. Get Task Run Details:
GET /api/task_runs/{task_run_id}
This will give you detailed information about the task run, including its current state and any state transition history.
2. Get Task Run History:
GET /api/task_runs/{task_run_id}/history
This endpoint shows the full history of state transitions for the task run.
3. Get Flow Run Details (to see context of the task):
GET /api/flow_runs/{flow_run_id}
This can show you the overall flow context and any potential dependency issues.
You can access these endpoints in several ways:
1. Using the Python Client:
python
from prefect.client import get_client

async with get_client() as client:
    # Get task run details
    task_run = await client.read_task_run(task_run_id="your-task-run-id")
    print(f"Current State: {task_run.state}")
    # Get state history
    history = await client.read_task_run_states(task_run_id="your-task-run-id")
    for state in history:
        print(f"State: {state.type}, Timestamp: {state.timestamp}")
2. Direct HTTP Requests:
python
import httpx

# Assuming you have your API URL and key configured
PREFECT_API_URL = "http://your-prefect-server/api"
headers = {"Authorization": "Bearer your-api-key"}

# Get task run details
response = httpx.get(
    f"{PREFECT_API_URL}/task_runs/your-task-run-id",
    headers=headers,
)
task_run = response.json()

# Get state history
response = httpx.get(
    f"{PREFECT_API_URL}/task_runs/your-task-run-id/history",
    headers=headers,
)
state_history = response.json()
3. Using the Prefect CLI:
bash
prefect task-run inspect <task-run-id>
The key things to look for in the responses:
- Current state details including any data stored in the state
- Timestamps of state transitions
- Any error messages or state results
- Dependencies and upstream task states
- Worker assignment information
Would you like me to provide more specific details about any of these endpoints or help construct a specific query for your use case?
Marvin
07/15/2025, 2:40 PM
1. Check Worker Status:
bash
prefect work-pool ls
Then check the specific work pool's workers:
bash
prefect worker status --work-pool "your-work-pool-name"
2. API Endpoints to Check:
python
import httpx
from prefect.client import get_client

async def investigate_pending_task(task_run_id, flow_run_id):
    async with get_client() as client:
        # 1. Get task run details
        task_run = await client.read_task_run(task_run_id)
        # 2. Get work pool details for the flow run
        flow_run = await client.read_flow_run(flow_run_id)
        # 3. Get work pool queue status
        if flow_run.work_pool_name:
            work_pool = await client.read_work_pool(flow_run.work_pool_name)
            work_pool_status = await client.read_work_pool_status(flow_run.work_pool_name)
        return {
            "task_state": task_run.state,
            "flow_run_status": flow_run.state,
            "work_pool_info": work_pool if flow_run.work_pool_name else None,
            "work_pool_status": work_pool_status if flow_run.work_pool_name else None,
        }
3. Common Causes for Tasks Stuck in PENDING:
- No active workers for the work pool
- Worker process is running but not properly configured
- Network connectivity issues between worker and Prefect server
- Resource constraints preventing worker from picking up new tasks
- Misconfigured work pool settings
To help diagnose your specific case:
1. Can you share:
- What work pool type are you using?
- Are you seeing any workers when you run prefect work-pool ls?
- Is this happening for all tasks or just this specific one?
2. Try these diagnostic steps:
python
from prefect.client import get_client

async with get_client() as client:
    # Check if the task is actually queued
    task_run = await client.read_task_run(task_run_id="your-task-run-id")
    print(f"Task Run State: {task_run.state}")
    # Check the flow run's work pool assignment
    flow_run = await client.read_flow_run(flow_run_id="your-flow-run-id")
    print(f"Work Pool: {flow_run.work_pool_name}")
    if flow_run.work_pool_name:
        # Check work pool status
        status = await client.read_work_pool_status(flow_run.work_pool_name)
        print(f"Work Pool Status: {status}")
3. You can also check the worker logs directly:
bash
prefect worker start --work-pool "your-work-pool-name" --log-level DEBUG
This will show detailed logs about task scheduling and pickup.
Would you like me to provide more specific guidance based on your work pool type or deployment configuration?