# ask-community
n
Hello friends. I have found a bug I am really struggling with. I have a task named
set_row_count
that I use in many different flows. There is one particular flow where, when I call this task, it just hangs indefinitely and never actually runs:
```
17:13:48.835 | INFO    | Flow run 'sparkling-mosquito' - Created task run 'set_row_count-0' for task 'set_row_count'
17:13:48.836 | INFO    | Flow run 'sparkling-mosquito' - Executing 'set_row_count-0' immediately...
```
It just sits here for a long time. I am not able to reproduce it anywhere except in my remote development environment; the same task run from the same flow works locally. It even works when I run it on an ECS cluster that is configured with my "local" dev env (local in quotes because my Postgres DB runs remotely, but the rest of the app runs locally so I can test jobs in ECS). My question is: has anyone else run into the problem of a task hanging indefinitely only in certain environments?
@Marvin have you seen anything like this?
m
thought for 4.2 seconds
I need more context to help you effectively. What specifically are you asking about? Are you:
1. Encountering an error message?
2. Looking for information about a particular Prefect feature?
3. Seeing unexpected behavior in your flows?
Please provide more details about what you're seeing or experiencing, and I'll be happy to help investigate. If there's an error message or specific behavior you're observing, sharing that would be especially helpful.
n
I have a task named
set_row_count
that I use in many different flows. There is one particular flow where, when I call this task, it just hangs indefinitely and never actually runs:
```
17:13:48.835 | INFO    | Flow run 'sparkling-mosquito' - Created task run 'set_row_count-0' for task 'set_row_count'
17:13:48.836 | INFO    | Flow run 'sparkling-mosquito' - Executing 'set_row_count-0' immediately...
```
It just sits here for a long time. I am not able to reproduce it anywhere except in my remote development environment; the same task run from the same flow works locally. It even works when I run it on an ECS cluster that is configured with my "local" dev env (local in quotes because my Postgres DB runs remotely, but the rest of the app runs locally so I can test jobs in ECS). When I call my set_row_count task from just my dev environment, it just hangs and never actually gets called. The same task from other flows works.
One last thing: I don't actually see the task get created in the Prefect task graph. My flow just never stops.
@Marvin I have a task named
set_row_count
that I use in many different flows. There is one particular flow where, when I call this task, it just hangs indefinitely and never actually runs:
```
17:13:48.835 | INFO    | Flow run 'sparkling-mosquito' - Created task run 'set_row_count-0' for task 'set_row_count'
17:13:48.836 | INFO    | Flow run 'sparkling-mosquito' - Executing 'set_row_count-0' immediately...
```
It just sits here for a long time. I am not able to reproduce it anywhere except in my remote development environment; the same task run from the same flow works locally. It even works when I run it on an ECS cluster that is configured with my "local" dev env (local in quotes because my Postgres DB runs remotely, but the rest of the app runs locally so I can test jobs in ECS). When I call my set_row_count task from just my dev environment, it just hangs and never actually gets called. The same task from other flows works. One last thing: I don't actually see the task get created in the Prefect task graph. My flow just never stops.
m
thought for 69.2 seconds
Based on your description and my research, there are several potential causes for this behavior. The fact that it only occurs in your remote development environment and works elsewhere (including local and ECS) suggests this is likely an environment-specific issue. Here are the most likely causes and troubleshooting steps:

1. Database Connection Issues
   - Since you mentioned Postgres runs remotely, there might be connection pool exhaustion or timeout issues specific to your dev environment
   - The task might be waiting for a database connection that never becomes available
   - Check your Postgres connection pool settings and active connections when this occurs

2. Environment Configuration
   - Since the task works in other environments but not in this specific one, compare the environment variables and configurations between your working and non-working environments
   - Pay special attention to:
     - Database connection strings
     - Network configurations
     - Resource limits

3. Debugging Steps:
   a. Add detailed logging before and after the database operations in your
set_row_count
task
```python
from prefect import task, get_run_logger

@task
def set_row_count():
    logger = get_run_logger()
    logger.info("Starting set_row_count task")
    logger.info("Attempting database connection")
    # Your existing code
    logger.info("Database connection successful")
    # Rest of your code
```
   b. Check if you can see these logs in your Prefect UI or logs when the task hangs
   c. Monitor your Postgres connections when this occurs:
```sql
SELECT * FROM pg_stat_activity
WHERE datname = 'your_database_name';
```
4. Task Worker Configuration
   - Verify that your task worker configuration is consistent across environments
   - Check if there are any resource constraints (memory, CPU) in your dev environment

5. Transaction Management
   - If your task involves database transactions, ensure proper transaction management
   - Consider adding explicit transaction handling if not already present (see the sketch at the end of this reply)

Since you mentioned the task doesn't appear in the task graph, this could indicate that the task is not being properly registered or that there might be an issue with the flow's task dependencies. Would you be able to share:
1. A simplified version of your
set_row_count
task code?
2. The flow configuration where it's failing?
3. Any error messages from your Postgres logs when this occurs?

This would help narrow down the exact cause of the hanging behavior.
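For illustration, here is a minimal sketch of the explicit transaction handling mentioned in point 5. It assumes the task talks to Postgres through SQLAlchemy; the connection string, table name, and the real body of set_row_count are placeholders, since none of them appear in this thread:
```python
from prefect import task, get_run_logger
from sqlalchemy import create_engine, text

# Placeholder DSN -- the real connection settings are not shown in the thread.
engine = create_engine("postgresql+psycopg2://user:password@db-host/dbname")

@task
def set_row_count(table_name: str) -> int:
    logger = get_run_logger()
    logger.info("Opening transaction for %s", table_name)
    # engine.begin() commits on success and rolls back on error, so a failed
    # statement cannot leave an idle-in-transaction session holding locks.
    with engine.begin() as conn:
        # Fail fast instead of hanging if the query blocks on a lock.
        conn.execute(text("SET LOCAL statement_timeout = '30s'"))
        row_count = conn.execute(
            text(f"SELECT count(*) FROM {table_name}")  # illustrative query only
        ).scalar_one()
    logger.info("Row count for %s: %s", table_name, row_count)
    return row_count
```
The statement timeout turns a query that is blocked on a lock into a visible error instead of an indefinite hang, which is often the quickest way to confirm or rule out a database-side cause.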
n
@Marvin The issue seems to be that this task gets fully stuck in a PENDING state
m
thought for 68.4 seconds
Given that the task is stuck in PENDING state and never transitions to RUNNING, here are the most likely causes and solutions:

1. Task Dependencies
   - Check if there are any implicit or explicit dependencies that aren't being met
   - Verify that all upstream tasks (if any) are completing successfully
   - Use
task.submit()
instead of calling the task directly if you're calling tasks within other tasks (see the sketch at the end of this reply)

2. Task Runner Configuration
   - Since this only happens in your remote dev environment, check your task runner configuration
   - Verify that your task runner has enough capacity to handle concurrent tasks
   - Check if there are any resource constraints in your dev environment

To help diagnose this, could you share:
1. How are you calling this task in your flow? For example:
```python
@flow
def your_flow():
    # How is set_row_count called here?
    result = set_row_count()
```
2. Are there any differences in how the task is called between the working flows and the non-working flow?
3. Can you check the Prefect UI for this specific flow run and verify:
   - Are there any other tasks running concurrently?
   - What is the state of any upstream tasks?
   - Are there any error messages in the flow run logs?

Also, as a quick test, you could try:
1. Adding a simple logging statement at the very start of your task
2. Running the task with a different name in the same flow to see if it's task-specific or flow-specific
```python
from prefect import task, get_run_logger

@task(name="test_row_count")
def test_row_count():
    logger = get_run_logger()
    logger.info("Starting test row count task")
    # Rest of your original task code
```
This would help determine if it's specific to the task implementation or something in the environment/configuration.
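For reference, here is a minimal, self-contained sketch of the .submit() pattern mentioned above, using placeholder task and flow names rather than the actual project code:
```python
from prefect import flow, task

@task
def set_row_count() -> int:
    # Stand-in for the real row-count logic
    return 42

@flow
def example_flow():
    # .submit() hands the task to the flow's task runner and returns a future,
    # instead of running the task inline in the flow function.
    future = set_row_count.submit()
    count = future.result()  # block until the task run finishes
    print(f"row count: {count}")

if __name__ == "__main__":
    example_flow()
```
Comparing the inline call with the submitted version in the problem flow can help separate a task-runner issue from a problem in the task body itself.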
n
@Marvin I see that the task actually does get created, but the state just stays in PENDING. Which API endpoints can I hit to understand why that task is not transitioning to further states?
m
thought for 54.8 seconds
Here are the key API endpoints you can use to investigate the task state:

1. Get Task Run Details:
```
GET /api/task_runs/{task_run_id}
```
This will give you detailed information about the task run, including its current state and any state transition history.

2. Get Task Run History:
```
GET /api/task_runs/{task_run_id}/history
```
This endpoint shows the full history of state transitions for the task run.

3. Get Flow Run Details (to see context of the task):
```
GET /api/flow_runs/{flow_run_id}
```
This can show you the overall flow context and any potential dependency issues. You can access these endpoints in several ways:

1. Using the Python Client:
```python
import asyncio

from prefect import get_client

async def inspect_task_run(task_run_id: str):
    async with get_client() as client:
        # Get task run details
        task_run = await client.read_task_run(task_run_id)
        print(f"Current State: {task_run.state}")

        # Get state history
        history = await client.read_task_run_states(task_run_id)
        for state in history:
            print(f"State: {state.type}, Timestamp: {state.timestamp}")

asyncio.run(inspect_task_run("your-task-run-id"))
```
2. Direct HTTP Requests:
```python
import httpx

# Assuming you have your API URL and key configured
PREFECT_API_URL = "http://your-prefect-server/api"
headers = {"Authorization": "Bearer your-api-key"}

# Get task run details
response = httpx.get(
    f"{PREFECT_API_URL}/task_runs/your-task-run-id",
    headers=headers
)
task_run = response.json()

# Get state history
response = httpx.get(
    f"{PREFECT_API_URL}/task_runs/your-task-run-id/history",
    headers=headers
)
state_history = response.json()
```
3. Using the Prefect CLI:
```bash
prefect task-run inspect <task-run-id>
```
The key things to look for in the responses:
- Current state details, including any data stored in the state
- Timestamps of state transitions
- Any error messages or state results
- Dependencies and upstream task states
- Worker assignment information

Would you like me to provide more specific details about any of these endpoints or help construct a specific query for your use case?
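If you do not have the task run ID handy, one way to find it is to list the task runs for the stuck flow run and then read each one's state history. This is a hedged sketch against the Prefect 2.x/3.x Python client; the filter class names and the FLOW_RUN_ID value are assumptions to fill in for your setup:
```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId

FLOW_RUN_ID = "your-flow-run-id"  # placeholder

async def inspect_stuck_flow_run():
    async with get_client() as client:
        # All task runs belonging to the stuck flow run
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[FLOW_RUN_ID]))
        )
        for task_run in task_runs:
            print(task_run.name, task_run.state.type if task_run.state else None)
            # The state message often explains why a transition was blocked
            states = await client.read_task_run_states(task_run.id)
            for state in states:
                print("   ", state.type, state.timestamp, state.message)

asyncio.run(inspect_stuck_flow_run())
```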
n
@Marvin What if there are no state transitions? It is just stuck in pending
m
thought for 35.9 seconds
To investigate a task stuck in PENDING state without transitions, you should check:

1. Work Pool and Worker Status
First, check if there are any active workers for your work pool:
```bash
prefect work-pool ls
```
Then check the specific work pool's workers:
```bash
prefect worker status --work-pool "your-work-pool-name"
```
2. API Endpoints to Check:
```python
from prefect import get_client

async def investigate_pending_task(task_run_id, flow_run_id):
    async with get_client() as client:
        # 1. Get task run details
        task_run = await client.read_task_run(task_run_id)

        # 2. Get the flow run (for its work pool assignment)
        flow_run = await client.read_flow_run(flow_run_id)

        # 3. Get work pool details and status
        work_pool = None
        work_pool_status = None
        if flow_run.work_pool_name:
            work_pool = await client.read_work_pool(flow_run.work_pool_name)
            work_pool_status = await client.read_work_pool_status(flow_run.work_pool_name)

        return {
            "task_state": task_run.state,
            "flow_run_state": flow_run.state,
            "work_pool_info": work_pool,
            "work_pool_status": work_pool_status,
        }
```
3. Common Causes for Tasks Stuck in PENDING:
   - No active workers for the work pool
   - Worker process is running but not properly configured
   - Network connectivity issues between the worker and the Prefect server
   - Resource constraints preventing the worker from picking up new tasks
   - Misconfigured work pool settings

To help diagnose your specific case:
1. Can you share:
   - What work pool type are you using?
   - Are you seeing any workers when you run
prefect work-pool ls
?
   - Is this happening for all tasks or just this specific one?
2. Try these diagnostic steps:
```python
import asyncio

from prefect import get_client

async def check_pending_task(task_run_id: str, flow_run_id: str):
    async with get_client() as client:
        # Check if the task is actually queued
        task_run = await client.read_task_run(task_run_id)
        print(f"Task Run State: {task_run.state}")

        # Check the flow run's work pool assignment
        flow_run = await client.read_flow_run(flow_run_id)
        print(f"Work Pool: {flow_run.work_pool_name}")

        if flow_run.work_pool_name:
            # Check work pool status
            status = await client.read_work_pool_status(flow_run.work_pool_name)
            print(f"Work Pool Status: {status}")

asyncio.run(check_pending_task("your-task-run-id", "your-flow-run-id"))
```
3. You can also check the worker logs directly:
```bash
prefect worker start --pool "your-work-pool-name" --log-level DEBUG
```
This will show detailed logs about task scheduling and pickup. Would you like me to provide more specific guidance based on your work pool type or deployment configuration?
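As a final low-tech check, here is a hedged sketch of a small watcher that polls the stuck task run and reports how long it has been sitting in PENDING, using only read_task_run from the standard client (TASK_RUN_ID is a placeholder to fill in):
```python
import asyncio
from datetime import datetime, timezone

from prefect import get_client

TASK_RUN_ID = "your-task-run-id"  # placeholder

async def watch_pending_task():
    async with get_client() as client:
        while True:
            task_run = await client.read_task_run(TASK_RUN_ID)
            state = task_run.state
            if state is None:
                print("No state recorded yet")
            else:
                # How long the current state has been in place
                age = datetime.now(timezone.utc) - state.timestamp
                print(f"{state.type} for {age} (message={state.message!r})")
                if state.type.value not in ("PENDING", "RUNNING"):
                    break
            await asyncio.sleep(10)

asyncio.run(watch_pending_task())
```
A PENDING state whose timestamp never advances usually points back at the execution side (worker, task runner, or the flow process itself) rather than at the task body, which never got a chance to run.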