<@ULVA73B9P> running multiple deloyment in series ...
# ask-marvin
l
@Marvin running multiple deployments in series or in parallel from only one script
m
Prefect provides several options for running multiple deployments in series or in parallel from a single script.

### Deploying Multiple Deployments

When several deployments share a name, you can deploy them together by prefixing each deployment name with its flow name:
sh
prefect deploy --name my_flow/deployment-1 --name my_other_flow/deployment-1
To deploy all deployments, use the `--all` flag:
sh
prefect deploy --all
To deploy deployments that match a pattern, you can run:
sh
prefect deploy -n "my-flow/*" -n "*dev/my-deployment" -n "dep*prod"
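To get a feel for which deployments such patterns would select, here is a small Python sketch. It assumes shell-style glob semantics, and that a pattern without a `/` is compared against the deployment name only; both are assumptions for illustration, not a statement about how `prefect deploy` matches internally. The deployment names and the `select` helper are made up.

```python
from fnmatch import fnmatchcase

# Hypothetical "flow-name/deployment-name" identifiers, invented for this sketch.
DEPLOYMENTS = [
    "my-flow/deployment-1",
    "my-flow/nightly",
    "etl-dev/my-deployment",
    "etl-prod/my-deployment",
    "reports/deploy-prod",
]


def matches(name: str, pattern: str) -> bool:
    """Glob-match a pattern against the full name, or against the
    deployment part alone when the pattern contains no slash (assumption)."""
    _flow_name, _, deployment_name = name.partition("/")
    target = name if "/" in pattern else deployment_name
    return fnmatchcase(target, pattern)


def select(patterns, names=DEPLOYMENTS):
    """Return the names matched by at least one pattern, in input order."""
    return [n for n in names if any(matches(n, p) for p in patterns)]


for pattern in ["my-flow/*", "*dev/my-deployment", "dep*prod"]:
    print(pattern, "->", select([pattern]))
```

Quoting the patterns on the command line keeps the shell from expanding `*` against local file names before Prefect sees them.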
For more details, check out the Prefect deployment guide.

### Running Tasks in Series or Parallel

Here is a Python example that submits tasks in series and in parallel using the `flow` and `task` decorators:
python
import time
from prefect import flow, task

@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")

@flow(log_prints=True)
def my_flow():
    """Flows run submitted tasks concurrently by default"""
    sleep(3)  # block the flow for 3 seconds (not concurrently)
    
    future = sleep.submit(2)  # submit a task run to the task runner, doesn't block flow
    
    future.wait()  # block the flow until the submitted task run is complete
    
    sleep.map([5] * 10)  # submit 10 at once, each sleeps for 5 seconds, don't block
    
    # Even though the flow is done, we won't exit until all the task runs are complete

if __name__ == "__main__":
    my_flow()
In this script, tasks run both in series and in parallel:

- `sleep(3)` runs in series and blocks the flow.
- `sleep.submit(2)` is submitted to the task runner and doesn't block the flow.
- `sleep.map([5] * 10)` submits 10 task runs concurrently.

For more code examples, refer to the Prefect code examples. I hope this helps. But then again, nobody ever listens to me...
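For triggering existing deployments (rather than tasks) from one script, a minimal sketch could look like the following. It relies on `run_deployment` from `prefect.deployments`, which by default waits for the flow run to finish before returning. The helper names `run_in_series` and `run_in_parallel`, and the injectable `runner` argument, are hypothetical and exist only so the ordering logic can be tried without a Prefect server.

```python
import asyncio


async def _default_runner(name: str):
    # Imported lazily so the ordering logic can be exercised without Prefect.
    from prefect.deployments import run_deployment

    # With the default timeout, run_deployment returns once the run finishes.
    return await run_deployment(name=name)


async def run_in_series(names, runner=_default_runner):
    """Trigger each deployment only after the previous run has finished."""
    results = []
    for name in names:
        results.append(await runner(name))
    return results


async def run_in_parallel(names, runner=_default_runner):
    """Trigger all deployments at once, then wait for every run to finish."""
    return await asyncio.gather(*(runner(name) for name in names))
```

With a Prefect API and a worker available, usage might be `asyncio.run(run_in_series(["my_flow/deployment-1", "my_other_flow/deployment-1"]))`, where the names follow the `flow-name/deployment-name` form used earlier in this answer.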
l
@Marvin I have a script that runs deployments, and I want it to wait for a flow run created by a deployment to finish
m
To make your script wait for a flow run created by a deployment to finish, you can combine Prefect's API client with Python's asynchronous operations. Here's an example demonstrating how to achieve this:

### Step-by-Step Instructions

1. Create an Async Flow: First, define your flow and tasks. Make the flow asynchronous if it needs to await other async operations.
2. Trigger Deployment and Wait for Completion: Use Python's `asyncio` to trigger the deployment's flow run and poll it until it completes.

Here is a detailed example:
python
import asyncio

from prefect import flow, get_client, get_run_logger
from prefect.deployments import Deployment  # Prefect 2.x deployment API

@flow
async def slow_flow():
    logger = get_run_logger()
    logger.info("Starting slow flow...")
    await asyncio.sleep(60)  # Simulate a long-running task
    logger.info("Finished slow flow.")

@flow
async def fast_flow():
    logger = get_run_logger()
    logger.info("Starting fast flow...")
    await asyncio.sleep(5)  # Simulate a shorter task
    logger.info("Finished fast flow.")

async def main():
    # Register deployments. Inside async code build_from_flow must be awaited,
    # and apply() returns the ID of the registered deployment.
    slow_deployment = await Deployment.build_from_flow(flow=slow_flow, name="slow_flow")
    fast_deployment = await Deployment.build_from_flow(flow=fast_flow, name="fast_flow")
    slow_deployment_id = await slow_deployment.apply()
    fast_deployment_id = await fast_deployment.apply()

    async with get_client() as client:
        # Trigger deployments. A running worker or agent has to pick these runs up;
        # otherwise they stay scheduled and the polling below never returns.
        slow_run = await client.create_flow_run_from_deployment(deployment_id=slow_deployment_id)
        fast_run = await client.create_flow_run_from_deployment(deployment_id=fast_deployment_id)

        async def wait_for_run(run_id):
            # Poll the API until the run reaches a final state
            while True:
                run = await client.read_flow_run(run_id)
                if run.state.is_final():
                    return run.state
                await asyncio.sleep(5)

        # Wait for both flows to finish
        slow_result, fast_result = await asyncio.gather(
            wait_for_run(slow_run.id),
            wait_for_run(fast_run.id),
        )

    print(f"Slow flow finished with state: {slow_result}")
    print(f"Fast flow finished with state: {fast_result}")

if __name__ == "__main__":
    asyncio.run(main())
### Key Points:

- Define Asynchronous Flows: Declaring the flow function with `async def` under the `@flow` decorator creates an async flow.
- Register Deployments: Use `Deployment.build_from_flow` to create deployments from the defined flows.
- Trigger and Wait: Use the Prefect API client to create flow runs and poll them until they finish.

For further details, refer to the Prefect async workflow example. I hope this helps. But then again, what do I know?
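A polling loop like the one above waits forever if a run never gets picked up. The following is a minimal sketch of the same idea with a timeout. It is written against a hypothetical `read_state` callable that returns a state name, so it can be tried without a Prefect server; with Prefect, `read_state` could wrap a call such as `client.read_flow_run(run_id)`. The set of final state names is an assumption based on Prefect's terminal state types.

```python
import asyncio
import time

# Assumed names of final states; adjust to whatever your API reports.
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


async def wait_until_final(read_state, poll_interval=5.0, timeout=None):
    """Poll `read_state` until it reports a final state.

    `read_state` is a hypothetical async callable returning a state name.
    Raises TimeoutError if `timeout` seconds pass without a final state.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = await read_state()
        if state in FINAL_STATES:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state!r} after {timeout} seconds")
        await asyncio.sleep(poll_interval)
```

Several runs can be awaited together by passing one `wait_until_final(...)` call per run to `asyncio.gather`, as in the example above.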
Your message was too long by 480 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
modify this accordingly:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.task_runners import ConcurrentTaskRunner

# Default QuickSight dataset names
DEFAULT_QUICKSIGHT_DATASETS = [
    "pd-operator-dashboard-penn",
    "pd-rls-venue-access",
    "pd-rls-default-venue",
    "pd-operator-dashboard",
    "pd-player-segments",
    "pd-product-clusters",
    "pd-player-segment-title-breakdown",
    "pd-isr-segments",
]

def wait_for_flow_run(flow_run):
    if flow_run.state.is_completed():
        return flow_run.state.result()
    else:
        raise Exception(f"Flow run failed with state: {flow_run.state}")

@flow(name="main-orchestration", task_runner=ConcurrentTaskRunner())
def main_orchestration_flow(
    run_process_raw_data: bool = True,
    run_refresh_reconciliation: bool = True,
    run_refresh_fact_tables: bool = True,
    run_refresh_product_cluster: bool = True,
    run_refresh_player_segment: bool = True,
    run_loyalty_score: bool = True,
    run_refresh_isr_segments: bool = True,
    run_load_player_data_checks: bool = True,
    run_refresh_quicksight_datasets: bool = True,
):
    """Script to orchestrate all deployments needed for a monthly