# prefect-community
**Ben Epstein:**
Is there a way to create flows in prefect 2.0 from a class function?
```python
from prefect import flow

class MyClass:
    def __init__(self, ct: int = 0):
        self.count = ct

    @flow
    def runit(self):
        print(f"I have been called {self.count} times")
        self.count += 1


c = MyClass()
c.runit()
```
I get the error:

```
TypeError: missing a required argument: 'self'
```

Is that expected, or a potential bug in Orion?
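(Editorial aside, not part of the thread: the error is plain Python mechanics. When a decorator replaces a method with a callable object that does not implement the descriptor protocol, attribute access on an instance no longer binds `self`. A minimal stdlib-only sketch of the same failure; the `FlowLike` class is a made-up stand-in, not Prefect's actual implementation:)

```python
class FlowLike:
    """Stand-in for a decorator that replaces a function with a callable
    object, roughly what a flow decorator does. Because this object is not
    a descriptor, Python never binds `self` on attribute access."""
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


class MyClass:
    @FlowLike
    def runit(self):
        return "ran"


c = MyClass()
try:
    c.runit()  # forwards no instance, so `self` is never supplied
except TypeError as err:
    print(err)
```

Calling `c.runit()` invokes `FlowLike.__call__` with no instance, which reproduces the `missing ... 'self'` TypeError from the thread.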
**Kevin Kho:**
Hi @Ben Epstein, good to see you here. I am not sure this is a bug, because it implies you are holding state across flow runs in `self`, which is something that needs to be explicitly designed for. When deployed, the flow runs as a separate Python process that spins up for execution and then spins down, so holding this state in production is problematic; that state really needs to be persisted somewhere. In this small example, it's tempting to think that you can do:
```python
c = MyClass()
c.runit()
c.runit()
```
but when you bring it to a deployment, these `runit` calls are different processes. So unless someone corrects me, I think the best you can do is treat `runit` as a `classmethod` so it doesn't have to take `self`, if you want to attach it to a class; anything shared between flow runs would need to be explicitly returned or persisted. If this were using `self` at the task level, that would make more sense, because the state would be confined to a single flow run. Not sure if you can do that yet in Orion; it was just added in Prefect 1.
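(A minimal sketch of persisting that state outside the process, assuming a local JSON file as the store; a real deployment would more likely use a database or object store, and `runit` here is a plain function standing in for the flow body:)

```python
import json
from pathlib import Path

STATE_FILE = Path("run_count.json")  # hypothetical location for the state

def load_count() -> int:
    """Read the persisted counter, defaulting to 0 on the first run."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())["count"]
    return 0

def save_count(count: int) -> None:
    """Write the counter back so the next (separate) process can see it."""
    STATE_FILE.write_text(json.dumps({"count": count}))

def runit() -> int:
    """Flow body: each run loads the persisted count, uses it, saves it."""
    count = load_count()
    print(f"I have been called {count} times")
    save_count(count + 1)
    return count
```

Because every run reads and writes the external file, the counter survives even when each flow run is a fresh process.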
**Anna Geller:**
> I think the best you can do is treat `runit` as a `classmethod`

Actually, your best option is to not use a class for this purpose at all. The `@flow` decorator is meant to be used with a function. When you later create a `DeploymentSpec` to deploy your flow, you need it as a function, not a class method. Ben, could you explain what problem you are trying to solve this way? Why would you want to put it into a class in the first place?
**Ben Epstein:**
Thanks for the tips @Anna Geller @Kevin Kho. I set it up this way as a convenience, to change as little code as possible while adopting Prefect. I already have a class that encapsulates the logic of my pipeline (flow), and it has a bunch of helper functions that are effectively tasks. I can change this to an outer function that instantiates the class and calls my `run` function. I'm curious whether I can still assign my class functions to tasks for more granular control. This seems like a pretty reasonable use case, though. Imagine you create, for example, an sklearn `Pipeline` that has many stages. You'd want the pipeline to be your `flow` and each stage to be a `task`. Does that make sense? I tried out what I wrote above, instantiating my class in an outer function decorated with `@flow` (no tasks for now), but it was failing with a `MissingContextError`. I'm not by my machine anymore, but when I get back I'll share a trace. Do you have any experience with that? My flow is running in a gRPC server (so a forked Python process). Does that matter?
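(A sketch of the refactor described above: a plain function as the flow entrypoint that instantiates the class inside it. The `@flow`/`@task` stand-ins below are no-ops so the snippet runs without Prefect installed; real code would use `from prefect import flow, task` instead, and whether wrapping bound methods in real Prefect tasks behaves identically is not verified here:)

```python
# No-op stand-ins for Prefect's decorators, purely for illustration.
def flow(fn):
    return fn

def task(fn):
    return fn


class Pipeline:
    """Existing class that encapsulates the pipeline logic (made-up names)."""
    def __init__(self, data):
        self.data = data

    def clean(self):
        self.data = [x for x in self.data if x is not None]
        return self.data

    def train(self):
        return sum(self.data) / len(self.data)  # toy "model"


@flow
def run_pipeline(data):
    # Plain-function entrypoint: instantiate the class inside the flow,
    # so no state leaks across flow runs.
    p = Pipeline(data)
    clean = task(p.clean)  # wrap bound methods as tasks at call time
    train = task(p.train)
    clean()
    return train()


print(run_pipeline([1, None, 2, 3]))  # → 2.0
```

The class keeps encapsulating the logic, while the deployable object is an ordinary function, which is what the `@flow` decorator expects.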
**Anna Geller:**
An sklearn pipeline solves a problem related to the one your flow solves: ensuring that specific stages of your ML workflow run in the proper order, and that you can call a single object (a Prefect flow or an sklearn pipeline) to trigger the entire workflow. So I wouldn't use an sklearn `Pipeline` if you are already using Prefect, since with a Prefect flow you get more observability and orchestration features. The problem with classes is that they are stateful. You can read more here
**Ben Epstein:**
@Anna Geller thanks for that. Do you have any ideas about the `MissingContextError`?
**Matthias:**
> I wouldn't use sklearn Pipeline if you are already using Prefect since with a Prefect flow, you get more observability and orchestration features.
I agree with the point you want to make, but I also believe you can combine Prefect with sklearn pipelines. I use sklearn pipelines as a convenient way to serialize a preprocessing pipeline together with a trained model. E.g., when you perform scaling/normalization, you want to store basic statistics (min/max/mean/std) of the features so you can apply the same scaling/normalization during inference. So I usually create a flow where you fit a scaler, train a model on the scaled data, and then combine the scaler and the fitted model into an sklearn pipeline to serialize the object (e.g. with MLflow's `log_model`). But perhaps there are better alternatives…
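(A stdlib-only sketch of that fit-then-bundle idea; real code would use sklearn's `StandardScaler` and `Pipeline` plus MLflow's `log_model`, and the toy classes here are made-up stand-ins:)

```python
import statistics

class Scaler:
    """Stores mean/std at fit time so inference can scale identically."""
    def fit(self, xs):
        self.mean = statistics.mean(xs)
        self.std = statistics.pstdev(xs) or 1.0  # guard against zero std
        return self

    def transform(self, xs):
        return [(x - self.mean) / self.std for x in xs]


class MeanModel:
    """Toy model: always predicts the mean of the scaled training data."""
    def fit(self, xs):
        self.prediction = statistics.mean(xs)
        return self

    def predict(self, xs):
        return [self.prediction for _ in xs]


class Bundle:
    """Scaler + model in one object; this combined object is what you would
    serialize (e.g. with pickle or MLflow) so inference reuses the same
    statistics that were computed during training."""
    def __init__(self, scaler, model):
        self.scaler, self.model = scaler, model

    def predict(self, xs):
        return self.model.predict(self.scaler.transform(xs))


train = [1, 2, 3, 4]
scaler = Scaler().fit(train)                      # fit the scaler
model = MeanModel().fit(scaler.transform(train))  # train on scaled data
pipe = Bundle(scaler, model)                      # bundle for serialization
print(pipe.predict([10]))
```

The point of the bundle is exactly the one made above: the fit-time statistics travel with the model, so inference applies the same scaling.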
**Anna Geller:**
> you can combine Prefect with sklearn pipelines.
Absolutely! There are trade-offs with both. There is no right or wrong here; it depends on your use case and how much visibility you need. Thanks for sharing how you do it, Matthias! 🙌