Sam Thomas

almost 3 years ago
I've been looking at trying to integrate Prefect (2.x) with object-oriented programming and design patterns. As discussed in an earlier thread (https://app.slack.com/client/TL09B008Y/CL09KU1K7/thread/CL09KU1K7-1658744762.332369), it's not straightforward to integrate Tasks and Flows with classes. I've got something that works, but I was wondering if there's a more elegant way. In this example, for simplicity, I'm making a process that gets a number and adds another number to it:
import prefect

class Maker():
    def __init__(self, val=5):
        self._val=val
        
    def make_number(self):
        return self._val
    
    @property
    def make_number_task(self):
        return prefect.Task(self.make_number)
    
class Adder():
    def __init__(self, val=3):
        self._val=val

    def add_number(self, x):
        return x+self._val
    
    @property
    def add_number_task(self):
        return prefect.Task(self.add_number)
    
class Calculator():
    def __init__(self):
        self.maker = Maker()
        self.adder = Adder()

    def _flow(self):
        a = self.maker.make_number_task()
        b = self.adder.add_number_task(a)
        return b
    
    @property
    def flow(self):
        return prefect.Flow(self._flow)
This seems to work.
c = Calculator()
c.flow()
<Prefect messages>
8
c.maker._val=10
c.flow()
<Prefect messages>
13
I'm wondering if there's a better way of doing it. Wrapping class methods directly in prefect.flow or prefect.task doesn't work because Prefect treats "self" as a required argument, whereas wrapping the bound method at access time, as above, seems to work.
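To illustrate why "self" ends up looking like a required argument, here is a minimal sketch in plain Python with no Prefect import. Wrapper is a hypothetical stand-in for a task-like callable object, and bound_wrapper is a hypothetical helper; neither is part of Prefect's API. The sketch assumes the wrapping object does not implement the descriptor protocol, which is consistent with the behaviour described above but is not confirmed here.

```python
class Wrapper:
    """Hypothetical stand-in for a task-like callable (not Prefect's Task)."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


class Broken:
    def __init__(self, val=5):
        self._val = val

    # Wrapping at class-definition time captures the plain function.
    # Wrapper has no __get__, so attribute access never binds `self`.
    @Wrapper
    def make_number(self):
        return self._val


class bound_wrapper:
    """Hypothetical descriptor: wrap the *bound* method on each access."""

    def __init__(self, fn):
        self.fn = fn

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        # Bind first, then wrap, so the wrapper never sees `self` as a parameter.
        return Wrapper(self.fn.__get__(instance, owner))


class Maker:
    def __init__(self, val=5):
        self._val = val

    @bound_wrapper
    def make_number(self):
        return self._val


def call_broken():
    """Return the name of the exception raised by the unbound wrapping."""
    try:
        Broken().make_number()
    except TypeError as exc:
        return type(exc).__name__
    return "no error"
```

The descriptor does the same thing as the property-based version in the post (bind, then wrap), but keeps a single method name per operation. Whether substituting prefect.Task for Wrapper behaves identically is an assumption that would need checking against the installed Prefect version.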

Srul Pinkas

10 months ago
Hey guys, a question about a basic concurrency-control issue with sub-flows. I need to create a flow that runs sub-flows that are a bit heavy (mostly on memory; each takes ~10 min to run), and I need to run it with a few dozen different inputs. I'm using Prefect 2.20. I tried two approaches:
- Simply trigger each sub-flow (through run_deployment, in order to allocate separate resources). This creates dozens of sub-flows, but the parent job, which tries to stay updated and manage them, crashes with a connection-limit-reached error (an internal httpx crash). I asked Marvin about configuring this in Prefect and didn't see any immediate solution. Perhaps more than 4-5 running in parallel is too much for Prefect? Am I missing something?
- Create many sub-flows but run them through a managed concurrency-limited queue (say, 3 at any given time; once one is done, another kicks in). Since it's a flow and not a task, I had to go through the work-pool/work-queue limits configuration. The problem is that it takes some time to get the actual machine (mostly for the first batch from the cluster), and during that time there seems to be an error for the sub-flow that has yet to be executed (a -1 signal, until it later succeeds). Due to that temporary failure, other sub-flows also start to run and my limit of 3 is exceeded (6-7 are running in parallel), which is odd as it seems like a common glitch. I looked for a parameter that controls how long to wait before the first sub-flow submission is treated as failed, to avoid this race edge case, but the only parameter the docs looked promising about (job_watch_timeout_seconds) was also described as "Number of seconds to wait for each event emitted by a job before timing out", so raising it to 1-2 minutes seems too impactful/risky.
- The third, fallback option I use is creating fewer but chubbier jobs, each running 10 different inputs serially, and running those jobs in parallel. But that seems like a bad solution for a platform that should take care of this.
Did I miss any simple way out of this? Thanks!
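For reference, the "3 at any given time" behaviour from the second approach can also be sketched on the parent side with a plain asyncio semaphore. This is only an illustration of the pattern, not a confirmed fix: run_one is a hypothetical placeholder for whatever triggers and awaits one sub-flow (for example, a call to run_deployment), and that wiring is an assumption that is not shown or tested here.

```python
import asyncio


async def run_one(item, delay=0.01):
    """Hypothetical placeholder for triggering and awaiting one heavy sub-flow."""
    await asyncio.sleep(delay)
    return item * 2


async def run_bounded(items, limit=3, worker=run_one):
    """Run worker(item) for every item, with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0  # highest number of workers observed running at once

    async def guarded(item):
        nonlocal running, peak
        async with semaphore:  # blocks while `limit` workers are active
            running += 1
            peak = max(peak, running)
            try:
                return await worker(item)
            finally:
                running -= 1

    # gather preserves input order in its results
    results = await asyncio.gather(*(guarded(i) for i in items))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_bounded(range(10), limit=3))
    print(results, peak)
```

Because the cap is enforced in the parent, it does not depend on work-queue limits or on how long the cluster takes to provide the first machines. Whether it also avoids the connection-limit error from the first approach is not verified.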