Ponraj
07/08/2024, 9:24 AM
serve method as eight different deployments.
Also, when I run the same process against a local Prefect server instance, the tasks run perfectly without any issues.
Marvin
07/08/2024, 12:57 PM
Hi, I'm experiencing a problem where my flow remains in a "Running" state and my task is stuck in a "Pending" state.
Background:
• Self-hosted Prefect server instance
• Prefect version (client): 2.16.5
My objective is to run multiple tasks concurrently within a specific flow. This flow is deployed using the serve method as eight different deployments.
Sample code below:
from prefect import flow, task, serve, get_client

deployment_name = 'deployment'
no_of_deployments = 8

def single_run(doc):
    # do something
    pass

# each Prefect task runs concurrently,
# taking its input from the flow
def create_task_run_inside_flow(doc):
    return single_run(doc)

@flow(log_prints=True)
def pipeline(dict_docs: dict, task_fname: dict, tag_no: int):
    # pair each document with the name used for its task run
    for doc, task_name in zip(dict_docs, task_fname):
        task_run = task(
            create_task_run_inside_flow,
            tags=[f"files_{tag_no}"],
            task_run_name=task_name,
            retries=1,
            retry_delay_seconds=10,
            timeout_seconds=60 * 60,
        )
        task_run.submit(doc)

def run():
    deployments = [
        pipeline.to_deployment(name=f"{deployment_name}_{i}")
        for i in range(1, no_of_deployments + 1)
    ]
    # serve all the deployment flows
    serve(*deployments)

if __name__ == "__main__":
    run()
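
For readers following along: the sample does not show how the eight deployments receive their inputs. Here is a minimal sketch of one possible way to prepare them. It assumes the documents are split round-robin across the deployments, which is a guess and is not stated in the message. The helper plan_runs and the "task_<key>" naming are hypothetical. The parameter keys (dict_docs, task_fname, tag_no) mirror the names used in the sample flow. The sketch only builds plain dictionaries and does not talk to a Prefect server.

```python
from typing import Dict, List

DEPLOYMENT_NAME = "deployment"  # same base name as in the sample
NO_OF_DEPLOYMENTS = 8


def plan_runs(docs: Dict[str, str], n: int = NO_OF_DEPLOYMENTS) -> List[dict]:
    """Hypothetical helper: split docs round-robin into at most n parameter sets."""
    keys = list(docs)
    plans = []
    for i in range(1, n + 1):
        chunk = keys[i - 1::n]  # every n-th key, starting at offset i - 1
        if not chunk:
            continue  # fewer documents than deployments: skip the empty ones
        plans.append({
            # "<flow name>/<deployment name>" identifier for the served flow
            "deployment": f"pipeline/{DEPLOYMENT_NAME}_{i}",
            "parameters": {
                "dict_docs": {k: docs[k] for k in chunk},
                # assumed naming scheme; keys become the task run names
                "task_fname": {f"task_{k}": k for k in chunk},
                "tag_no": i,
            },
        })
    return plans


if __name__ == "__main__":
    for plan in plan_runs({f"doc{i}": "..." for i in range(10)}):
        print(plan["deployment"], list(plan["parameters"]["dict_docs"]))
```

Each entry could then be handed to something like prefect.deployments.run_deployment(name=plan["deployment"], parameters=plan["parameters"]). Whether that matches how the runs were actually triggered here is not known from the message.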
# Call run_flow function asychron
Marvin
07/08/2024, 12:58 PM