# ask-community
m
Hi, my name is Mark and I am new to the Prefect community. We are trying to build a dynamic scheduler for the tasks of a flow (so depending on available resources, certain tasks should be executed at a time determined by our scheduler). We are planning to implement a BaseWorker that processes the (Prefect) flow. What I still need to understand is how the individual tasks of a flow are processed by the BaseWorker. Are they somehow broken down into a workflow graph? Where in the code does this happen? Thanks a lot for your help 😃!
n
hi @mark doerr! tldr: workers don't need to care about tasks in a flow they only prepare the entrypoint flow of a deployment
once you're in the entrypoint everything else just happens like it would if you ran the flow locally
so implementing `prepare_flow_run` in your subclass of `BaseWorker` is all you have to do (besides the `JobConfiguration` for your worker and the rest of the normal worker things)
this custom worker impl might be useful for you
m
Hi @Nate, thanks for the fast replies and nice to meet you here "semi-live" (I just saw your two YouTube Prefect intro videos last night, and I like them, since they give some useful insights ☺️) ... and now we are chatting. Thanks for pointing to another worker implementation (next to the ones in the Prefect repo). That definitely helps. Could you please guide me to the location of the Prefect code where the individual tasks of a flow are executed (be it remotely or locally)? We are trying to understand how a task in a flow is actually dispatched to the Python executor, and how the execution of each task (flow step) is then controlled / monitored by Prefect. Thanks a lot for your help.
Just to give you all a little bit of background info: we are running this robotic platform

https://www.youtube.com/watch?v=PmgcektmndE

and want to use Prefect not only to schedule when a workflow on the machine is started (flow-level scheduling), but also to schedule when each process step (which corresponds to a Prefect task) is executed. To calculate the optimal order in which the steps are executed, given that resources are limited by the capacity of the devices, we developed an open source (task) scheduler: https://gitlab.com/opensourcelab/pythonlabscheduler/-/tree/develop?ref_type=heads . Our goal is to "plug" this task-level scheduler framework into the Prefect flow execution framework, to get much more granular control over the execution of the workflow (and each individual task = robot-workflow-step). We chose Prefect for that because of the intuitive decorator syntax and the monitoring and reporting capabilities.
b
Could you please guide me to the location of the prefect code, where the individual tasks of a flow are executed (be it remotely or locally)
Hi Mark! If you're looking for the Prefect code, all of that can be found in this GitHub repo.
What you're working on is so cool. Is it for proteomics research? I noticed that the robotic arm is Thermo Fisher brand. Does it have an API which you plan on interacting with to initiate the workflows?
n
just piggybacking on bianca's response: oh wow! that's really cool (👀 excited to hear more)
as for where exactly the task orchestration occurs, it's in `src/prefect/task_engine.py`, which may be sort of hard to parse right away, but the tldr is that we:
• set up the task run context
• run the user code (i.e. `some_task.fn()`)
• send updates to the API when the task changes state (completes, retries, caches, crashes etc)
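To make those three steps concrete, here is a minimal standard-library sketch of the same loop. It is not Prefect's engine code and imports nothing from Prefect: `TaskRunContext`, `report_state` and `run_task` are hypothetical names, and "sending an update to the API" is replaced by appending to a list.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


@dataclass
class TaskRunContext:
    """Hypothetical stand-in for a per-task run context."""
    task_name: str
    states: List[str] = field(default_factory=list)
    result: Any = None
    error: Optional[BaseException] = None


def report_state(ctx: TaskRunContext, state: str) -> None:
    # Stand-in for "send an update to the API": we only record the state.
    ctx.states.append(state)


def run_task(fn: Callable[..., Any], *args: Any, retries: int = 0, **kwargs: Any) -> TaskRunContext:
    ctx = TaskRunContext(task_name=fn.__name__)   # 1. set up the run context
    for attempt in range(retries + 1):
        report_state(ctx, "Running")
        try:
            ctx.result = fn(*args, **kwargs)      # 2. run the user code
        except Exception as exc:
            if attempt < retries:
                report_state(ctx, "Retrying")     # 3. report the state change
                continue
            ctx.error = exc
            report_state(ctx, "Failed")
            return ctx
        report_state(ctx, "Completed")            # 3. report the state change
        return ctx
    return ctx


def pipette(volume_ul: int) -> str:
    # Hypothetical robot step used only for illustration.
    return f"dispensed {volume_ul} ul"


ctx = run_task(pipette, 50)
print(ctx.states, ctx.result)
```

A custom task-level scheduler would presumably hook in around step 2 (deciding when the user code may start), but where exactly that fits in Prefect itself is something to confirm against `task_engine.py`.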
m
@Bianca Hoch - thanks 🙂 - it is protein engineering (a step before proteomics; we could also do preparations for proteomics, but we do not have a fancy MS at our facility). Regarding the robot arm: we wrapped the Thermo Moverframework in a SiLA client (sila-standard.org, https://gitlab.com/sila2 ). We did that with every device in the platform and can now address them universally via the open source SiLA abstraction (and Python, and hopefully soon with Prefect 😉)
Great @Nate, thanks for pointing at the right place! This was what I was looking for! (a little bit hidden for a newbie 😉). I am happy that you like the idea. When I make some progress, I will let you know... Thanks again.
n
a little bit hidden for a newbie 😉
yep! this is definitely on the low-level side of the SDK, it's the "backend" of the `@task` and `@flow` decorators that connect your functions to the API, something that users don't often have to directly engage with
a couple callouts that may save some time:
• the `flow` decorator accepts a `task_runner` kwarg which can accept any `TaskRunner` implementation
  ◦ `ConcurrentTaskRunner` - great option (it's the default task runner), it's really just `ThreadPoolExecutor` under the hood, but with nice syntactic sugar (submit / map methods)
  ◦ `DaskTaskRunner` - requires `pip install prefect-dask` but allows you to use an ephemeral or long-lived Dask cluster to send your tasks to (the syntactic sugar remains the same, still just submit / map)
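Since `ConcurrentTaskRunner` is described above as essentially a `ThreadPoolExecutor` with submit / map sugar, the underlying pattern can be sketched with the standard library alone. This is only an illustration of that pattern, not Prefect's task runner API; `incubate` and the plate IDs are made-up placeholders for a robot workflow step.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def incubate(plate_id: int) -> str:
    # Hypothetical workflow step; the sleep stands in for device time.
    time.sleep(0.01)
    return f"plate-{plate_id} done"


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit: schedule one call and get a future back immediately
    future = pool.submit(incubate, 1)
    single = future.result()  # block until that call has finished

    # map: fan the same function out over several inputs;
    # results come back in input order, not completion order
    batch = list(pool.map(incubate, [2, 3, 4]))

print(single)
print(batch)
```

The thread pool decides when each call actually starts, which is the part a resource-aware scheduler would want to control; a custom `TaskRunner` implementation looks like the natural place to try that, though that is an assumption to verify against the Prefect source.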
m
👍 That's very good to know - you are so helpful @Nate 😎