Michael Chang
11/18/2024, 7:43 PM
Marvin
11/18/2024, 7:44 PM
run_deployment method to execute your flow whenever you require, without being tied to a schedule. You can also configure triggers based on custom events if you're looking to have more automation involved. Reference: Prefect Docs.
2. Prefect 3.x: This version also supports on-demand execution via programmatic approaches. It involves deploying your flow with configuration settings and making use of Prefect's extensive automations and event triggers system to customize when and how flows are initiated. Reference: Prefect 3.x Docs.
You can integrate these capabilities into your web application, enabling you to start flow runs at the click of your button (or the tap of your screen, for the more tactile among us).
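As a rough illustration of the on-demand path described above, here is a minimal sketch built around `run_deployment`. The helper names and the flow/deployment names are placeholders, not anything from this thread, and `timeout=0` is used on the assumption that a web request should return as soon as the run is created rather than wait for it to finish.

```python
from typing import Optional


def deployment_identifier(flow_name: str, deployment_name: str) -> str:
    """Build the "<flow-name>/<deployment-name>" string passed as `name`."""
    return f"{flow_name}/{deployment_name}"


def trigger_on_demand(flow_name: str, deployment_name: str, parameters: Optional[dict] = None):
    """Create a flow run for an existing deployment (hypothetical helper)."""
    # Imported here so the web app only loads Prefect when a run is requested.
    from prefect.deployments import run_deployment

    return run_deployment(
        name=deployment_identifier(flow_name, deployment_name),
        parameters=parameters or {},
        timeout=0,  # assumption: do not block the caller until the run completes
    )
```

A button handler in the web app could then call something like `trigger_on_demand("my-flow", "my-deployment", {"x": 1})`, where both names are placeholders for a deployment that already exists.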
If you need more specific code snippets or additional details, feel free to ask!
Michael Chang
11/18/2024, 8:11 PM
Marvin
11/18/2024, 8:11 PM
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName


async def trigger_flow(deployment_name: str):
    async with get_client() as client:
        # Retrieve the deployment to get its ID (filters must be passed by keyword)
        deployments = await client.read_deployments(
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(like_=deployment_name)
            )
        )
        if deployments:
            deployment_id = deployments[0].id
            # Create a flow run from the deployment
            flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment_id)
            print(f"Flow run created with ID: {flow_run.id}")
        else:
            print(f"No deployments found with the name: {deployment_name}")


# Example usage
asyncio.run(trigger_flow("your-deployment-name"))
In this example, replace "your-deployment-name" with the actual name of your deployment. This script looks up the first deployment whose name matches and triggers a new flow run from it.
For more details, you might want to dive into the Prefect documentation about deployments and interacting with the Prefect API.
If you have any other questions or need further assistance, feel free to ask!
Nate
11/18/2024, 8:23 PM
Nate
11/18/2024, 8:25 PM
from prefect import task

@task
def old_celery_task_logic(x):
    print(x)

for i in range(3):
    old_celery_task_logic.delay(i)  # call this from your webapp

old_celery_task_logic.serve()  # listen for / and execute work somewhere else
Michael Chang
11/18/2024, 8:30 PM
prefect deploy but prefect flows do: is this an expected difference between flows and tasks?
(2) when calling .delay() from the web app, the call seems to disappear and not execute. logs from the task worker look like this:
19:27:56.171 | INFO | prefect.task_worker - Starting task worker...
19:27:56.173 | INFO | prefect.task_worker - Subscribing to runs of task(s): project_data_source_upload
19:29:13.216 | INFO | prefect.task_worker - Received task run: b363fde5-678f-4578-94dd-2ca975614636 - project_data_source_upload
19:29:13.217 | ERROR | prefect.task_worker - Failed to read parameters for task run UUID('b363fde5-678f-4578-94dd-2ca975614636')
Traceback (most recent call last):
  File "/app/.venv/lib/python3.11/site-packages/prefect/task_worker.py", line 280, in _submit_scheduled_task_run
    run_data = await store.read_parameters(parameters_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 389, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.11/site-packages/prefect/results.py", line 893, in read_parameters
    await self.result_storage.read_path(f"parameters/{identifier}")
  File "/app/.venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 389, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.11/site-packages/prefect/filesystems.py", line 216, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /home/django/.prefect/storage/parameters/8eccf046-601d-48e1-be65-369743ac17eb does not exist.
19:29:13.225 | INFO | prefect.task_worker - Deleting task run UUID('b363fde5-678f-4578-94dd-2ca975614636') because it failed to submit
the task worker is running inside docker compose run and so it doesn't share a filesystem with the container running the webapp -- how do I configure it so it can see the parameters?
Michael Chang
11/18/2024, 8:31 PM
Nate
11/18/2024, 8:32 PM
you should share disk between the place you call .delay and .serve
Michael Chang
11/18/2024, 8:34 PM
"you should share disk between the place you call .delay and .serve"
is there a standard pattern for this? like do I just symlink .prefect/storage/ to some network filesystem? all of .prefect/?
also is this shared filesystem required in prod too or just locally?
Nate
11/18/2024, 8:35 PM
PREFECT_LOCAL_STORAGE_PATH: /task-storage
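One way to confirm that the web app container and the task worker container really see the same storage is to run a small check in each of them. The sketch below is only a diagnostic idea, not part of Prefect: `check_shared_storage` is a hypothetical helper, and the `parameters/` subdirectory is inferred from the path in the traceback above rather than from documentation.

```python
import os
from pathlib import Path


def check_shared_storage(env_var: str = "PREFECT_LOCAL_STORAGE_PATH") -> dict:
    """Report what this container sees at the configured storage path."""
    raw = os.environ.get(env_var)
    if raw is None:
        # Setting is absent in this container's environment
        return {"configured": False, "path": None, "exists": False, "parameter_files": 0}

    root = Path(raw)
    # Assumption: task parameters are written under <storage>/parameters/
    params_dir = root / "parameters"
    count = sum(1 for p in params_dir.iterdir() if p.is_file()) if params_dir.is_dir() else 0
    return {
        "configured": True,
        "path": str(root),
        "exists": root.is_dir(),
        "parameter_files": count,
    }


if __name__ == "__main__":
    print(check_shared_storage())
```

If both containers report the same path and the parameter file count in the worker rises after a `.delay()` call from the web app, the volume is likely shared as intended.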
Michael Chang
11/18/2024, 8:37 PM
read_parameters to "result persistence" in the docs, so I missed it.
Nate
11/18/2024, 8:38 PM