
Deceivious

07/12/2023, 8:52 AM
Hi, is there any way to run a deployment so that the flow run goes to a different work queue than the one specified at deployment time?

Giacomo Chiarella

07/12/2023, 9:07 AM
just create a deployment with another work queue name

Deceivious

07/12/2023, 9:18 AM
Life would be easy if everything was that simple. 😄
Not a bad idea though.

Giacomo Chiarella

07/12/2023, 9:20 AM
await client.create_work_queue(
    name=work_queue_name,
    priority=priority,
    concurrency_limit=work_queue_concurrency,
    work_pool_name=work_pool_name,
)
d = await Deployment.build_from_flow(flow, deployment_name, skip_upload=True)
d.work_queue_name = work_queue_name
await d.apply()
await d.upload_to_storage(ignore_file=None)
Something like this; I don't have a working example to share.

Deceivious

07/12/2023, 9:23 AM
I am not worried about the code. I am more worried about having to manage 2x or possibly 3x the number of deployments I have right now. 😄

Giacomo Chiarella

07/12/2023, 9:25 AM
I’ve done that to create 5 deployments out of one flow due to different input parameters used by different team members

Deceivious

07/12/2023, 9:28 AM
Here's the use case. We run a flow hourly to fetch daily data; the date is provided as input and defaults to today. Now we have to backfill 10 years of data, for which I made a new queue "backfill" with its concurrency limit set to 1 so it won't affect the normal runs. I am wondering whether having a backfill deployment for every flow we have is a bit of overkill.
I guess the answer to my original question is no. I'll have to find a way around it, as always. 😕
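(For the backfill itself, the ten years of daily runs reduce to one parameter dict per date. A minimal stdlib sketch of generating them; the `"date"` parameter name is a hypothetical stand-in, not taken from the actual flow:)

```python
from datetime import date, timedelta


def backfill_dates(start: date, end: date):
    """Yield every date from start through end inclusive, oldest first."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


# One parameter dict per daily backfill run; "date" is a hypothetical
# parameter name for illustration only.
params = [{"date": d.isoformat()}
          for d in backfill_dates(date(2013, 1, 1), date(2013, 1, 3))]
# → [{'date': '2013-01-01'}, {'date': '2013-01-02'}, {'date': '2013-01-03'}]
```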

Giacomo Chiarella

07/12/2023, 9:32 AM
if you want a different queue, you need another deployment
or you can increase the concurrency limit on the existing queue
but ten years is a lot, I would create a proper deployment for that and then remove it when it finishes

Deceivious

07/12/2023, 9:35 AM
😄

Giacomo Chiarella

07/12/2023, 9:42 AM
😉

Deceivious

07/12/2023, 11:46 AM
Ended up writing a script that copies an existing deployment into a new deployment with a modified work queue.
async def copy_deployment(client):
    source = await client.read_deployment_by_name(
        f"{FLOW_NAME}/{DEPLOYMENT_NAME}"
    )

    infra_block = None
    if source.infrastructure_document_id:
        infra_doc = await client.read_block_document(source.infrastructure_document_id)
        infra_block = Block._from_block_document(infra_doc)

    storage_block = None
    if source.storage_document_id:
        # read the storage document here, not the infrastructure one
        storage_doc = await client.read_block_document(source.storage_document_id)
        storage_block = Block._from_block_document(storage_doc)

    # build the copy under a distinct name, pointed at the backfill queue
    backfill = Deployment(
        name=f"backfill-{source.name}",
        description=f"BACKFILL FOR {source.description}",
        tags=["backfill", *source.tags],
        is_schedule_active=False,
        work_pool_name="default-agent-pool",
        work_queue_name="manual_back_fill",
        flow_name=FLOW_NAME,
        parameters=source.parameters,
        infrastructure=infra_block,
        infra_overrides=source.infra_overrides,
        storage=storage_block,
        path=source.path,
        entrypoint=source.entrypoint,
        parameter_openapi_schema=source.parameter_openapi_schema,
    )
    await backfill.apply()
    # qualified name of the new deployment
    return f"{FLOW_NAME}/{backfill.name}"
Posting for anyone searching for a similar issue in the future.
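(A hedged sketch of how the script above could be invoked. The `FLOW_NAME`/`DEPLOYMENT_NAME` values are hypothetical placeholders, and it assumes a reachable Prefect 2.x API plus the `copy_deployment` function defined above:)

```python
import asyncio

# Hypothetical values for illustration; replace with your own.
FLOW_NAME = "my-flow"
DEPLOYMENT_NAME = "hourly"


async def main():
    # get_client lives in prefect.client.orchestration in Prefect 2.x
    from prefect.client.orchestration import get_client

    async with get_client() as client:
        qualified_name = await copy_deployment(client)
        print(f"created {qualified_name}")


# The copied deployment would then be addressable as:
backfill_name = f"{FLOW_NAME}/backfill-{DEPLOYMENT_NAME}"

# asyncio.run(main())  # uncomment with a live Prefect API
```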