Ponraj
07/08/2024, 9:25 AM
I am running my flow via the serve method as eight different deployments. Please refer to the sample code below.
from prefect import flow, task, serve, get_client

deployment_name = 'deployment'
no_of_deployments = 8

def single_run(doc):
    # do something with the document
    ...

# each prefect task runs concurrently and gets its input from the flow
def create_task_run_inside_flow(doc):
    return single_run(doc)

@flow(log_prints=True)
def pipeline(dict_docs: dict, task_names: dict, tag_no: int):
    for doc, task_name in zip(dict_docs, task_names):
        task_run = task(
            create_task_run_inside_flow,
            tags=[f"files_{str(tag_no)}"],
            task_run_name=task_name,
            retries=1,
            retry_delay_seconds=10,
            timeout_seconds=60 * 60,
        )
        task_run.submit(doc)

def run():
    deployments = [
        pipeline.to_deployment(name=f"{deployment_name}_{i}")
        for i in range(1, no_of_deployments + 1)
    ]
    # serve all the deployment flows
    serve(*deployments)

if __name__ == "__main__":
    run()


# Call the run_flow function asynchronously with a concurrency_limit
async def run_flow(deployment_name, flow_name, params={}, tags=[]):
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        _ = await client.create_concurrency_limit(
            tag=tags[0],  # tag-based limit; tag taken from the flow's task tags
            concurrency_limit=8,
        )
        response = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            tags=tags,
            parameters=params,
            name=flow_name,
        )
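For reference, here is a minimal sketch of how run_flow above could be invoked so that all eight served deployments are triggered concurrently. The trigger_all_deployments helper, the flow-run names, and the parameter values are placeholders I made up for illustration, not my exact setup; the deployment names follow the "<flow name>/<deployment name>" format that read_deployment_by_name expects.

import asyncio

async def trigger_all_deployments():
    # fire one flow run per served deployment, all at once
    await asyncio.gather(*[
        run_flow(
            deployment_name=f"pipeline/{deployment_name}_{i}",
            flow_name=f"pipeline_run_{i}",
            params={"dict_docs": {}, "task_names": {}, "tag_no": i},  # placeholder inputs
            tags=[f"files_{i}"],
        )
        for i in range(1, no_of_deployments + 1)
    ])

asyncio.run(trigger_all_deployments())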
Also, when I run the same process against a local Prefect server instance, the tasks run perfectly without any issues.