Lachlan Casey
08/22/2024, 12:25 AM
Marvin
08/22/2024, 12:25 AM
The `prefect.yaml` file is used to store and manage configuration details for deploying your Prefect flows. Here’s a concise guide on how to use it:
1. Initialization:
- Run `prefect init` in any directory or repository that contains your flow code. This command initializes your deployment configuration and creates a `prefect.yaml` file.
2. Location:
- By default, `prefect deploy` will use a `prefect.yaml` file in the project's root directory. You can specify a custom file using the `--prefect-file` argument, e.g., `prefect deploy --prefect-file path/to/custom_file.yaml`.
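In a CI script it is often convenient to call this command programmatically. The sketch below builds the `prefect deploy` argument list in plain Python; the helper name `build_deploy_command` is hypothetical, and it assumes only the `--prefect-file` and `--all` options of the Prefect 2 CLI.

```python
import shlex
from typing import List, Optional


def build_deploy_command(prefect_file: Optional[str] = None, deploy_all: bool = True) -> List[str]:
    """Return the argv for `prefect deploy` (hypothetical helper, not part of Prefect)."""
    cmd = ["prefect", "deploy"]
    if prefect_file is not None:
        # Read deployment config from a file other than ./prefect.yaml
        cmd += ["--prefect-file", prefect_file]
    if deploy_all:
        # Deploy every deployment declared in that file
        cmd.append("--all")
    return cmd


if __name__ == "__main__":
    cmd = build_deploy_command("path/to/custom_file.yaml")
    print(shlex.join(cmd))  # prefect deploy --prefect-file path/to/custom_file.yaml --all
    # In CI you would execute it with: subprocess.run(cmd, check=True)
```

Passing a list rather than one shell string avoids quoting problems when paths contain spaces.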
3. Structure:
- The structure includes sections such as generic metadata, preparation steps (`build`, `push`), runtime steps (`pull`), and deployment configurations. Here's a basic structure:
```yaml
prefect-version: null
name: null
build: null
push: null
pull: null
deployments:
  - name: null
    version: null
    tags: []
    description: null
    schedule: null
    entrypoint: null
    parameters: {}
    work_pool: {}
```
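The `entrypoint` field has the form `path/to/file.py:function_name` (e.g. `my_flow.py:my_flow`). To show roughly how such a string is resolved, here is a simplified standard-library sketch; `load_entrypoint` is a hypothetical helper, and Prefect 2 uses its own loader (e.g. `prefect.flows.load_flow_from_entrypoint`), which does more checking than this.

```python
import importlib.util
from pathlib import Path
from typing import Any, Callable


def load_entrypoint(entrypoint: str, root: str = ".") -> Callable[..., Any]:
    """Resolve an entrypoint string such as "my_flow.py:my_flow" to the object it names.

    Simplified illustration only; it does not check that the object is a flow.
    """
    # Split on the last colon, so "path/to/file.py:func" -> ("path/to/file.py", "func")
    file_part, _, attr_name = entrypoint.rpartition(":")
    if not file_part or not attr_name:
        raise ValueError(f"expected 'path/to/file.py:function_name', got {entrypoint!r}")
    path = Path(root) / file_part
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file's top-level code
    return getattr(module, attr_name)
```

Because the file is executed when it is loaded, keep flow files free of import-time side effects, for example by guarding a direct `my_flow()` call with `if __name__ == "__main__":`.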
4. Example:
- A practical example could be:
```yaml
name: my-project
prefect-version: 2.13.4
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      image_name: us-docker.pkg.dev/prefect-project/my-artifact-registry/my-image
      tag: latest
push: null
pull: null
deployments:
  - name: my-deployment
    version: 1.0
    tags: []
    description: Example deployment
    schedule: null
    entrypoint: my_flow.py:my_flow
    parameters: {}
    work_pool: {}
```
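The `id: build_image` field names the build step so that later parts of the file can reference its outputs with templates such as `{{ build_image.image }}` (for instance as a work pool's `image` job variable). The sketch below imitates that substitution with a regular expression purely to illustrate the idea; `resolve_step_refs` and the `outputs` values are hypothetical, and Prefect's real templating handles more cases.

```python
import re
from typing import Any, Dict

# Matches placeholders like "{{ build_image.image }}": a step id, a dot, then an output key
_STEP_REF = re.compile(r"\{\{\s*([\w-]+)\.([\w-]+)\s*\}\}")


def resolve_step_refs(value: str, step_outputs: Dict[str, Dict[str, Any]]) -> str:
    """Replace {{ step_id.key }} placeholders with outputs recorded for earlier steps.

    Simplified illustration of the idea, not Prefect's implementation.
    """
    def _sub(match: re.Match) -> str:
        step_id, key = match.group(1), match.group(2)
        return str(step_outputs[step_id][key])  # KeyError if the step or key is unknown

    return _STEP_REF.sub(_sub, value)


# Hypothetical outputs recorded for the build step whose id is "build_image"
outputs = {"build_image": {"image": "us-docker.pkg.dev/prefect-project/my-artifact-registry/my-image:latest"}}
print(resolve_step_refs("{{ build_image.image }}", outputs))
```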
For more details, you can refer to the official Prefect documentation or the GCP Worker Guide.
Lachlan Casey
08/22/2024, 12:39 AM
Marvin
08/22/2024, 12:40 AM
Use `prefect_test_harness` for unit testing. This can be used to run flows and tasks against a temporary local SQLite database.
Example with pytest:
```python
import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # Run every test in the session against a temporary Prefect database
    with prefect_test_harness():
        yield


@flow
def my_favorite_flow():
    return 42


def test_my_favorite_flow():
    assert my_favorite_flow() == 42
```
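Conceptually, the harness is a context manager: it points the code under test at a throwaway SQLite database for the duration of the block and restores the previous configuration afterwards. The sketch below reproduces that pattern with the standard library only; the environment variable `APP_DATABASE_URL` and the helper `temporary_database` are hypothetical and are not Prefect settings or APIs.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager
from typing import Iterator

ENV_VAR = "APP_DATABASE_URL"  # hypothetical setting name, not a Prefect setting


@contextmanager
def temporary_database() -> Iterator[str]:
    """Point ENV_VAR at a fresh SQLite file, then restore the old value and delete the file."""
    previous = os.environ.get(ENV_VAR)
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = os.path.join(tmp_dir, "test.db")
        sqlite3.connect(db_path).close()  # open (and so create) the database, then release it
        os.environ[ENV_VAR] = f"sqlite:///{db_path}"
        try:
            yield db_path
        finally:
            # Restore whatever was configured before the block
            if previous is None:
                os.environ.pop(ENV_VAR, None)
            else:
                os.environ[ENV_VAR] = previous
```

Wrapped in a session-scoped, autouse pytest fixture (`with temporary_database(): yield`), it plays the same role as `prefect_test_fixture` in the pytest example: one isolated database per test session, cleaned up automatically.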
2. Integration Testing: Make use of Prefect’s testing utilities to ensure your flows work as expected with other services and infrastructure components.
CI/CD:
1. Prefect CI/CD Guide: Follow the official guide for setting up CI/CD pipelines with Prefect.
2. Deployment Process: Automate the deployment process to build, push, and update Prefect deployments, ensuring version control and consistency.
3. Advanced CI/CD: Handle complex scenarios with considerations for different environments, independent deployments in a monorepo, and efficient build processes.
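For independent deployments in a monorepo, one common approach is to redeploy only the deployments whose code changed. A minimal sketch, assuming each deployment's code lives in the directory of its `entrypoint` file and that the list of changed files comes from your CI system (for example `git diff --name-only`); the function name is hypothetical. The selected names could then be passed to `prefect deploy --name <name>`.

```python
from pathlib import PurePosixPath
from typing import Dict, Iterable, List


def deployments_to_redeploy(entrypoints: Dict[str, str], changed_files: Iterable[str]) -> List[str]:
    """Return deployment names whose entrypoint directory contains a changed file.

    entrypoints maps deployment name -> entrypoint ("path/to/file.py:flow_func").
    """
    changed = [PurePosixPath(p) for p in changed_files]
    selected = []
    for name, entrypoint in entrypoints.items():
        file_part = entrypoint.rpartition(":")[0]
        flow_dir = PurePosixPath(file_part).parent  # "flows/etl" for "flows/etl/main.py"
        # Affected if any changed file sits somewhere under the flow's directory
        if any(flow_dir in path.parents for path in changed):
            selected.append(name)
    return sorted(selected)
```

This deliberately ignores shared code outside the flow directories; if your flows import a common package, add its path to the check or redeploy everything when it changes.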
An example Prefect CI/CD setup might include:
```yaml
name: cicd-example
prefect-version: 2.13.4
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      image_name: your-repo/cicd-example
      tag: latest
push: null
pull: null
deployments:
  - name: my-deployment
    version: 1.0
    tags: []
    description: Example deployment
    schedule: null
    entrypoint: my_flow.py:my_flow
    parameters: {}
    work_pool: {}
```
For a detailed example, you can refer to the source code example.
By following these best practices, you can streamline the development, testing, and deployment of your workflows with Prefect.
Lachlan Casey
08/29/2024, 9:40 AM
Marvin
08/29/2024, 9:41 AM
`dask.distributed`.
- RayTaskRunner: Employs the Ray framework for parallel execution.
Example using `ConcurrentTaskRunner`:
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # Define tasks here
    pass


my_flow()
```
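In Prefect 2, `ConcurrentTaskRunner` runs tasks started with `.submit()` concurrently (synchronous tasks in worker threads). As a plain-Python analogy of that submit-then-collect pattern, not Prefect's implementation, the standard library's `ThreadPoolExecutor` behaves similarly: `submit()` returns a future immediately and `result()` waits for the value. The `slow_square` function is made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def slow_square(x: int) -> int:
    time.sleep(0.2)  # simulate I/O-bound work such as an API call
    return x * x


def run_concurrently(values: List[int]) -> List[int]:
    # One worker thread per value (at least one), similar in spirit to submitting tasks
    with ThreadPoolExecutor(max_workers=max(len(values), 1)) as pool:
        futures = [pool.submit(slow_square, v) for v in values]  # returns immediately
        return [f.result() for f in futures]  # waits for each result, in submission order


if __name__ == "__main__":
    print(run_concurrently([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

Because the four sleeps overlap, the whole call takes about as long as one of them; the same reasoning is why I/O-bound tasks benefit from being submitted to a concurrent task runner.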
3. Configuring Flow Deployment:
- Deploy multiple flows that can be scheduled and run independently.
- Assign deployments to work queues, and use queue priorities and concurrency limits to control execution order and concurrency.
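Example to deploy flows (this uses the Prefect 2 `Deployment.build_from_flow` API, which later 2.x releases deprecated in favor of `flow.deploy()` and `flow.serve()`):
```python
import time

from prefect import flow
from prefect.deployments import Deployment


@flow
def fast_flow():
    return "I'm fast!"


@flow
def slow_flow():
    time.sleep(60)
    return "I'm slow!"


# Each deployment can be scheduled and triggered independently of the other
fast_deploy = Deployment.build_from_flow(flow=fast_flow, name="fast-deployment")
slow_deploy = Deployment.build_from_flow(flow=slow_flow, name="slow-deployment")

if __name__ == "__main__":
    # Register both deployments with the Prefect API
    fast_deploy.apply()
    slow_deploy.apply()
```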
4. Using Work Pools and Concurrency Limits:
- Set concurrency limits and priority within work pools to manage which flows or tasks get executed concurrently.
Guide and Example:
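```python
from prefect import flow, task


@task
def task_1():
    return "Task 1 complete"


@task
def task_2():
    return "Task 2 complete"


@flow
def concurrent_flow():
    # .submit() sends each task to the flow's task runner and returns a future
    # right away, so task_1 and task_2 can run at the same time
    future_1 = task_1.submit()
    future_2 = task_2.submit()
    # .result() waits for each task to finish and returns its value
    return future_1.result(), future_2.result()


concurrent_flow()
```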
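To make the priority and concurrency-limit idea concrete: in Prefect work pools, a queue with a lower priority number is served first (1 is the highest priority), and a concurrency limit caps how many runs from a queue execute at once. The function below is a simplified, hypothetical model of that scheduling decision in plain Python, not how a Prefect worker is implemented; the queue names and run IDs are made up.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class WorkQueue:
    name: str
    priority: int                     # lower number = served first
    concurrency_limit: Optional[int]  # None means unlimited
    running: int = 0
    pending: List[str] = field(default_factory=list)  # flow-run ids waiting to start


def next_run(queues: List[WorkQueue]) -> Optional[str]:
    """Pick the next flow run to start, honouring priority and per-queue limits."""
    for queue in sorted(queues, key=lambda q: q.priority):
        at_limit = queue.concurrency_limit is not None and queue.running >= queue.concurrency_limit
        if queue.pending and not at_limit:
            queue.running += 1
            return queue.pending.pop(0)
    return None  # nothing eligible to run right now
```

A full queue does not block lower-priority queues: once the high-priority queue hits its limit, runs from the next queue are picked up instead.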
Links for further reading:
- Prefect Work Pools
- Prefect Task Runners
- Prefect Deployment Tutorial
By using these configurations and tools, you can efficiently manage and execute concurrent and parallel flows within your Prefect deployments.
Lachlan Casey
09/02/2024, 4:37 AM
Marvin
09/02/2024, 4:37 AM
`ConcurrentTaskRunner`, `DaskTaskRunner`, and `RayTaskRunner`.
- Example with `ConcurrentTaskRunner`:
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # Define tasks that can be executed concurrently
    pass


my_flow()
```
2. Multiple Flows Execution:
- Multiple flows can be run concurrently by invoking them independently from a parent flow or external orchestration.
- Calling `run_deployment` with `timeout=0` starts a deployment's flow run without waiting for it to finish, so subflows run asynchronously and can outlive their parent flow run.
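Example:
```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def parent_flow():
    for _ in range(3):
        # name must be "<flow-name>/<deployment-name>"; "child-flow" is a placeholder flow name.
        # timeout=0 returns as soon as the flow run is created instead of waiting for it to finish.
        run_deployment(
            name="child-flow/child-deployment",
            parameters={"param": "value"},
            timeout=0,
        )


parent_flow()
```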
3. Handling and Cancelling Subflows:
- Currently, Prefect does not offer a direct feature to cancel subflow runs when the parent flow enters a terminal state, so you may need to manage subflow lifecycles manually.
- A reported issue: running multiple flows whose code is stored remotely can fail, particularly on Windows. Managing work queue concurrency limits may help prevent these errors.
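One way to manage child lifecycles manually is for the parent to keep a handle to every child it starts and signal them all to stop in a `finally` block, whatever terminal state the parent reaches. The sketch below shows that pattern with plain threads and a shared `threading.Event`; it is an analogy only. In Prefect the handles would be the flow runs returned by `run_deployment`, and cancelling them would go through the Prefect API, which is not shown here. All names are hypothetical.

```python
import threading
import time
from typing import List


def child(stop: threading.Event, log: List[str], name: str) -> None:
    # Cooperative cancellation: the child checks the stop signal between units of work
    while not stop.is_set():
        time.sleep(0.01)
    log.append(f"{name} cancelled")


def parent(fail: bool, log: List[str]) -> None:
    stop = threading.Event()
    children = [threading.Thread(target=child, args=(stop, log, f"child-{i}")) for i in range(3)]
    for t in children:
        t.start()
    try:
        if fail:
            raise RuntimeError("parent failed")
        # ... the parent's own work would go here ...
    finally:
        # Success or failure, stop every child this parent started
        stop.set()
        for t in children:
            t.join()
```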
4. Future Enhancements (Under Discussion):
- Add `Flow.submit` Interface: Prefect is considering adding a `Flow.submit` interface to allow direct submission of subflows to run in the background and leverage result handling.
- Example Proposal:
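```python
from prefect import flow

# Proposed API from the GitHub discussion: Flow.submit is not available in
# Prefect 2.x, so this snippet will not run until such an interface ships.


@flow
def sub_flow():
    return 1


@flow
def main_flow():
    future = sub_flow.submit()  # proposed: run the subflow in a worker thread
    result = future.result()
    print(result)


main_flow()
```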
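Until such an interface exists, Prefect 2's documentation describes running async subflows concurrently with `asyncio.gather` (or AnyIO task groups). The sketch below shows that gather pattern with plain `asyncio` coroutines standing in for async subflows, so it runs without Prefect; in Prefect, each coroutine function would additionally be decorated with `@flow`. The names mirror the proposal and are illustrative only.

```python
import asyncio
from typing import List


async def sub_flow(n: int) -> int:
    # Stand-in for an async subflow; the sleep represents I/O-bound work
    await asyncio.sleep(0.1)
    return n


async def main_flow() -> List[int]:
    # gather() runs all sub_flow coroutines concurrently and returns their
    # results in the order the coroutines were passed in
    return await asyncio.gather(*(sub_flow(i) for i in range(3)))


if __name__ == "__main__":
    print(asyncio.run(main_flow()))  # [0, 1, 2]
```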
### Useful Links and Issues for Reference:
- GitHub Issue: Execute multiple flows concurrently
- GitHub Issue: Support subflow cancellation if parent flow is in terminal state
- GitHub Issue: Add `Flow.submit` interface
By leveraging these approaches, you can manage and run multiple flows concurrently within Prefect, addressing both task-level and flow-level concurrency requirements.