Luca Vehbiu
06/28/2024, 12:19 PMMarvin
06/28/2024, 12:19 PMsh
prefect deploy --name my_flow/deployment-1 --name my_other_flow/deployment-1
To deploy all deployments, you can use the --all
flag:
sh
prefect deploy --all
To deploy deployments that match a pattern, you can run:
sh
prefect deploy -n my-flow/* -n *dev/my-deployment -n dep*prod
For more details, check out the Prefect deployment guide.
### Running Tasks in Series or Parallel
Here is a Python example of how to submit tasks in Prefect in series or in parallel using flow
and task
decorators:
python
import time
from prefect import flow, task
@task
def sleep(t: int):
print(f"Going to sleep for {t} seconds...")
time.sleep(t)
print("...and I'm awake!")
@flow(log_prints=True)
def my_flow():
"""Flows run submitted tasks concurrently by default"""
sleep(3) # block the flow for 3 seconds (not concurrently)
future = sleep.submit(2) # submit a task run to the task runner, doesn't block flow
future.wait() # block the flow until the submitted task run is complete
sleep.map([5] * 10) # submit 10 at once, each sleeps for 5 seconds, don't block
# Even though the flow is done, we won't exit until all the task runs are complete
if __name__ == "__main__":
my_flow()
In this script, tasks are run both in series and parallel:
- The task sleep(3)
runs in series.
- sleep.submit(2)
is submitted but doesn't block the flow.
- sleep.map([5] * 10)
submits 10 tasks concurrently.
For more code examples, refer to the Prefect code examples.
I hope this helps. But then again, nobody ever listens to me...Luca Vehbiu
06/28/2024, 12:20 PMMarvin
06/28/2024, 12:21 PMasyncio
to run the flow and wait for its completion.
Here is a detailed example:
python
import asyncio
from prefect import flow, get_run_logger
from prefect.deployments import Deployment
from prefect.client import OrionClient
@flow
async def slow_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Starting slow flow...")
await asyncio.sleep(60) # Simulate a long-running task
<http://logger.info|logger.info>("Finished slow flow.")
@flow
async def fast_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Starting fast flow...")
await asyncio.sleep(5) # Simulate a shorter task
<http://logger.info|logger.info>("Finished fast flow.")
async def main():
# Register deployments
slow_deployment = Deployment.build_from_flow(flow=slow_flow, name="slow_flow")
fast_deployment = Deployment.build_from_flow(flow=fast_flow, name="fast_flow")
await slow_deployment.apply()
await fast_deployment.apply()
client = OrionClient()
# Trigger deployments
slow_run = await client.create_flow_run_from_deployment(deployment_id=slow_deployment.id)
fast_run = await client.create_flow_run_from_deployment(deployment_id=fast_deployment.id)
async def wait_for_run(run_id):
while True:
run = await client.read_flow_run(run_id)
if run.state.is_finished():
return run.state
await asyncio.sleep(5)
# Wait for both flows to finish
slow_result, fast_result = await asyncio.gather(
wait_for_run(slow_run.id),
wait_for_run(fast_run.id)
)
print(f"Slow flow finished with state: {slow_result}")
print(f"Fast flow finished with state: {fast_result}")
if __name__ == "__main__":
asyncio.run(main())
### Key Points:
- Define Asynchronous Flows: Using the async
keyword with the flow decorator helps in creating async flows.
- Register Deployments: Use Deployment.build_from_flow
to create deployments from the defined flows.
- Trigger and Wait: Use the Orion client to create and monitor flow runs.
For further details, refer to the Prefect async workflow example.
I hope this helps. But then again, what do I know?Marvin
06/28/2024, 12:22 PMmodify this accordingly:from prefect import flow from prefect.deployments import run_deployment from prefect.task_runners import ConcurrentTaskRunner # Default QuickSight dataset names DEFAULT_QUICKSIGHT_DATASETS = [ “pd-operator-dashboard-penn”, “pd-rls-venue-access”, “pd-rls-default-venue”, “pd-operator-dashboard”, “pd-player-segments”, “pd-product-clusters”, “pd-player-segment-title-breakdown”, “pd-isr-segments”, ] def wait_for_flow_run(flow_run): if flow_run.state.is_completed(): return flow_run.state.result() else: raise Exception(f”Flow run failed with state: {flow_run.state}“) @flow(name=“main-orchestration”, task_runner=ConcurrentTaskRunner()) def main_orchestration_flow( run_process_raw_data: bool = True, run_refresh_reconciliation: bool = True, run_refresh_fact_tables: bool = True, run_refresh_product_cluster: bool = True, run_refresh_player_segment: bool = True, run_loyalty_score: bool = True, run_refresh_isr_segments: bool = True, run_load_player_data_checks: bool = True, run_refresh_quicksight_datasets: bool = True, ): “”"Script to orchestrate all deployments needed for a monthly