Hi, I am experiencing a problem where my Flow r...
# ask-community
p
Hi, I am experiencing a problem where my Flow remains in a "Running" state, and my Task is stuck in a "Pending" state. Background: • Self-hosted Prefect server instance • Prefect version (client): 2.16.5 My objective is to initiate multiple tasks to run concurrently for a specific flow. This flow is deployed using the
serve
method as eight different deployments. Please refer to the sample code below. _from prefect import flow, task, serve, get_client deployment_name = 'deployment' no_of_deployments = 8 def single_run(doc): # do something # each prefect task runs concurrently where # it gets input from flow def create_task_run_inside_flow(doc): return single_run(doc) @flow(log_prints=True) def pipeline(dict_docs: dict, task_name : dict, tag_no : int): for doc, task_name in zip(dict_docs, task_fname): task_run = task( create_task_run_inside_flow, tags=[f"files_{str(tag_no)}"], task_run_name=task_fname, retries=1, retry_delay_seconds=10, timeout_seconds=60 * 60, ) task_run.submit(doc) def run(): deployments = [ pipeline.to_deployment(name=f"{deployment_name}_{i}") for i in range(1, no_of_deployments + 1) ] # serve all the deployment flows serve(*deployments) if name == "__main__": run() # Call run_flow function asychronously with concurrency_limit params async def run_flow(deployment_name, flow_name, params={}, tags=[]): async with get_client() as client: deployment = await client.read_deployment_by_name(deployment_name) _ = await client.create_concurrency_limit( concurrency_limit=8, ) response = await client.create_flow_run_from_deployment( deployment_id=deployment.id, tags=tags, parameters=params, name=flow_name, )_ Also, when i try to do the same process using “local Prefect server instance”, the task work/runs perfectly without any issues.