# prefect-community
Michael Adkins
Hi! I'm exploring Prefect for genomics on a Kubernetes cluster. Unfortunately, genomics involves a lot of outside tooling which needs to be called from the shell. It looks like both Prefect and Dask have poor support for managing files and shell applications, as they are designed for in-memory Python datasets. Additionally, it appears that there's not a straightforward way to specify containers for specific tasks (e.g. we have a container with tooling installed) or to create file system resource requirements for a worker. Just wanted to check whether my understanding is correct or whether Prefect could be extended to this use case.
Chris White
Hi Michael - most users use some combination of:
- shell tasks (https://docs.prefect.io/api/unreleased/tasks/shell.html)
- kubernetes tasks (https://docs.prefect.io/api/unreleased/tasks/kubernetes.html)

for managing shell / non-Python / containerized dependencies
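[Editor's sketch: at its core, a shell task wraps a subprocess call. Here is a minimal stand-alone illustration of that pattern using only the standard library - the helper name `run_tool` is ours, not Prefect's API:]

```python
import subprocess

def run_tool(command: str) -> str:
    """Run an external shell command and return its stdout.

    Raises CalledProcessError on a non-zero exit code, which is
    roughly how a shell-based task would surface tool failures
    to the orchestrator.
    """
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True, check=True
    )
    return result.stdout
```

A genomics pipeline step would then look like `run_tool("bwa mem ref.fa reads.fq > out.sam")` (assuming the tool is on the worker's PATH).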
j
@Michael Adkins in addition to what @Chris White mentioned, we've had success mounting a shared drive (using Amazon's EFS Elastic File System, but an NFS drive would work, too) on our workers (using Kubernetes) to give access to proprietary binary (.so) libraries that we call from Python code in our Prefect Task functions. You're welcome to DM me if more details would be helpful.
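[Editor's sketch of the calling-a-binary-library half of that setup: a task's Python code can load a shared library from the mounted drive with ctypes. The EFS path below is hypothetical; the example loads the system C library as a runnable stand-in:]

```python
import ctypes
import ctypes.util

# In the setup described above, this path would point at the mounted
# shared drive, e.g. a .so under /mnt/efs (hypothetical). Here we load
# the system C library instead so the example is runnable anywhere.
lib_path = ctypes.util.find_library("c")
lib = ctypes.CDLL(lib_path)

# Call an exported C function directly from Python.
print(lib.abs(-7))
```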
💯 1
Michael Adkins
Thanks for the references Chris, I had seen those. In my pipeline, I would want all my tasks to run in pods that tear down when they're finished, so it seems like the ideal workflow would actually be a Task composed of three tasks (`PodCreation`, `WorkTask`, `PodDeletion`), but this kind of composability isn't readily apparent. Can I compose flows or tasks?
Chris White
Gotcha; there isn’t a first-class way, but it is certainly possible. Here’s how you could do it (at a high level):
```python
from prefect import Task

class MyTask(Task):
    def __init__(self, task_a_kwargs, task_b_kwargs, task_c_kwargs, **kwargs):
        self.composed_tasks = [
            TaskA(**task_a_kwargs),
            TaskB(**task_b_kwargs),
            TaskC(**task_c_kwargs),
        ]
        super().__init__(**kwargs)

    def run(self):
        # call the sub-tasks' run methods directly, in sequence
        return_a = self.composed_tasks[0].run()
        return_b = self.composed_tasks[1].run(return_a)
        return self.composed_tasks[2].run(return_b)
```
Michael Adkins
Ah okay, cool! Thanks. I wasn't sure if you could have task run calls within a Task.run. The nested futures won't be a problem?
Chris White
It’s a good question - this pattern won’t actually create nested futures; the only true task your Flow will “see” is the `MyTask` task, and only it will be submitted to an executor (as a single future). While we are importing / calling other “tasks”, we’re directly calling their `run` methods in our logic, so effectively all we’re doing is reusing their boilerplate code.
Michael Adkins
Ah okay. So I can't compose them and still have it tear down a pod if the run fails (in the Prefect API sense) - I'd have to try/except
Chris White
You could do it that way, but I would recommend putting `PodCreation` + `WorkTask` together into a single task, and then have `PodDeletion` be downstream with an `always_run` trigger. I think in that case the Prefect states would better reflect your actual intent.
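[Editor's sketch: in plain-Python terms, an `always_run` trigger on the deletion task behaves like a `finally` block around the fused create+work step. The function names below are illustrative, not Prefect API:]

```python
events = []

def create_pod_and_work(fail: bool = False):
    """Fused PodCreation + WorkTask step (illustrative)."""
    events.append("create")
    if fail:
        raise RuntimeError("work failed inside the pod")
    events.append("work")

def delete_pod():
    """Downstream cleanup; with an always_run trigger this fires
    whether the upstream task succeeded or failed."""
    events.append("delete")

def flow(fail: bool = False):
    try:
        create_pod_and_work(fail=fail)
    finally:
        delete_pod()  # runs regardless of upstream outcome
```

Running `flow(fail=True)` still records a `"delete"` event even though the work step raised, which is the semantics the trigger gives you without hand-written try/except in every task.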
Michael Adkins
That makes sense
I've realized that this won't work as I was imagining. I want my `WorkTask` to run within the pod created by `PodCreation`, but there's not really a clear way to do that
I think this is outside the scope of the API though
Would you be interested in a call tomorrow regarding use of your cloud product / prefect in general for my company's use case?
Chris White
Yea that sounds great! And I do have some ideas about specifying one container per task, but I’d like to hear about your use case more before I make any promises on that end. Why don’t you send me an email at chris@prefect.io and we can try to set something up