yourstake
10/21/2024, 4:48 PMMarvin
10/21/2024, 4:49 PMNate
10/21/2024, 4:49 PMNate
10/21/2024, 4:50 PMserve
some tasks and then .delay
them from elsewhere to run them
its like redis streams or celeryyourstake
10/21/2024, 4:52 PMNate
10/21/2024, 4:52 PMNate
10/21/2024, 4:52 PMyourstake
10/21/2024, 4:55 PMNate
10/21/2024, 4:55 PMNate
10/21/2024, 4:56 PMyourstake
10/21/2024, 5:06 PMNate
10/21/2024, 5:10 PMyourstake
10/21/2024, 5:15 PMNate
10/21/2024, 5:17 PMyourstake
10/21/2024, 5:23 PMNate
10/21/2024, 5:24 PMyourstake
10/21/2024, 5:26 PMyourstake
10/21/2024, 5:26 PMNate
10/21/2024, 5:27 PMyourstake
10/21/2024, 5:28 PMyourstake
10/21/2024, 5:29 PMyourstake
11/12/2024, 12:57 AMyourstake
11/12/2024, 12:59 AMNate
11/12/2024, 1:00 AMyourstake
11/12/2024, 1:01 AMyourstake
11/12/2024, 1:02 AMNate
11/12/2024, 1:03 AMyourstake
11/12/2024, 1:03 AMyourstake
11/12/2024, 1:26 AMNate
11/12/2024, 1:47 AMyourstake
11/12/2024, 2:55 PMyourstake
11/12/2024, 5:58 PMNate
11/12/2024, 6:00 PM@flow
def lots_of_nested_tasks_and_flows():
pass
@task
def calls_a_bunch_of_things():
lots_of_nested_tasks_and_flows() # blocking, does all the side effects
calls_a_bunch_of_things.serve() # websockets, low latency
Nate
11/12/2024, 6:01 PMshould I assume that it is the case that the background tasks will only work for handling small tasks and not for a whole DAG-like flow, in the current production version of prefect 3 as of today?background tasks can call whatever, including flows
yourstake
11/12/2024, 6:39 PMNate
11/12/2024, 6:39 PMNate
11/12/2024, 6:40 PMPOST /create_flow_run_from_deployment
one way or another are you scheduling a run that a polling process would have to catchyourstake
11/12/2024, 6:40 PMNate
11/12/2024, 6:41 PMNate
11/12/2024, 6:42 PMNate
11/12/2024, 6:42 PM@flow
def handler(event, context): ...
yourstake
11/12/2024, 6:46 PMyourstake
11/12/2024, 6:47 PMNate
11/12/2024, 6:49 PMyourstake
11/12/2024, 6:49 PMyourstake
11/12/2024, 6:50 PMyourstake
11/12/2024, 6:57 PMyourstake
11/12/2024, 6:58 PMyourstake
11/12/2024, 7:28 PMAndrew Brookins
11/12/2024, 8:33 PMasync def watch_events():
async with get_events_subscriber(
filter=EventFilter(
event=EventNameFilter(
name=[event="prefect.flow-run.scheduled"],
),
resource=EventResourceFilter(
labels={"prefect.resource.name": "your-flow-name"}
)
) as events_subscriber:
async for event in events_subscriber:
# You'd have to retrieve the flow
print(event.model_dump_json(indent=2))
I didn't test that code, but it should be close. You'd need to get the flow run ID from the event and then read the flow run data from our API to resolve the parameters to arguments (linked an example of how we do this when we're actually running flows). Anyway, not a solution but it's on my mind!yourstake
11/13/2024, 7:46 PM@task # prefect decorator
@app.task # launch task via celery
def process document():
do a bunch of things
return results
I am thinking about trying that approach though I'm a little concerned about mismatch between celery + prefect for example with concurrencyyourstake
11/13/2024, 7:46 PMAndrew Brookins
11/14/2024, 12:18 AM