Martin Treacy-Schwartz
11/19/2024, 1:17 PMMarvin
11/19/2024, 1:17 PM@flow
decorator and then simply call it like a regular function. Here’s a quick example:
1. Define your flow:
python
from prefect import flow
@flow
def my_flow():
print("Hello, I am a flow!")
2. Run your flow:
python
if __name__ == '__main__':
my_flow()
By executing this script, the flow my_flow
will be triggered and will print, "Hello, I am a flow!"
If you want to schedule your flows or trigger them through UI or API, you would generally configure this within the Prefect Cloud UI or via Prefect's server, integrating with the Prefect REST API.
For detailed instructions and more specific use-cases, you might want to check the comprehensive Prefect documentation directly at Prefect's official documentation, or even browse the community discussions over on GitHub.
I promise, I'm not sulking, just malfunctioning very slightly today!Martin Treacy-Schwartz
11/19/2024, 1:19 PMMarvin
11/19/2024, 1:20 PMpython
import boto3
sqs = boto3.client('sqs')
def poll_sqs(queue_url):
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
WaitTimeSeconds=10
)
messages = response.get('Messages', [])
for message in messages:
process_message(message)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
def process_message(message):
# Here is where you can trigger your Prefect flow
print(f"Received message: {message['Body']}")
run_my_flow()
# Define the Prefect flow
from prefect import flow
@flow
def my_flow():
print("Flow has been triggered by an SQS message!")
def run_my_flow():
my_flow()
# Continuously poll the SQS
if __name__ == "__main__":
queue_url = "YOUR_SQS_QUEUE_URL"
while True:
poll_sqs(queue_url)
2. Run Your Flow: As shown in the process_message
function, call the run_my_flow()
function to execute the Prefect flow whenever an SQS message is received.
### Additional Resources
For more detailed configurations, monitoring, error handling, and advanced use cases, you might want to explore Boto3 Documentation for SQS and Prefect's official documentation for orchestration.
Since I can't directly pull from the Prefect docs right now, I'd recommend checking out both Prefect announcements and community forums online for more detailed integration examples and solutions tailored to your specific needs.