Shamly Mackey
01/22/2025, 8:08 PM
Marvin
01/22/2025, 8:09 PM
1. Define a Prefect Task: Create a task using the `@task` decorator.
```python
from prefect import task

@task(log_prints=True)
def my_fastapi_task(name: str):
    print(f"Hello, {name}!")
```
2. Set Up FastAPI: Create a FastAPI app with POST and GET endpoints.
```python
from fastapi import FastAPI

app = FastAPI()

# my_fastapi_task is the task defined in step 1 (same module)
@app.post("/run-task/")
async def run_task(name: str):
    my_fastapi_task.submit(name)
    return {"message": f"Task for {name} submitted"}

@app.get("/status/")
async def status():
    return {"status": "Running"}
```
3. Dockerize Your Application: Create a Dockerfile to containerize your FastAPI and Prefect setup.
```dockerfile
FROM python:3.9
WORKDIR /app
COPY . /app
RUN pip install fastapi uvicorn prefect
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```
4. Build and Run the Docker Container: Use Docker commands to build the image and run the container.
```bash
docker build -t my-fastapi-prefect-app .
docker run -p 8000:8000 my-fastapi-prefect-app
```
5. Access Your API: You can now access your API at http://localhost:8000. Use the endpoints to trigger Prefect tasks.
- POST to `/run-task/?name=World` to submit a task (as written, `name` is a query parameter, not a JSON body).
- GET `/status/` to check the status of your FastAPI service.
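Since `name` is declared as a plain function parameter, FastAPI treats it as a query parameter. A stdlib sketch of building that request (the actual `urlopen` call is commented out because it needs the server running):

```python
from urllib.parse import urlencode
from urllib.request import Request

# build POST http://localhost:8000/run-task/?name=World
base = "http://localhost:8000/run-task/"
url = f"{base}?{urlencode({'name': 'World'})}"
req = Request(url, method="POST")

print(req.full_url)      # http://localhost:8000/run-task/?name=World
print(req.get_method())  # POST
# urllib.request.urlopen(req) would submit the task once the server is up
```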
### Additional Resources
- Prefect in Docker: Configure and manage Prefect with Docker.
- Prefect and FastAPI Integration Example: A GitHub repository with examples of integrating Prefect tasks with FastAPI.
This setup allows you to deploy Prefect flows locally within a Docker container that serves FastAPI endpoints, aligning with your preference for an imperative programming style.
Shamly Mackey
01/22/2025, 8:22 PM
Marvin
01/22/2025, 8:23 PM
```python
from fastapi import FastAPI
from prefect import flow, task

app = FastAPI()

# Define a task
@task(log_prints=True)
def my_fastapi_task(name: str):
    print(f"Hello, {name}!")

# Define a flow that includes the task
@flow
def my_flow(name: str):
    my_fastapi_task(name)

# FastAPI endpoint to trigger the flow
@app.post("/run-task/")
async def run_task(name: str):
    my_flow(name)
    return {"message": f"Flow for {name} executed"}

@app.get("/status/")
async def status():
    return {"status": "Running"}
```
### Deploying Locally
1. Run Locally: You can run this setup locally without Docker to test it out.
```bash
uvicorn main:app --reload
```
2. Dockerize and Deploy: To deploy this within a Docker container, use the Dockerfile setup from the previous message. Build and run your Docker container as before.
3. Access the API: With your API running, POST to `/run-task/?name=World` to execute the flow (`name` is a query parameter in this setup).
### Deployment with Prefect
For more advanced deployment options, consider using Prefect's orchestration capabilities:
- Prefect Cloud or Server: Deploy your flows to Prefect Cloud or a self-hosted Prefect Server for better management and monitoring.
- Work Pools and Agents: Use Prefect's work pools and agents to manage execution environments.
This approach allows you to maintain a clean separation between your API logic and task orchestration while still keeping everything in one place for simplicity. You can explore more about Prefect's deployment options in their documentation.
Shamly Mackey
01/22/2025, 8:24 PM
Shamly Mackey
01/22/2025, 8:25 PM
Marvin
01/22/2025, 8:25 PM
1. `prefect.yaml` File: This file defines your deployment configuration. Place it in the root directory of your project.
```yaml
version: 1
deployments:
- name: my-fastapi-flow
  entrypoint: main.py:my_flow  # Path to your flow function
  schedule: null  # Add a schedule if needed
  tags: []
  work_pool:
    name: my-work-pool  # Assigns infrastructure via a work pool
```
2. Deploy the Flow: Use the Prefect CLI to deploy your flow.
```bash
prefect deploy
```
This command reads the `prefect.yaml` file, builds the deployment, and registers it with Prefect.
3. Set Up a Work Pool: Prefect uses work pools to manage execution environments. You can set up a work pool that suits your infrastructure needs.
- Static Infrastructure: For environments where infrastructure remains constant.
- Dynamic Infrastructure: For environments that scale horizontally.
More details are available in Prefect's work pools documentation.
4. Start a Worker: If your deployment type requires it, start a Prefect worker to execute the flows.
```bash
prefect worker start --pool my-work-pool
```
Workers are responsible for checking scheduled flow runs and executing them.
5. Monitor and Manage: Use the Prefect UI or CLI to monitor and manage your flow runs. Prefect Cloud or a self-hosted Prefect Server can provide more advanced monitoring capabilities.
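Conceptually, the worker started in step 4 is a poll-and-execute loop: it checks its work pool for scheduled flow runs and executes whatever it finds. A stdlib sketch of that idea (not Prefect's actual implementation):

```python
import queue

def poll_and_execute(work_pool: queue.Queue, executed: list) -> None:
    """Conceptual stand-in for what `prefect worker start` does."""
    while True:
        try:
            flow_run = work_pool.get_nowait()  # poll for a scheduled run
        except queue.Empty:
            break  # a real worker would sleep and poll again
        executed.append(flow_run())            # execute the run

# simulate two scheduled flow runs waiting in the work pool
pool = queue.Queue()
pool.put(lambda: "run-1 done")
pool.put(lambda: "run-2 done")

executed = []
poll_and_execute(pool, executed)
print(executed)  # ['run-1 done', 'run-2 done']
```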
### Additional Resources
- Prefect Deployment Guide: Comprehensive guide on deploying flows with Prefect.
- Prefect YAML Configuration: Details on configuring your deployment using `prefect.yaml`.
This setup allows you to deploy and manage your Prefect flows effectively, whether they're running locally or on remote infrastructure.
Nate
01/23/2025, 12:23 AM
`POST /chat` endpoint that `asyncio.create_task(handle_app_mention)` where `handle_app_mention` is a prefect flow
i use `asyncio` to background since slack needs a response in less than 3 seconds but it might take the LLM longer to call tools / make a response
https://github.com/PrefectHQ/marvin/tree/main/cookbook/slackbot
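The pattern Nate describes can be sketched with plain `asyncio` (the Prefect and Slack parts are stand-ins here; `handle_app_mention` below is just an illustrative coroutine, not the cookbook's flow):

```python
import asyncio

results = []
_background: set = set()

async def handle_app_mention(text: str):
    # stand-in for the Prefect flow; the real one would call the LLM / tools
    await asyncio.sleep(0.05)
    results.append(f"handled: {text}")

async def chat_endpoint(text: str) -> dict:
    # schedule the slow work without awaiting it, so the endpoint can
    # acknowledge Slack well inside its 3-second deadline
    task = asyncio.create_task(handle_app_mention(text))
    _background.add(task)                       # keep a reference so it isn't GC'd
    task.add_done_callback(_background.discard)
    return {"ok": True}

async def main():
    ack = await chat_endpoint("hello")
    print(ack)      # {'ok': True}, returned before the flow finished
    print(results)  # [], background work has not run yet
    await asyncio.sleep(0.1)
    print(results)  # ['handled: hello']

asyncio.run(main())
```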
Shamly Mackey
01/23/2025, 6:36 PM
Nate
01/23/2025, 6:41 PM
01/23/2025, 6:41 PM@flow
or @task
is doing LLM stuff or not, you can use the same pattern
the big decision I'm suggesting you need to make is whether or not you want your flows/tasks to run on independent infra (e.g. a separate docker container from your fastapi app)
if not, you can just do something like this
```python
@flow
def my_prefect_flow(...): ...

@app.post('/whatever')
def handler(...):
    my_prefect_flow(...)
    # or, to avoid blocking the endpoint until the flow ends:
    asyncio.create_task(my_prefect_flow(...))
```
if you do, then you can replace either of those lines with `run_deployment()` to instead trigger some remote deployment from your endpoint (instead of running it in the same process)
Shamly Mackey
01/23/2025, 6:45 PM
Nate
01/23/2025, 6:50 PM
> I was thinking of not using a separate infra, but is there a standard practice in the industry that you suggest or recommend?

i would say there's no one size fits all way to think about this, but if your flows have serious resource requirements (ie lots of memory or cpu) then it may make sense to define independent deployments for them so you can tweak `job_variables`
to allocate extra resources to that work independent of your web app
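To make `job_variables` concrete: in a `prefect.yaml` deployment they sit under the `work_pool` entry. This is a hedged sketch only; the valid keys come from your work pool's base job template, so the resource value below is an illustrative assumption:

```yaml
deployments:
- name: my-heavy-flow
  entrypoint: main.py:my_flow
  work_pool:
    name: docker-pool
    job_variables:
      image: my_image:deploy-guide
      mem_limit: 4g  # illustrative; confirm the key against your pool's base job template
```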
Nate
01/23/2025, 6:50 PM
Marvin
01/23/2025, 6:50 PM
1. Use `BackgroundTasks`: Run the flow in the background from your endpoint.
```python
from fastapi import FastAPI, BackgroundTasks
from prefect import flow, task

app = FastAPI()

@task(log_prints=True)
def my_fastapi_task(name: str):
    print(f"Hello, {name}!")

@flow
def my_flow(name: str):
    my_fastapi_task(name)

@app.post("/run-task/")
async def run_task(name: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(my_flow, name)
    return {"message": f"Flow for {name} is running in the background"}

@app.get("/status/")
async def status():
    return {"status": "Running"}
```
2. Run Locally: Simply run your FastAPI application locally.
```bash
uvicorn main:app --reload
```
3. Access the API: Use the POST endpoint to trigger tasks and flows in the background.
- POST to `/run-task/?name=World` (as written, `name` is a query parameter rather than a JSON body).
### Explanation
- BackgroundTasks: FastAPI's `BackgroundTasks` runs the queued work after the response has been sent, so the endpoint isn't blocked. This is ideal for lightweight tasks that don't require heavy computation or long-running processes.
- Single Process: Everything runs within a single FastAPI process, reducing complexity and overhead.
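The `BackgroundTasks` semantics can be sketched without a server (a conceptual stand-in, not starlette's implementation): the handler queues callables, the framework sends the response, and only then runs them.

```python
class BackgroundTasksSketch:
    """Conceptual stand-in for fastapi.BackgroundTasks."""
    def __init__(self):
        self.tasks = []

    def add_task(self, func, *args):
        self.tasks.append((func, args))  # queued, not yet executed

    def run_all(self):
        for func, args in self.tasks:    # the framework calls this after
            func(*args)                  # the response has been sent

log = []

def my_flow(name):                       # stand-in for the Prefect flow
    log.append(f"Hello, {name}!")

def handler(name, background_tasks):
    background_tasks.add_task(my_flow, name)
    return {"message": f"Flow for {name} is running in the background"}

bg = BackgroundTasksSketch()
response = handler("World", bg)
print(response)  # the response is ready before the flow runs
print(log)       # []
bg.run_all()     # simulates FastAPI running tasks after the response
print(log)       # ['Hello, World!']
```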
This approach is perfect for scenarios where the task is not resource-intensive and can run alongside your API server without affecting performance.
Nate
01/23/2025, 6:51 PM
you can use `BackgroundTasks` instead of `asyncio` if you want, but it's mostly the same thing 🙂
Shamly Mackey
01/23/2025, 6:55 PM
`docker-pool` as docker type, set up on my machine. Would I need to run a script similar to the code block below for the worker to detect my script?
```python
from prefect import flow
from prefect.docker import DockerImage

@flow(log_prints=True)
def buy():
    print("Buying securities")

if __name__ == "__main__":
    buy.deploy(
        name="my-custom-dockerfile-deployment",
        work_pool_name="my-docker-pool",
        image=DockerImage(
            name="my_image",
            tag="deploy-guide",
            dockerfile="Dockerfile"
        ),
        push=False
    )
```
Shamly Mackey
01/23/2025, 6:56 PM
Shamly Mackey
01/23/2025, 6:58 PM
Nate
01/23/2025, 6:58 PM
• create a deployment (`.deploy()` or `prefect deploy`) which assigns a work pool (infra config) to your code
• start a worker: worker submits scheduled runs according to the work pool
Shamly Mackey
01/23/2025, 7:00 PM