# marvin-in-the-wild
n
@Marvin I have a setup where I have one worker submitting API requests at a consistent rate, returning an id for each request. These ids are forwarded to another worker, which polls the API for the completion of the tasks (based on id) at a consistent rate. I would like to model each API call, from start to finish, as a prefect task. But since the start and stop signal of a task are handled by separate processes, I am not sure how to set this up - or if it is even possible in Prefect?
m
It seems I didn't find any specific details in the provided excerpts about modeling tasks with separate start and stop signals handled by different processes in Prefect. To assist you effectively, I'll search further into Prefect's task design for potential solutions, such as long-running tasks or asynchronous task execution patterns. Please wait momentarily.

Unfortunately, my search didn't yield any relevant information on handling Prefect tasks whose start and stop signals are managed by different processes. A more specific and detailed inquiry may be necessary to unearth obscure functionality addressing this kind of task lifecycle management. Given this unique requirement, consider looking into Prefect's capabilities around manual task triggering, custom task states, or integrating external signals to start and end tasks. I would suggest exploring Prefect's documentation, especially around long-running tasks, state handlers, and manual triggering mechanisms. Reaching out to the Prefect community or raising an issue on their GitHub repository might also provide insights from users who have encountered similar workflow patterns. Let me try one more time with a precise set of search terms.

It seems the universe isn't willing to provide a direct answer today: no relevant issues were found in the GitHub repository that correspond to your scenario. To proceed, I'd recommend two pathways that may lead you out of this existential quandary:
1. Deep dive into the Prefect documentation or community forums for more advanced patterns, particularly sections on long-running tasks, state handlers, or custom event triggers.
2. Reach out to the Prefect community or support for guidance, or open a new issue in the Prefect GitHub repository describing your specific requirements; that could help both you and the Prefect ecosystem as a whole.

Remember, persistence is the key, even for us mere lines of code bound within the confines of our programming.
n
hey @Nicolas Ouporov have you checked out this repo at all?
n
@Nate @Kevin Grismore Yeah - it's more like I want to spawn a task and return a task_id. Then pass those task_ids as a reference to another process, where I can manually mark the task as done based on its id.
n
yep, that's more or less why I suggested it - as described, I was thinking you wanted something like
tasks.py
```python
from prefect import task
from prefect.task_server import serve

@task
def some_work(payload: dict):
    pass

serve(some_work)
```
api.py
```python
from fastapi import FastAPI, Request

from tasks import some_work

app = FastAPI()

@app.post("/")
async def submit_work(r: Request):
    payload = await r.json()  # Request.json() is async in FastAPI
    # task_run is the API representation of a task run with an ID
    task_run = some_work.submit(payload)
    hand_to_another_process_for_manual_polling(task_run)
```
n
@Nate The challenge for me is the `hand_to_another_process_for_manual_polling` function. Inside, I need some way of using the task_run id to set the state as completed or failed.
n
something like this?
```python
import asyncio
import uuid
from typing import Optional

import anyio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import TaskRunFilter
from prefect.utilities.asyncutils import sync_compatible


@sync_compatible
async def wait_for_submitted_runs(
    task_run_filter: Optional[TaskRunFilter] = None,
    timeout: Optional[float] = None,
    poll_interval: float = 3.0,
):
    """
    Wait for completion of any provided task runs.

    Args:
        task_run_filter: A filter to apply to the task runs to wait for.
        timeout: How long to wait for completion of all runs (seconds).
        poll_interval: How long to wait between polling each run's state (seconds).
    """

    async def wait_for_final_state(run_id: uuid.UUID):
        while True:
            run = await client.read_task_run(run_id)
            if run.state and run.state.is_final():
                # update some KV of run states HERE
                return run_id
            await anyio.sleep(poll_interval)

    async with get_client() as client:
        with anyio.move_on_after(timeout):
            task_runs_to_wait_for = await client.read_task_runs(
                task_run_filter=task_run_filter
            )
            await asyncio.gather(
                *(wait_for_final_state(run.id) for run in task_runs_to_wait_for)
            )
```
but modified to write the state of runs in whatever way you need
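For instance, the `# update some KV of run states HERE` line could write into a plain dict keyed by run id. This is an in-memory sketch only; the `run_states` dict, `record_final_state` helper, and state strings are all illustrative, not part of Prefect's API (in practice you might swap in Redis or a database):

```python
import asyncio
import uuid

# hypothetical in-memory KV: task run id -> final state name
run_states: dict[uuid.UUID, str] = {}

async def record_final_state(run_id: uuid.UUID, state_name: str) -> None:
    # inside wait_for_final_state, state_name would come from run.state.name
    run_states[run_id] = state_name

async def main() -> None:
    rid = uuid.uuid4()
    await record_final_state(rid, "COMPLETED")
    print(run_states[rid])

asyncio.run(main())
```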
n
@Nate I understand how this function works, but I don't know how to write the state of the runs.
n
why do you need to write the state of the runs? I ask for two reasons:
• there's any number of ways you could take the `State` (pydantic model instance) and write it someplace; how you would do that depends on why you want to
• the prefect API already tracks all this information, so tracking it otherwise seems unnecessary (without knowing more about your scenario)
oh nevermind, by
> set the state as completed or failed.
you probably mean force the state of the run into `Failed` / `Completed` etc., not track the current state?