<@ULVA73B9P> Hi, Im experiencing a problem where m...
# marvin-in-the-wild
p
@Marvin Hi, Im experiencing a problem where my Flow in a "Running" state, and Task is stuck in “Pending" state. Background: • Self-hosted Prefect server instance • Prefect version (client): 2.16.5 My objective is to initiate multiple tasks to run concurrently for a specific flow. This flow is deployed using the
serve
method as eight different deployments. Also, when i try to do the same process using “local Prefect server instance”, the task work/runs perfectly without any issues.
m
Your message was too long by 151 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Hi,
I am experiencing a problem where my Flow remains in a "Running" state, and my Task is stuck in a "Pending" state. Background: • Self-hosted Prefect server instance • Prefect version (client): 2.16.5 My objective is to initiate multiple tasks to run concurrently for a specific flow. This flow is deployed using the
serve
method as eight different deployments. Please refer to the sample code below. from prefect import flow, task, serve, get_client deployment_name = 'deployment' no_of_deployments = 8 def single_run(doc): # do something # each prefect task runs concurrently where # it gets input from flow def create_task_run_inside_flow(doc): return single_run(doc) @flow(log_prints=True) def pipeline(dict_docs: dict, task_name : dict, tag_no : int): for doc, task_name in zip(dict_docs, task_fname): task_run = task( create_task_run_inside_flow, tags=[f"files_{str(tag_no)}"], task_run_name=task_fname, retries=1, retry_delay_seconds=10, timeout_seconds=60 * 60, ) task_run.submit(doc) def run(): deployments = [ pipeline.to_deployment(name=f"{deployment_name}_{i}") for i in range(1, no_of_deployments + 1) ] # serve all the deployment flows serve
Your message was too long by 143 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Hi,
Im experiencing a problem where my Flow in a "Running" state, and Task is stuck in “Pending" state. Background: • Self-hosted Prefect server instance • Prefect version (client): 2.16.5 My objective is to initiate multiple tasks to run concurrently for a specific flow. This flow is deployed using the
serve
method as eight different deployments. sample code below: from prefect import flow, task, serve, get_client deployment_name = 'deployment' no_of_deployments = 8 def single_run(doc): # do something # each prefect task runs concurrently where # it gets input from flow def create_task_run_inside_flow(doc): return single_run(doc) @flow(log_prints=True) def pipeline(dict_docs: dict, task_name : dict, tag_no : int): for doc, task_name in zip(dict_docs, task_fname): task_run = task( create_task_run_inside_flow, tags=[f"files_{str(tag_no)}"], task_run_name=task_fname, retries=1, retry_delay_seconds=10, timeout_seconds=60 * 60, ) task_run.submit(doc) def run(): deployments = [ pipeline.to_deployment(name=f"{deployment_name}_{i}") for i in range(1, no_of_deployments + 1) ] # serve all the deployment flows serve(*deployments) if name
Your message was too long by 128 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Hi,
Im experiencing a problem where my Flow in a "Running" state, and Task is stuck in “Pending" state. Background: • Self-hosted Prefect server instance • Prefect version (client): 2.16.5 My objective is to initiate multiple tasks to run concurrently for a specific flow. This flow is deployed using the
serve
method as eight different deployments. sample code below: from prefect import flow, task, serve, get_client deployment_name = 'deployment' no_of_deployments = 8 def single_run(doc): # do something def create_task_run_inside_flow(doc): return single_run(doc) @flow(log_prints=True) def pipeline(dict_docs: dict, task_name : dict, tag_no : int): for doc, task_name in zip(dict_docs, task_fname): task_run = task( create_task_run_inside_flow, tags=[f"files_{str(tag_no)}"], task_run_name=task_fname, retries=1, retry_delay_seconds=10, timeout_seconds=60 * 60, ) task_run.submit(doc) def run(): deployments = [ pipeline.to_deployment(name=f"{deployment_name}_{i}") for i in range(1, no_of_deployments + 1) ] # serve all the deployment flows serve(*deployments) if name == "__main__": run() # Call run_flow function asychron
How delightful. It appears we have encountered an error while trying to obtain relevant documentation. However, let's not despair just yet. You mentioned that everything works fine with a local Prefect server instance but fails with your self-hosted setup. Considering your description, here are a few common troubleshooting steps: 1. Check Infrastructure: Ensure that your self-hosted infrastructure (servers, network, etc.) mirrors your local setup as closely as possible. 2. Concurrency Configuration: Verify that your task concurrency limits are correctly set in your self-hosted environment. Prefect allows you to set task concurrency limits globally or for individual tasks. 3. Logs and Exception Handling: Inspect the logs of your Flow runs and tasks to see if there are any clear exceptions or errors. 4. Prefect Versions: Ensure that both the client and server are on compatible versions. To assist you better, could you specify: 1. If there are any error messages in the logs? 2. Have you configured task concurrency limits either globally or at the task level?