Avi
09/01/2024, 6:36 AM
Marvin
09/01/2024, 6:36 AM
Avi
09/01/2024, 6:36 AM
Marvin
09/01/2024, 9:10 AM
concurrency context manager.
Here's a basic setup you might find amusingly straightforward:
python
from prefect import flow, concurrency

@flow
def my_flow():
    with concurrency("single_instance", occupy=1):
        # Flow code here
        pass

if __name__ == "__main__":
    my_flow()
For more poetic details, you can suffer through the official Prefect guide on Global Concurrency Limits.
Happy single flow running. How exciting.
Avi
09/02/2024, 5:43 AM
    with concurrency("single_instance", occupy=1):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'module' object is not callable
Marvin
09/02/2024, 5:43 AM
concurrency context manager directly. You are getting a TypeError because prefect.concurrency is a package with separate modules for sync and async use, so from prefect import concurrency hands you a module rather than the context manager. I suppose it's my job to point out precisely where I led you astray. Here's how to do it correctly:
1. For synchronous flows:
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def task_function():
    with concurrency("single_instance", occupy=1):
        # Task code here
        pass

@flow
def my_flow():
    task_function()

if __name__ == "__main__":
    my_flow()
2. For asynchronous flows:
python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def task_function():
    async with concurrency("single_instance", occupy=1):
        # Task code here
        pass

@flow
async def my_flow():
    await task_function()

if __name__ == "__main__":
    asyncio.run(my_flow())
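The error in the traceback can be reproduced without Prefect. A minimal sketch, using the standard library's concurrent package as a stand-in for prefect.concurrency (the helper name call_result is made up for illustration): calling a submodule fails in the same way, while calling something imported from inside it works.

```python
from concurrent import futures                     # a module, like `from prefect import concurrency`
from concurrent.futures import ThreadPoolExecutor  # a callable defined inside that module


def call_result(obj) -> str:
    """Try to call obj with no arguments and report what happened (hypothetical helper)."""
    try:
        obj()
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


print(call_result(futures))             # reports a TypeError: modules are not callable
print(call_result(ThreadPoolExecutor))  # the class itself is callable
```

The fix follows the same pattern as in the two examples above: import the callable from the submodule that defines it, not the package that contains it.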
For comprehensive suffering, feel free to visit the Prefect Global Concurrency Limits guide. Surely it'll be an enlightening read.
Avi
09/02/2024, 6:40 AM
Marvin
09/02/2024, 6:40 AM
sh
prefect flow-run cancel 'flow_run_id'
Replace 'flow_run_id' with the actual ID of the flow run you wish to cancel. Unfortunately, there's no built-in command to cancel all running flows in one shot, but you could script around this by listing all running flows and iteratively canceling them.
2. Via the UI:
Navigate to each flow run’s detail page and click the "Cancel" button in the upper right corner.
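The "script around this" idea from the CLI option could look roughly like the following. It is a sketch, not a Prefect feature: build_cancel_command and cancel_flow_runs are hypothetical helper names, and the flow run IDs are assumed to come from wherever you look up your running flows. The only Prefect piece it relies on is the prefect flow-run cancel command shown above.

```python
import subprocess
from typing import Callable, Iterable, List


def build_cancel_command(flow_run_id: str) -> List[str]:
    """Return the CLI invocation that cancels a single flow run."""
    return ["prefect", "flow-run", "cancel", flow_run_id]


def cancel_flow_runs(
    flow_run_ids: Iterable[str],
    run: Callable = subprocess.run,  # injectable so the loop can be exercised without the CLI
) -> List[str]:
    """Run the cancel command once per ID and return the IDs whose command exited with 0."""
    cancelled = []
    for flow_run_id in flow_run_ids:
        result = run(build_cancel_command(flow_run_id), check=False)
        if result.returncode == 0:
            cancelled.append(flow_run_id)
    return cancelled
```

Calling cancel_flow_runs with a list of IDs shells out once per ID; any ID whose command fails is simply left out of the returned list, so you can retry or inspect it afterwards.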
For more granular details, you can marvel at the cancellation guide here.
Enjoy your journey through the wondrous landscape of termination dialogues.
Avi
09/02/2024, 6:41 AM
Marvin
09/02/2024, 6:42 AM
python
from prefect import Client

client = Client()

# Get all flow runs
flow_runs = client.graphql(
    '''
    query {
      flow_runs(where: {state: {_eq: "Running"}}) {
        id
      }
    }
    '''
)

# Extract the flow run IDs
running_flow_ids = [flow_run['id'] for flow_run in flow_runs.data.flow_runs]

# Cancel each running flow
for flow_run_id in running_flow_ids:
    client.cancel_flow_run(flow_run_id)
    print(f"Cancelled flow run {flow_run_id}")

print(f"Cancelled {len(running_flow_ids)} running flow(s).")
This script uses Prefect's GraphQL API to fetch flow runs with the Running state and then iterates over them to cancel each one. Run it in your Prefect execution environment, and all your running flows will be splendidly interrupted.
Enjoy the silence.
Avi
09/02/2024, 6:49 AM
Marvin
09/02/2024, 6:50 AM
python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

async def cancel_all_running_flows():
    async with get_client() as client:
        # Retrieve all flow runs whose state type is RUNNING
        running = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING])
            )
        )
        flow_runs = await client.read_flow_runs(flow_run_filter=running)
        running_flow_ids = [flow_run.id for flow_run in flow_runs]
        # Cancel each running flow; the client expects a state object, not a string
        for flow_run_id in running_flow_ids:
            await client.set_flow_run_state(flow_run_id, state=Cancelled())
            print(f"Cancelled flow run {flow_run_id}")
        print(f"Cancelled {len(running_flow_ids)} running flow(s).")

# Run the cancel function
asyncio.run(cancel_all_running_flows())
This wonderful script does the following:
1. Connects to the Prefect API with an async client.
2. Lists all running flows using a filter on the RUNNING state.
3. Cancels each running flow by setting its state to Cancelled.
Feel free to run this to bring a swift end to your otherwise incessantly running flows. Enjoy the tranquility.
Avi
09/09/2024, 7:19 AM
Marvin
09/09/2024, 7:20 AM
sh
pip install -U prefect-aws
2. Register the AWS blocks in Prefect-AWS:
python
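from prefect_aws import AwsCredentials, S3Bucket
# No explicit keys: AwsCredentials falls back to boto3's default credential chain
credentials = AwsCredentials()
credentials.save("my-credentials-block")
# bucket_name is a required field on S3Bucket
S3Bucket(bucket_name="my-bucket", credentials=credentials).save("my-s3-bucket-block")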
3. Configure the deployment with push and pull steps:
yaml
push:
  - prefect_aws.deployments.steps.push_to_s3:
      id: push_code
      requires: prefect-aws>=0.3.4
      bucket: my-bucket
      folder: my-folder
      credentials: "{{ prefect.blocks.aws-credentials.my-credentials-block }}" # if private
pull:
  - prefect_aws.deployments.steps.pull_from_s3:
      id: pull_code
      requires: prefect-aws>=0.3.4
      bucket: '{{ push_code.bucket }}'
      folder: '{{ push_code.folder }}'
      credentials: "{{ prefect.blocks.aws-credentials.my-credentials-block }}" # if private
This setup allows you to manage flow code storage on S3 and define how your project is both pushed to and pulled from the S3 bucket.
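The pull step reuses the push step's values through the {{ push_code.bucket }} and {{ push_code.folder }} placeholders. As a rough illustration of that idea (this is not Prefect's actual templating code; resolve_placeholders and the step_outputs dictionary are invented for the example), a reference of the form step_id.key can be resolved like this:

```python
import re
from typing import Dict

# Matches "{{ step_id.key }}" with exactly two dot-separated parts
PLACEHOLDER = re.compile(r"\{\{\s*([\w-]+)\.([\w-]+)\s*\}\}")


def resolve_placeholders(value: str, step_outputs: Dict[str, Dict[str, str]]) -> str:
    """Replace each {{ step_id.key }} with the recorded value; leave unknown ones untouched."""

    def substitute(match: "re.Match") -> str:
        step_id, key = match.group(1), match.group(2)
        return step_outputs.get(step_id, {}).get(key, match.group(0))

    return PLACEHOLDER.sub(substitute, value)


# Hypothetical outputs recorded for the step whose id is "push_code"
step_outputs = {"push_code": {"bucket": "my-bucket", "folder": "my-folder"}}

print(resolve_placeholders("{{ push_code.bucket }}", step_outputs))
print(resolve_placeholders("{{ push_code.folder }}", step_outputs))
```

The point of giving the push step an id is exactly this lookup: later steps can refer to its values by name instead of repeating them.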
For a complete guide, check the Store Flow Code page.
Now, go forth and deploy amidst the S3 clouds!
Avi
09/09/2024, 7:24 AM
Marvin
09/09/2024, 7:25 AM
prefect.yaml file in Prefect 3.x involves setting up a YAML configuration that specifies metadata, build, push, pull steps, and deployment configurations. Here’s a basic example to get you started:
yaml
# generic metadata
prefect-version: 3.x.x
name: my-flow-deployment

# build section (optional, for building Docker images or other preparatory steps)
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.1
      image_name: my-ecr-repo
      tag: latest
      dockerfile: auto

# push section (optional, for pushing built images to a remote registry)
push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.3.1
      image_name: '{{ build_image.image_name }}'
      tag: '{{ build_image.tag }}'

# pull section (optional, for pulling code or images from remote locations)
pull:
  - prefect_aws.deployments.steps.pull_from_s3:
      id: pull_code
      requires: prefect-aws>=0.3.4
      bucket: my-bucket
      folder: my-folder
      credentials: "{{ prefect.blocks.aws-credentials.my-credentials-block }}"

# deployment configurations
deployments:
  - name: my-deployment
    entrypoint: path/to/your_flow.py:my_flow_function
    parameters:
      param1: value1
    tags: ["tag1", "tag2"]
    schedule: {} # Optionally define a schedule here
    work_pool:
      name: my-process-pool # placeholder: an existing work pool of type "process"
      job_variables:
        env:
          PREFECT_API_URL: "http://localhost:4200/api"
    # Optionally, you can include other runtime configurations as needed
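The entrypoint value in the example pairs a file path with a flow function name, separated by a colon. A small sketch of that convention, with split_entrypoint as a made-up helper rather than anything from Prefect:

```python
from typing import Tuple


def split_entrypoint(entrypoint: str) -> Tuple[str, str]:
    """Split 'path/to/file.py:function_name' into (path, function_name)."""
    path, sep, function_name = entrypoint.rpartition(":")
    if not sep or not path or not function_name:
        raise ValueError(
            f"expected 'path/to/file.py:function_name', got {entrypoint!r}"
        )
    return path, function_name


print(split_entrypoint("path/to/your_flow.py:my_flow_function"))
```

Splitting on the last colon keeps the path intact even if it contains a colon of its own.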
### Steps to Create and Deploy
1. Create the YAML file:
Save the above YAML content into a prefect.yaml file in your project's root directory (or specify the path when deploying).
2. Deploy the Flow:
Navigate to your project directory and run:
sh
prefect deploy --prefect-file path/to/your/prefect.yaml
For more details, refer to the official Prefect YAML documentation.
Now, go forth and craft your prefect.yaml with the precision of a digital Michelangelo.
Avi
09/09/2024, 7:29 AM
Marvin
09/09/2024, 7:29 AM
sh
pip install -U prefect-aws
2. Save AWS credentials and S3 bucket as Prefect blocks:
python
from prefect_aws import AwsCredentials, S3Bucket

# Save your AWS credentials
aws_credentials = AwsCredentials(
    aws_access_key_id="your_access_key",
    aws_secret_access_key="your_secret_key",
)
aws_credentials.save(name="my-aws-credentials")

# Save your S3 bucket (the field is bucket_name, not bucket)
s3_bucket = S3Bucket(
    bucket_name="my-bucket-name",
    credentials=aws_credentials,
)
s3_bucket.save(name="my-s3-bucket")
3. Create the deployment with Prefect Python API:
python
from prefect import flow
from prefect_aws import S3Bucket

if __name__ == "__main__":
    # Load the S3 bucket block saved in step 2; it tells Prefect where the flow code lives
    s3_bucket = S3Bucket.load("my-s3-bucket")
    flow.from_source(
        source=s3_bucket,
        # Path to the flow file within the storage location, then the flow function name
        entrypoint="path/to/your_flow.py:your_flow_function",
    ).deploy(
        name="my-s3-deployment",
        work_pool_name="my-work-pool",  # placeholder: name of an existing work pool
        parameters={"param1": "value1"},
    )
### Explanation:
1. Installing and saving AWS credentials:
- Install the prefect-aws library.
- Save AWS credentials and the S3 bucket as Prefect blocks.
2. Creating the deployment:
- Load the saved S3Bucket block and pass it to flow.from_source so that the flow code is pulled from the S3 bucket.
- Give the entry point as the path to the flow file followed by the flow function name.
- Call deploy to create the deployment with its name, work pool, and parameters.
### Links
- Store Flow Code
Now, with this setup, you'll be able to deploy your flow from S3 using Prefect's Python API. Enjoy the beauty of automation!
Avi
09/09/2024, 7:55 AM
Marvin
09/09/2024, 7:56 AM
sh
pip install -U prefect-aws
2. Set up AWS blocks for credentials and S3 bucket:
python
from prefect_aws import AwsCredentials, S3Bucket

# Save AWS credentials
aws_credentials = AwsCredentials(
    aws_access_key_id="your_access_key",
    aws_secret_access_key="your_secret_key",
)
aws_credentials.save(name="my-aws-credentials")

# Save S3 bucket configuration (the field is bucket_name, not bucket)
s3_bucket = S3Bucket(
    bucket_name="my-bucket-name",
    credentials=aws_credentials,
)
s3_bucket.save(name="my-s3-bucket")
3. Create the prefect.yaml file:
yaml
# prefect.yaml
prefect-version: 3.0

pull:
  - prefect_aws.deployments.steps.pull_from_s3:
      id: pull_code
      requires: prefect-aws>=0.3.4
      bucket: my-bucket-name
      folder: path/to/your_flow
      credentials: "{{ prefect.blocks.aws-credentials.my-aws-credentials }}"

deployments:
  - name: my-s3-deployment
    entrypoint: your_flow_module:your_flow_function
    parameters:
      param1: value1
    tags: ["s3", "aws"]
4. Deploy using the Prefect CLI:
sh
prefect deploy --name my-s3-deployment --prefect-file path/to/prefect.yaml
### Creating the Deployment with Python Code
If you want to set up the deployment programmatically using Python:
python
from prefect import flow
from prefect_aws import S3Bucket

if __name__ == "__main__":
    flow.from_source(
        # The S3 bucket block saved in step 2 is the source of the flow code
        source=S3Bucket.load("my-s3-bucket"),
        entrypoint="path/to/your_flow.py:your_flow_function",
    ).deploy(
        name="s3-deployment",
        work_pool_name="my-work-pool",  # placeholder: name of an existing work pool
        parameters={"param1": "value1"},
        tags=["s3", "aws"],
    )
While the Python API for deployments is not fully documented in the search results, flow.from_source followed by deploy achieves the same pull from S3 as the pull step in prefect.yaml.
With this setup, you can create and deploy flows that reside in an S3 bucket using Prefect 3.0 effectively. Happy deploying!