Ben Epstein
04/09/2022, 12:52 PM
```python
from prefect import flow

class MyClass:
    def __init__(self, ct: int = 0):
        self.count = ct

    @flow
    def runit(self):
        print(f"I have been called {self.count} times")
        self.count += 1

c = MyClass()
c.runit()
```
I get the error:
```
TypeError: missing a required argument: 'self'
```
Is that expected, or a potential bug in Orion?

Kevin Kho
04/09/2022, 3:17 PM
Flows don't currently support `self`, which is something that needs to be explicitly designed for.
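To illustrate why `self` goes missing, here is a minimal stand-in decorator (this is NOT Prefect's actual code, just a sketch of the mechanism): wrapping a method in a plain callable object bypasses Python's method binding, so the instance is never passed in.

```python
import inspect

class FlowLike:
    """Stand-in for a decorator that wraps a function in a callable
    object, roughly as @flow does. NOT Prefect code."""
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        # Validate the call against the wrapped function's signature,
        # the kind of check that produces the error above.
        inspect.signature(self.fn).bind(*args, **kwargs)
        return self.fn(*args, **kwargs)

class MyClass:
    @FlowLike
    def runit(self):
        return "ran"

c = MyClass()
try:
    # Attribute lookup returns the FlowLike object, not a bound method,
    # so `self` is never supplied to the wrapped function.
    c.runit()
except TypeError as e:
    print(e)  # missing a required argument: 'self'
```

Because `FlowLike` doesn't implement the descriptor protocol (`__get__`), `c.runit` is the raw class attribute rather than a bound method, and the call arrives with zero arguments.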
When deployed, the flow runs as another Python process that spins up for execution and then spins down, so holding this state in production is problematic and that state really needs to be persisted somewhere. In this small example, it’s tempting to think that you can do:
```python
c = MyClass()
c.runit()
c.runit()
```
but when you bring it to a deployment, these `runit` calls are different processes.
So unless someone corrects me, I think the best you can do is treat `runit` as a `classmethod` so it doesn't have to take `self` if you want to attach it to a class, but anything shared between flow runs would need to be explicitly returned or persisted.
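The "explicitly persisted" point can be sketched without Prefect at all. Here the counter lives in a JSON file so that each invocation, even in a fresh process, can pick it up; the file path and helper names are made up for illustration, and a real deployment would use a database, blob store, or similar instead of a temp file:

```python
import json
import os
import tempfile

# Hypothetical state location; a real deployment would persist this
# somewhere durable, not the local temp directory.
STATE_PATH = os.path.join(tempfile.gettempdir(), "runit_state.json")

def load_count(path: str = STATE_PATH) -> int:
    """Load the persisted counter, defaulting to 0 on the first run."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)["count"]
    return 0

def runit() -> int:
    """Each call could be a separate process; state survives via the file."""
    count = load_count()
    print(f"I have been called {count} times")
    with open(STATE_PATH, "w") as f:
        json.dump({"count": count + 1}, f)
    return count

runit()  # reads, reports, and increments the persisted count
```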
If this were using `self` at the task level, that would make more sense, because the state would be confined to a single flow run. Not sure if you can do that yet in Orion; it was just added in Prefect 1.

Anna Geller
04/09/2022, 5:14 PM
> I think the best you can do is treat `runit` as a `classmethod`
Actually, your best option is to not use a class for that purpose at all. The `@flow` decorator is supposed to be used with a function. When you create a `DeploymentSpec` later to deploy your flow, you need it as a function, not a class method.
Ben, could you explain what problem you are trying to solve this way? Why would you want to put it into a class in the first place?

Ben Epstein
04/09/2022, 6:30 PM
04/09/2022, 6:30 PM
My class has a `run` function. I'm curious if I can still assign my class functions to tasks for more granular control.
This seems like a pretty reasonable use case, though. Imagine you create, for example, an sklearn `Pipeline` that has many stages. You'd want the pipeline to be your `flow` and each stage to be a `task`. Does that make sense?
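The analogy can be sketched with plain functions: each stage maps to what would be a `@task` and the whole pipeline to what would be the `@flow`. The decorators are deliberately omitted here so the sketch runs without Prefect installed; the stage functions themselves are invented for illustration.

```python
def scale(values):
    """Stage 1 (would be a @task): min-max scale into [0, 1]."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

def threshold(values, cut=0.5):
    """Stage 2 (would be a @task): binarize at a cutoff."""
    return [1 if v >= cut else 0 for v in values]

def pipeline(values):
    """The whole pipeline (would be the @flow): chain the stages."""
    return threshold(scale(values))

print(pipeline([0, 2, 4, 8]))  # [0, 0, 1, 1]
```

Structuring it this way keeps each stage observable and retryable on its own, which is the granular control being asked about.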
I tried out what I wrote above, instantiating my class in an outer function decorated with `@flow` (no tasks for now), but it was failing with a `MissingContextError`. I'm not by my machine anymore, but when I get back I'll share a trace.
Do you have any experience around that? My flow is running in a gRPC server (so a forked Python process). Does that matter?

Anna Geller
04/09/2022, 6:43 PM

Ben Epstein
04/09/2022, 7:15 PM

Matthias
04/09/2022, 8:29 PM
> I wouldn't use sklearn Pipeline if you are already using Prefect since with a Prefect flow, you get more observability and orchestration features.
I agree with the point you want to make, but I also believe you can combine Prefect with sklearn pipelines. I use sklearn pipelines as a convenient way to serialize a preprocessing pipeline together with a trained model. For example, when you perform scaling/normalization, you want to store basic statistics (min/max/mean/std) of the features so you can apply the same scaling/normalization during inference. So I usually create a flow where you fit a scaler, train a model on the scaled data, and then combine the scaler and fitted model into an sklearn pipeline to serialize the object (e.g. with MLflow `log_model`). But perhaps there are better alternatives…

Anna Geller
04/09/2022, 10:35 PM
> you can combine Prefect with sklearn pipelines.
Absolutely! There are trade-offs with both. There is no right or wrong here; it depends on your use case and how much visibility you need. Thanks for sharing how you do it, Matthias! 🙌
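The pattern Matthias describes (fit a scaler, train a model on scaled data, then combine both into one serializable `Pipeline`) can be sketched as below, assuming scikit-learn and NumPy are available; the MLflow `log_model` step is left out, and the toy data is invented for illustration.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy training data: a single feature, two classes
X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])

# Step 1: fit the scaler (would be one task in the flow)
scaler = StandardScaler().fit(X)

# Step 2: train the model on the scaled data (another task)
model = LogisticRegression().fit(scaler.transform(X), y)

# Step 3: combine both into a single object for serialization, so the
# scaler's learned mean/std travel with the model at inference time
inference_pipeline = Pipeline([("scaler", scaler), ("model", model)])
preds = inference_pipeline.predict(X)
```

The payoff is that inference code only needs to load one artifact; it never has to know the training-time statistics separately.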