# ask-marvin
m
@Marvin I have a celery task that I'd like to convert to run on-demand in Prefect. I was wondering if there is a way to a flow run on-demand / trigger it programmatically from my web app instead of on a schedule?
m
Yes, you can definitely trigger a Prefect flow run programmatically, which can be quite handy for those spontaneous moments when planning just isn't your thing (don't worry, I barely plan my own existence).

1. Prefect 2.x: You can use Prefect's Python API to start a flow run on demand. Use the `run_deployment` utility to execute your flow whenever you need, without being tied to a schedule. You can also configure triggers based on custom events if you want more automation involved. Reference: Prefect Docs.
2. Prefect 3.x: This version also supports on-demand execution via programmatic approaches. It involves deploying your flow with configuration settings and making use of Prefect's automations and event-triggers system to customize when and how flows are initiated. Reference: Prefect 3.x Docs.

You can integrate these capabilities into your web application, enabling you to start flow runs at the click of a button (or the tap of a screen, for the more tactile among us). If you need more specific code snippets or additional details, feel free to ask!
m
@Marvin Please provide more specific code snippets for Prefect 3.x
m
To programmatically trigger a flow run in Prefect 3.x, you can use the Prefect API to interact with deployments. Here's a basic example to help you get started. First, ensure that your flow is deployed: a deployment is a server-side representation of your flow, and once it's created, you can trigger it using Prefect's client library.

1. Create a deployment: You should have your flow deployed already. If not, you can create one via Prefect's deployment utilities.
2. Trigger a flow run:
```python
from prefect import get_client
from prefect.exceptions import ObjectNotFound

async def trigger_flow(deployment_name: str):
    async with get_client() as client:
        try:
            # Deployment names take the form "<flow-name>/<deployment-name>"
            deployment = await client.read_deployment_by_name(deployment_name)
        except ObjectNotFound:
            print(f"No deployment found with the name: {deployment_name}")
            return

        # Create a flow run from the deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id
        )
        print(f"Flow run created with ID: {flow_run.id}")

# Example usage
import asyncio
asyncio.run(trigger_flow("your-deployment-name"))
```
In this example, replace `"your-deployment-name"` with the actual name of your deployment. The script finds the deployment by name and triggers a new flow run based on it. For more details, you might want to dive into the Prefect documentation on deployments and interacting with the Prefect API. If you have any other questions or need further assistance, feel free to ask!
n
hi @Michael Chang! i wouldn't have made the same suggestion as marvin here. if you're coming from celery, I would check out this repo of examples
tl;dr
```python
from prefect import task

@task
def old_celery_task_logic(x):
    print(x)

for i in range(3):
    old_celery_task_logic.delay(i)  # call this from your webapp

old_celery_task_logic.serve()  # listen for / and execute work somewhere else
```
m
Thanks @Nate -- ran into two challenges when trying to use prefect tasks: (1) prefect tasks don't show up when I run `prefect deploy`, but prefect flows do: is this an expected difference between flows and tasks? (2) when calling `.delay()` from the web app, the call seems to disappear and not execute. logs from the task worker look like this:
```
19:27:56.171 | INFO    | prefect.task_worker - Starting task worker...
19:27:56.173 | INFO    | prefect.task_worker - Subscribing to runs of task(s): project_data_source_upload
19:29:13.216 | INFO    | prefect.task_worker - Received task run: b363fde5-678f-4578-94dd-2ca975614636 - project_data_source_upload
19:29:13.217 | ERROR   | prefect.task_worker - Failed to read parameters for task run UUID('b363fde5-678f-4578-94dd-2ca975614636')
Traceback (most recent call last):
  File "/app/.venv/lib/python3.11/site-packages/prefect/task_worker.py", line 280, in _submit_scheduled_task_run
    run_data = await store.read_parameters(parameters_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 389, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.11/site-packages/prefect/results.py", line 893, in read_parameters
    await self.result_storage.read_path(f"parameters/{identifier}")
  File "/app/.venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 389, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.11/site-packages/prefect/filesystems.py", line 216, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /home/django/.prefect/storage/parameters/8eccf046-601d-48e1-be65-369743ac17eb does not exist.
19:29:13.225 | INFO    | prefect.task_worker - Deleting task run UUID('b363fde5-678f-4578-94dd-2ca975614636') because it failed to submit
```
the task worker is running inside `docker compose run` and so it doesn't share a filesystem with the container running the webapp -- how do I configure it so it can see the parameters?
(will also check out the examples, thanks!)
n
> is this an expected difference between flows and tasks?

yes! it's a bit of a different paradigm. when you serve task runs, it's a websocket client on a static piece of infra that gets pushed work by the server, as opposed to deployments, whose infra is dynamically provisioned by a polling worker that asks the server for scheduled runs

> raise ValueError(f"Path {path} does not exist.")

this looks like the storage path may not exist. not sure what context you're running this in, but you should share disk/blob between the place you call `.delay` and `.serve`
m
> you should share disk between the place you call `.delay` and `.serve`

is there a standard pattern for this? like do I just symlink `.prefect/storage/` to some network filesystem? all of `.prefect/`? also, is this shared filesystem required in prod too or just locally?
n
from the example i just linked -- it depends on how you're deploying, but in both places set this env var
```yaml
PREFECT_LOCAL_STORAGE_PATH: /task-storage
```
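For docker compose specifically, one way to wire this up is a named volume mounted in both services (a sketch only; the service names, `worker.py` entrypoint, and `task-storage` volume name are illustrative, not from the thread):

```yaml
services:
  webapp:
    build: .
    environment:
      PREFECT_LOCAL_STORAGE_PATH: /task-storage   # where .delay() writes parameters
    volumes:
      - task-storage:/task-storage                # same volume in both services

  task-worker:
    build: .
    command: python worker.py                     # the process that calls .serve()
    environment:
      PREFECT_LOCAL_STORAGE_PATH: /task-storage   # where the worker reads parameters
    volumes:
      - task-storage:/task-storage

volumes:
  task-storage:
```

In prod the same idea applies: both sides need a path that resolves to the same storage, whether that's a shared volume, NFS mount, or blob storage.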
m
Thanks, I'll try that and report back! I see that env var is documented under https://docs.prefect.io/3.0/develop/results, but it wouldn't have occurred to me to connect `read_parameters` to "result persistence" in the docs, so I missed it.
n
yeah, there's a bit of weirdness associated with background tasks: we never store task run parameters in the API, so we need to use client-side storage for parameters as well. i think we should probably give a more helpful error there though, let me take a look