Chris Purta
10/01/2024, 4:09 PMSCHEDULED
state and the deployment shows a status of Not Ready
. Any thoughts on what may be going on here or what is the best way to accomplish starting a prefect flow in the background and then polling for the results.
from prefect import flow, task
from prefect.client.orchestration import get_client
import time
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@task
async def last_task():
print("finished last task")
@task
async def mock_work_task():
print("mocking doing some work")
time.sleep(30)
print("done mocking some work")
@task
async def first_task():
print("finished first task")
@flow
async def test_flow():
await first_task()
await mock_work_task()
await last_task()
@app.post("/start-flow")
async def create_slide_handler():
async with get_client() as client:
flow_id = await client.create_flow(test_flow)
deployment_id = await client.create_deployment(flow_id, "test-flow")
flow_run = await client.create_flow_run_from_deployment(deployment_id)
print(f"FlowRun: {flow_run}")
return {"response": flow_run}
@app.get("/get-flow-results")
async def get_slide_results_handler(workflow_run_id: str):
async with get_client() as client:
flow_run = await client.read_flow_run(workflow_run_id)
return {"flow_run": flow_run}
if __name__ == "__main__":
print("Running server with HTTP")
uvicorn.run(app,
host="0.0.0.0",
port=8080)
Here is what shows in the local Prefect UI after making a request to /start_flow
Nate
10/01/2024, 4:54 PMChris Purta
10/01/2024, 7:29 PMNate
10/01/2024, 8:09 PMRegarding the worker not running to pick up a deployment what is the best way to run onedepends on your choice of runtime for your app! for example, if its all on a VM, you just
prefect worker start
in a systemd process or something, but if you're on k8s, you could run the helm chart in your clusterChris Purta
10/02/2024, 2:16 PMmark doerr
10/02/2024, 5:26 PMChris Purta
10/02/2024, 6:27 PMworkflow_run_id
or the `task_run_id`(s) that were started. We could then poll the same FastAPI instance to check the status of the flow or tasks and then grab the results and return them if the flow or tasks are completed. Assuming that flows or tasks take minutes to complete we currently cannot leave a connection open for longer than a few seconds as that will cause scalability concerns.mark doerr
10/02/2024, 6:32 PMmark doerr
10/02/2024, 6:32 PMNate
10/02/2024, 6:36 PM{"status": "ok"}
within 3 seconds, but I want marvin the agentic system to be able to think and use tools for longer than that, so i background it. now if I need to do something downstream with the result of marvin's work, I would need to somehow signal to the downstream stakeholder that this result is available to be consumed. polling an endpoint of your app is one option, pubsub is another, but that's implementation detailMarvin
10/02/2024, 6:36 PM@task
decorator, allowing you to run functions asynchronously and manage their states.
- Reference: Prefect 3.x Documentation - Write Tasks
### Notification on Task Completion
Prefect Cloud offers automation notifications where you can set up alerts for flow or task run state changes (e.g., when a task completes, fails, or retries). This way, you can notify downstream stakeholders or systems when a task is done.
Prefect 2.x Example:
python
from prefect import flow, task
from prefect.tasks.notifications import send_slack_message
@task
def long_running_task():
# Simulate long-running task
import time
time.sleep(10)
return "Task Completed"
@flow
def example_flow():
result = long_running_task()
send_slack_message('Task has completed: {}'.format(result))
if __name__ == '__main__':
example_flow()
- Here, send_slack_message
could be replaced with any other notification mechanism, like PubSub, email alerts, etc.
### References
- Successfully Deploying a Task Queue: https://www.prefect.io/blog/successfully-deploying-task-queue
- Prefect 2.x Documentation - Notifications: https://docs-2.prefect.io/latest/guides/host/Marvin
10/02/2024, 6:36 PMNate
10/02/2024, 6:37 PMNate
10/02/2024, 6:41 PMexpects
the prefect.task-run.Completed
event associated with your backgrounded task, then you don't have to poll -- downstream work is just kicked off automatically once upstream work is done, so you dont have to ask if the result is availableChris Purta
10/02/2024, 9:46 PMNate
10/02/2024, 9:50 PMChris Purta
10/02/2024, 9:56 PM