Devin Dennis
07/26/2024, 3:15 AM
Marvin
07/26/2024, 3:15 AM
python
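import asyncio

from flask import Flask, request, jsonify
# OrionClient is the client class in early Prefect 2.x releases;
# later releases rename it to PrefectClient.
from prefect.client import OrionClient

app = Flask(__name__)


@app.route('/trigger-flow', methods=['POST'])
def trigger_flow():
    deployment_id = request.json.get("deployment_id")
    parameters = request.json.get("parameters", {})

    async def create_flow_run():
        # The client is asynchronous: open it with "async with" and await the call.
        async with OrionClient("http://<your-prefect-server>/api") as client:
            return await client.create_flow_run_from_deployment(
                deployment_id=deployment_id,
                parameters=parameters
            )

    # Run the coroutine to completion from this synchronous view.
    flow_run = asyncio.run(create_flow_run())
    return jsonify({
        "flow_run_id": flow_run.id,
        "details": f"Flow run triggered with ID: {flow_run.id}"
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)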
Links to Documentation and Examples:
- Using Deployment Triggers: Deployments and Triggers
- Example Flow: Basic Flow Example
This setup handles POST requests that trigger a flow run for the given deployment with the supplied parameters. Make sure the API URL passed to `OrionClient` points to your Prefect server's API endpoint.
Devin Dennis
07/26/2024, 3:27 AM
Marvin
07/26/2024, 3:27 AM
`PrefectClient` from the `prefect.client` module. Below is an example of how to set this up:
1. Install Prefect: Ensure you have the Prefect library installed:
bash
pip install prefect
2. Flask API to Trigger Flow Run:
Here's a sample Flask application demonstrating how to trigger a Prefect deployment flow run using the `PrefectClient`:
python
import asyncio

from flask import Flask, request, jsonify
from prefect.client.orchestration import get_client

app = Flask(__name__)


@app.route('/trigger-flow', methods=['POST'])
def trigger_flow():
    deployment_id = request.json.get("deployment_id")
    parameters = request.json.get("parameters", {})

    async def create_flow_run():
        # get_client() builds a client from the active Prefect settings.
        async with get_client() as client:
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id=deployment_id,
                parameters=parameters
            )
            return flow_run

    # Run the coroutine to completion from this synchronous view.
    flow_run = asyncio.run(create_flow_run())
    return jsonify({
        "flow_run_id": flow_run.id,
        "details": f"Flow run triggered with ID: {flow_run.id}"
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Explanation:
- Flask Setup: We set up a simple Flask application with an endpoint `/trigger-flow` to handle POST requests.
- Prefect Client: We use the `get_client` function from Prefect's client package to get an instance of the Prefect client, which interacts with the Prefect API.
- Asynchronous Flow Run Creation: The Prefect client is asynchronous, so the flow run is created inside a coroutine, and `asyncio.run` runs that coroutine to completion from the synchronous Flask view.
`get_client` takes the API address from your Prefect settings, so make sure `PREFECT_API_URL` points to your Prefect server. You can then post JSON data to the Flask API endpoint to trigger Prefect deployments.
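The sync-to-async bridge from the explanation above can be seen in isolation with a stand-in client. This is only a sketch: `FakeClient` and the dictionary it returns are hypothetical and merely mimic the shape of an async client, so no Prefect call is made.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an async API client (not part of Prefect)."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False

    async def create_flow_run_from_deployment(self, deployment_id, parameters=None):
        # Yield to the event loop once, then echo back what was received.
        await asyncio.sleep(0)
        return {"deployment_id": deployment_id, "parameters": parameters or {}}


def trigger(deployment_id, parameters=None):
    """Synchronous entry point, like a Flask view, that runs one coroutine."""

    async def _create():
        async with FakeClient() as client:
            return await client.create_flow_run_from_deployment(
                deployment_id, parameters=parameters
            )

    # asyncio.run starts an event loop, runs the coroutine to completion,
    # and closes the loop before returning the result.
    return asyncio.run(_create())


result = trigger("example-id", {"x": 1})
```

Note that `asyncio.run` raises a `RuntimeError` when called from a thread that already has a running event loop, so this pattern fits ordinary synchronous views rather than `async def` ones.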
Further Reading:
- More on Deployments: Prefect Deployments
- Guides for Interactive Workflows: Interactive Workflows Guide
Devin Dennis
07/26/2024, 4:05 AM
Marvin
07/26/2024, 4:05 AM
bash
pip install prefect
2. Flask API to Trigger Flow Run:
Here's the updated Flask application:
python
from flask import Flask, request, jsonify
# run_deployment can be called from synchronous code. Its "name" argument
# accepts a deployment ID or a "flow-name/deployment-name" string.
from prefect.deployments import run_deployment

app = Flask(__name__)


@app.route('/trigger-flow', methods=['POST'])
def trigger_flow():
    deployment_id = request.json.get("deployment_id")
    parameters = request.json.get("parameters", {})
    # timeout=0 returns as soon as the flow run is created instead of
    # waiting for it to finish.
    flow_run = run_deployment(
        name=deployment_id,
        parameters=parameters,
        timeout=0
    )
    return jsonify({
        "flow_run_id": flow_run.id,
        "details": f"Flow run triggered with ID: {flow_run.id}"
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
### Explanation:
- Client Setup: This script calls `run_deployment` from `prefect.deployments` rather than a client class, because `prefect.client` in Prefect 2.x does not provide a synchronous `Client` and `run_deployment` can be called from synchronous code.
- Trigger Flow: The route `/trigger-flow` obtains the deployment ID and parameters from the incoming POST request, then creates a flow run for that deployment directly.
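Because the route reads the deployment ID and parameters straight from the request body, a missing or malformed field would otherwise only surface as an error from the Prefect API. A minimal validation sketch follows; the helper name `parse_trigger_payload` is hypothetical, and it assumes deployment IDs are UUID strings.

```python
import uuid


def parse_trigger_payload(payload):
    """Return (deployment_id, parameters) or raise ValueError with a reason."""
    if not isinstance(payload, dict):
        raise ValueError("request body must be a JSON object")

    raw_id = payload.get("deployment_id")
    if not isinstance(raw_id, str):
        raise ValueError("deployment_id is required and must be a string")
    try:
        # Assumption: deployment IDs are UUIDs.
        deployment_id = uuid.UUID(raw_id)
    except ValueError:
        raise ValueError("deployment_id must be a valid UUID") from None

    parameters = payload.get("parameters", {})
    if not isinstance(parameters, dict):
        raise ValueError("parameters must be a JSON object")

    return deployment_id, parameters


dep_id, params = parse_trigger_payload(
    {"deployment_id": "12345678-1234-5678-1234-567812345678", "parameters": {"a": 1}}
)
```

In the Flask view, the `ValueError` could be caught and turned into a 400 response before any flow run is created.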
This setup handles POST requests that trigger a flow run for the given deployment and parameters, without any explicit asyncio code in the view. Adjust the deployment ID and other details to fit your use case.
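To exercise the endpoint from a client, the JSON body needs the same two keys the route reads. The sketch below builds such a request with the standard library; the helper name `build_trigger_request`, the `localhost` address, and the example parameters are assumptions, while the port and path come from the Flask app above.

```python
import json
import urllib.request


def build_trigger_request(base_url, deployment_id, parameters=None):
    """Build (but do not send) a POST request for the /trigger-flow endpoint."""
    body = json.dumps(
        {"deployment_id": deployment_id, "parameters": parameters or {}}
    ).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/trigger-flow",
        data=body,
        # Flask only parses request.json when the content type is JSON.
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_trigger_request(
    "http://localhost:5000", "<your-deployment-id>", {"name": "example"}
)

# To send it against a running server:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```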