Devin Dennis
07/26/2024, 3:15 AMMarvin
07/26/2024, 3:15 AMpython
from flask import Flask, request, jsonify
from prefect.client import OrionClient
app = Flask(__name__)
@app.route('/trigger-flow', methods=['POST'])
def trigger_flow():
deployment_id = request.json.get("deployment_id")
parameters = request.json.get("parameters", {})
client = OrionClient(api_url="http://<your-prefect-server>/api")
flow_run = client.create_flow_run_from_deployment(
deployment_id=deployment_id,
parameters=parameters
)
return jsonify({
"flow_run_id": flow_run.id,
"details": f"Flow run triggered with ID: {flow_run.id}"
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Links to Documentation and Examples:
- Using Deployment Triggers: Deployments and Triggers
- Example Flow: Basic Flow Example
This setup allows you to handle POST requests to trigger specified flow runs with given parameters. Make sure to adjust the api_url
for the OrionClient
to point to your Prefect server's API endpoint.Devin Dennis
07/26/2024, 3:27 AMMarvin
07/26/2024, 3:27 AMPrefectClient
from the prefect.client
module. Below is an example of how to set this up:
1. Install Prefect: Ensure you have the Prefect library installed:
bash
pip install prefect
2. Flask API to Trigger Flow Run:
Here's a sample Flask application demonstrating how to trigger a Prefect deployment flow run using the `PrefectClient`:
python
from flask import Flask, request, jsonify
from prefect.client import get_client
app = Flask(__name__)
@app.route('/trigger-flow', methods=['POST'])
def trigger_flow():
deployment_id = request.json.get("deployment_id")
parameters = request.json.get("parameters", {})
async def create_flow_run():
async with get_client() as client:
flow_run = await client.create_flow_run_from_deployment(
deployment_id=deployment_id,
parameters=parameters
)
return flow_run
flow_run = asyncio.run(create_flow_run())
return jsonify({
"flow_run_id": flow_run.id,
"details": f"Flow run triggered with ID: {flow_run.id}"
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Explanation:
- Flask Setup: We set up a simple Flask application with an endpoint /trigger-flow
to handle POST requests.
- Prefect Client: We use the get_client
method from prefect.client
to get an instance of the Prefect client, which interacts with the Prefect API.
- Asynchronous Flow Run Creation: The flow run creation is performed asynchronously using asyncio.run
to ensure asynchronous operations are properly awaited.
You need to replace <your-prefect-server>
and other placeholders with actual values according to your Prefect server setup. This allows you to post JSON data to the Flask API endpoint to trigger Prefect deployments.
Further Reading:
- More on Deployments: Prefect Deployments
- Guides for Interactive Workflows: Interactive Workflows GuideDevin Dennis
07/26/2024, 4:05 AMMarvin
07/26/2024, 4:05 AMbash
pip install prefect
2. Flask API to Trigger Flow Run:
Here's the updated Flask application:
python
from flask import Flask, request, jsonify
from prefect.client import Client
app = Flask(__name__)
@app.route('/trigger-flow', methods=['POST'])
def trigger_flow():
deployment_id = request.json.get("deployment_id")
parameters = request.json.get("parameters", {})
client = Client()
flow_run = client.create_flow_run_from_deployment(
deployment_id=deployment_id,
parameters=parameters
)
return jsonify({
"flow_run_id": flow_run.id,
"details": f"Flow run triggered with ID: {flow_run.id}"
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
### Explanation:
- Client Setup: This script uses the synchronous Client
class from prefect.client
.
- Trigger Flow: The route /trigger-flow
obtains the deployment ID and parameters from the incoming POST request, then uses the client to create a flow run directly.
This setup allows you to handle POST requests to trigger specified flow runs with given parameters in a non-async manner. Make sure to adjust the deployment ID and other necessary details to fit your specific use case.Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by