# ask-community
k
Hey team, coming from other orchestration tools. Question about flow run concurrency. Does anyone have a nice way of ensuring that for FlowA, scheduled every 6 hours, there can only be 1 flow run executing at any time? If flowrun_0 errored and was retried, I don't want flowrun_1 to execute until flowrun_0 has finished.
w
Hey KG - there are a few different ways. You can put a concurrency limit on the work pool where your flows run, so that only one flow run is in a Running state at a time, or you can use a code-first solution with the concurrency context manager. The first is the simplest IMO, but let me know if you need any more detail or want to discuss.
k
Hey Will - thanks for the quick response. In this case, I want FlowA, FlowB, and FlowC to all run on the same work pool, so setting a concurrency limit of 1 on the pool sounds like it would keep FlowB from running while FlowA is running. Instead, I want flow-specific concurrency: FlowARun-0 and FlowARun-1 shouldn't be able to run at the same time. It's like Databricks Workflow concurrency limits. Does that make sense?
w
Great, thanks for that detail. I think you’d be able to use the global concurrency limit context manager to achieve this, but I’m not highly familiar with this feature. Let me rope in a colleague to see if they have any notes as well.
k
I appreciate it! I've read through the docs and it's not immediately clear how to apply concurrency to flows rather than tasks.
a
We have the same use case and we also identified the same two approaches. Use case: we have a flow (hourly_flow) that runs every hour but sometimes takes longer than an hour. In those cases we want the second instance to wait until the first has completed before starting. We have another flow (daily_flow) that runs once a day, and we want no instance of hourly_flow to run while daily_flow is running. Not really sure how we should enforce that kind of concurrency. Leaning towards work-queue-level concurrency, but willing to hear more.
w
So if you create a concurrency limit in your workspace called AB, for example, with a limit of one, I’d then make sure that each flow run call uses the concurrency context manager when invoking the flow. It checks whether the slot is free and only executes once it is. I can invoke the following two flows at the same time, and they will only ever execute one at a time.
from time import sleep

from prefect import flow
from prefect.concurrency.sync import concurrency


@flow
def flow_a():
    print("flow_a sleeping for 100")
    sleep(100)


if __name__ == "__main__":
    # Hold the single "AB" slot for the whole duration of the flow run.
    with concurrency("AB"):
        flow_a()
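
# Second script: flow_b competes for the same "AB" slot as flow_a.
from time import sleep

from prefect import flow
from prefect.concurrency.sync import concurrency


@flow
def flow_b():
    print("flow_b sleeping for 100")
    sleep(100)


if __name__ == "__main__":
    # Waits here while flow_a (or another flow_b) holds the "AB" slot.
    with concurrency("AB"):
        flow_b()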
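
To make the naming idea concrete, here is a rough local simulation using only the standard library. It is not Prefect's API: `named_limit`, `LIMIT_FOR_FLOW`, and the limit names are hypothetical stand-ins, and a thread semaphore plays the role of the server-side slot. The point it tries to show is that runs sharing a limit name wait on each other, while runs with different names do not.

```python
import threading
import time
from collections import defaultdict
from contextlib import contextmanager

# Local stand-in for server-side limits: one single-slot semaphore per limit name.
_slots = defaultdict(lambda: threading.Semaphore(1))
_state_lock = threading.Lock()
active = defaultdict(int)      # runs currently holding each limit
max_active = defaultdict(int)  # highest overlap observed per limit

# Hypothetical mapping from flow name to limit name (an assumption for this sketch).
LIMIT_FOR_FLOW = {
    "flow_a": "flow-a",             # runs of flow_a only block each other
    "hourly_flow": "hourly-daily",  # these two flows share a single slot
    "daily_flow": "hourly-daily",
}


@contextmanager
def named_limit(name):
    """Block until the single slot for `name` is free, then hold it."""
    with _state_lock:
        slot = _slots[name]
    slot.acquire()
    try:
        yield
    finally:
        slot.release()


def run_flow(flow_name, seconds=0.05):
    limit = LIMIT_FOR_FLOW[flow_name]
    with named_limit(limit):
        with _state_lock:
            active[limit] += 1
            max_active[limit] = max(max_active[limit], active[limit])
        time.sleep(seconds)  # stands in for the flow's real work
        with _state_lock:
            active[limit] -= 1


def simulate(flow_names):
    """Start one thread per flow run and report the peak overlap per limit."""
    threads = [threading.Thread(target=run_flow, args=(name,)) for name in flow_names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return dict(max_active)


if __name__ == "__main__":
    print(simulate(["flow_a", "flow_a", "hourly_flow", "daily_flow"]))
```

Under these assumptions, the peak overlap per limit name stays at 1 even though all runs start together. That is the behavior the shared "AB" name gives the two scripts above, and a per-flow name would give FlowA its own limit.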
a
Thanks Will! I will take that to my team and see what they say. Would that be created under the concurrency limits UI? Any Terraform-y way to manage that?
w
Yep, you can create them in the CLI as well. Check out:
prefect concurrency-limit --help
k
Huge. Thanks Will R
Hey Will, coming back to this to actually implement it... Any recommendations on adding the flow concurrency while using the Deployment.build_from_flow method? We're building deployments for all of our flows as a list, then looping over it and awaiting each apply call.
ALL_DEPLOYMENTS = [
    Deployment.build_from_flow(...),
    Deployment.build_from_flow(...),
]

for deployment in ALL_DEPLOYMENTS:
    await deployment.apply(upload=False)
I can't figure out how to add the global concurrency context with this deployment format 🤔
Seems like the apply method would need to implement the context manager. When I register the flow with the API, I want to add the with ...
@sync_compatible
async def apply(
    self, upload: bool = False, work_queue_concurrency: int = None
) -> UUID:
    """
    Registers this deployment with the API and returns the deployment's ID.

    Args:
        upload: if True, deployment files are automatically uploaded to remote
            storage
        work_queue_concurrency: If provided, sets the concurrency limit on the
            deployment's work queue
    """
    if not self.name or not self.flow_name:
        raise ValueError("Both a deployment name and flow name must be set.")
    async with get_client() as client:
        # prep IDs
        flow_id = await client.create_flow_from_name(self.flow_name)

        infrastructure_document_id = self.infrastructure._block_document_id
        if not infrastructure_document_id:
            # if not building off a block, will create an anonymous block
            self.infrastructure = self.infrastructure.copy()
            infrastructure_document_id = await self.infrastructure._save(
                is_anonymous=True,
            )

        if upload:
            await self.upload_to_storage()

        if self.work_queue_name and work_queue_concurrency is not None:
            try:
                res = await client.create_work_queue(
                    name=self.work_queue_name, work_pool_name=self.work_pool_name
                )
            except ObjectAlreadyExists:
                res = await client.read_work_queue_by_name(
                    name=self.work_queue_name, work_pool_name=self.work_pool_name
                )
            await client.update_work_queue(
                res.id, concurrency_limit=work_queue_concurrency
            )

        # we assume storage was already saved
        storage_document_id = getattr(self.storage, "_block_document_id", None)
        deployment_id = await client.create_deployment(
            flow_id=flow_id,
            name=self.name,
            work_queue_name=self.work_queue_name,
            work_pool_name=self.work_pool_name,
            version=self.version,
            schedule=self.schedule,
            is_schedule_active=self.is_schedule_active,
            parameters=self.parameters,
            description=self.description,
            tags=self.tags,
            manifest_path=self.manifest_path,  # allows for backwards YAML compat
            path=self.path,
            entrypoint=self.entrypoint,
            infra_overrides=self.infra_overrides,
            storage_document_id=storage_document_id,
            infrastructure_document_id=infrastructure_document_id,
            parameter_openapi_schema=self.parameter_openapi_schema.dict(),
            enforce_parameter_schema=self.enforce_parameter_schema,
        )

        if client.server_type == ServerType.CLOUD:
            # The triggers defined in the deployment spec are, essentially,
            # anonymous and attempting truly sync them with cloud is not
            # feasible. Instead, we remove all automations that are owned
            # by the deployment, meaning that they were created via this
            # mechanism below, and then recreate them.
            await client.delete_resource_owned_automations(
                f"prefect.deployment.{deployment_id}"
            )
            for trigger in self.triggers:
                trigger.set_deployment_id(deployment_id)
                await client.create_automation(trigger.as_automation())

        return deployment_id
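
One possible direction, as a sketch rather than anything confirmed against Prefect itself: instead of changing apply, take the slot inside the flow's own code, so the deployment list stays as it is. The `guarded` decorator and `local_acquire` below are hypothetical helpers. `local_acquire` is a standard-library stand-in so the snippet runs without a server. The assumption is that `prefect.concurrency.sync.concurrency` could be passed as `acquire` instead.

```python
import functools
import threading
from contextlib import contextmanager


def guarded(limit_name, acquire):
    """Wrap a function so its body runs while holding `limit_name`.

    `acquire` is any callable that returns a context manager for a limit name.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            with acquire(limit_name):
                return fn(*args, **kwargs)
        return wrapper
    return decorator


# Local stand-in (an assumption for this sketch) so it runs without a server.
_locks = {}
_locks_guard = threading.Lock()
acquired = []  # records each limit name as it is taken


@contextmanager
def local_acquire(name):
    with _locks_guard:
        lock = _locks.setdefault(name, threading.Lock())
    with lock:
        acquired.append(name)
        yield


@guarded("flow-a", acquire=local_acquire)
def flow_a_body(x):
    # Stands in for the real flow logic.
    return x * 2


if __name__ == "__main__":
    print(flow_a_body(21))
```

If this carries over, `@flow` would sit above `@guarded("flow-a", acquire=concurrency)` and the Deployment.build_from_flow(...) / apply loop would not need to change. One caveat I'd expect: a run waiting for the slot has already started, so it may show as Running while it is blocked.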