hey there, I'm going through the migration to 2.0 ...
# ask-community
c
hey there, I'm going through the migration to 2.0 and I'm wondering how to handle something in particular. In 1.0 we subclassed
Flow
so that we can ensure consistent behavior in all our flows when it comes to things like state_handlers, flow name, etc. What's the suggested approach for how to do that in 2.0 given that flows are now functions. At first glance, maybe doing a
partial
of your
flow
decorator would do the trick? I'm guessing others have asked though I can't seem to find any docs/resources on it.
n
hi @Constantino Schillebeeckx - we have accounted for quite a bit in the flow settings e.g.
@flow(**settings)
and `with_options` which I would argue circumvents the need to try and subclass
Flow
for the vast majority of cases if you need some extra functionality for a given flow, I'd probably recommend a decorator
if there's a situation where you feel you need to subclass, I'd be happy to take a look at it
c
Thanks for the info @Nate. I suppose the flow settings does give me what I need,
I'll be sure to reach out if there are other situations that don't quite fit my usecase
n
sounds good!
c
hey @Nate wanted to come back to this and explain my use-case a bit better. In order to ensure some degree of shared functionality across teams that publish Flows, we have a set of state handlers and configurations that should be used by all flows. For example, we have an "alerting configuration" that the Flow should be initialized with; it later gets used by the various state handlers to do things like alert the team (e.g. to Pager Duty or Slack) if the Flow fails. We've designed this to be separate to the state handlers (as opposed to an arg being passed into the handlers) so that the config could potentially be used elsewhere in the Flow if needed. To that end, we did something like:
Copy code
class CustomFlow(Flow):
  def __init__(self, *args, alert_config, **kwargs):
    self.alert_config = alert_config
    ...
I'm wondering how we could do something similar in 2.0 - perhaps I can stuff that "alert configuration" into the Flow context?
n
where does something like this example fall short of what you're going for?
in my view, subclassing flow is a last resort that I almost never recommend, because when you do, you take on a big responsibility to make it behave exactly like a flow in every possible way - there's almost always a better way (again, in my opinion). the
Flow
class is quite complex
c
Those example docs are fine, and I agree I don't want to subclass
Flow
- I guess I'm trying to wrap my head around how I can extend a particular attribute of the
Flow
with e.g. a configuration that's specific to our Org. I'd like to do something like:
Copy code
def handler(flow, flow_run, state):
  if flow_run.custom_config.is_something_custom():
    do something
else where in a task:
Copy code
@task
def a_task():
  if flow_run.custom_config.something_else: # not sure how to get this or if it's possible
    ...
n
i think you'd want this `with_options` class method we have
Copy code
@flow
def foo():
   pass

if some_condition:
   foo = foo.with_options(retries=1)
else:
   foo = foo.with_options(retries=42)
retries
just being one example, you can set any of the flow settings with
with_options
a new instance of the flow object is returned, which you can then call as needed. so for example, you could alter a flow within the context of another, use
with_options
to decide based on runtime information how the flow should be, and then call it as a subflow
if im understanding what you mean by
extend a particular attribute of the
Flow
with e.g. a configuration that's specific to our Org
c
what does the method signature look like for state handlers? is it strictly
Copy code
def handler(flow, flow_run, state):
or can I do
Copy code
def handler(flow, flow_run, state, foo, bar):
or is it
Copy code
def handler(flow, flow_run, state, **kwargs):
n
👍 1
c
let's say i've got a DE team and an ML team with their own slack channels
Copy code
# config.py

slack_channels = {"DE": "some_id", "ML": "some_id"}
and let's say I want all flows to have a
slack_on_flow_failure
event_handler:
Copy code
# handlers.py 

def handle_fail(flow, flow_run, state):
  if state.is_failed:
    send_slack_to_team_specific_channel()
in an ideal world 🤩 I could define flows like:
Copy code
import config
import handlers

@flow(alerts=config.slack_channels, on_failure=[handlers.handle_fail])
def ml_flow():
  pass

@flow(alerts=config.slack_channels, on_failure=[handlers.handle_fail])
def de_flow():
  pass
What I like about the above is that all flows (independent of team) are configured in the same way. But it sounds like instead I have to do something like
Copy code
import config
import handlers

@flow(on_failure=[handlers.handle_fail_specific_for_ml])
def ml_flow():
  pass

@flow(on_failure=[handlers.handle_fail_specific_for_ml])
def de_flow():
  pass
and I would have to define nearly duplicate handlers that do the same thing but are team specific?
n
how do you decide which handler to use? couldnt the handler read the config and switch inside? and then you'd have the same callable as the hook on the flow in every case
c
well that's the part I'm not sure how to do, in 1.0 I can simply access the extra attributes I've defined on `flow`:
Copy code
def handler(flow: Flow, old_state: State, new_state: State):
  if new_state.failed:
    send_slack(flow.custom_alert_config)
I'm at a loss for how to get custom context or attributes in a state handler or a flow in 2.0
n
i guess my question is, why do you need to store config on the flow object itself? as a random example, you could easily store the config in a block that you load a teams config by some id
c
it's completely reasonble for me to have missed something big, or just to be dense 🙂
in the block example, are you suggesting each team has their own block?
n
i mean thats possible, but you have complete flexibility with blocks here, blocks are just pydantic models that you can save to the prefect db and methods for logic so as a naive example you could have a
dict
field on a block
Copy code
class SlackAlertConfig(Block):
   _block_type_slug = "slack-alert-config"
   config: dict
and then
Copy code
def send_alert(flow, flow_run, state):
  team_name = flow_run.parameters.get("team_name")
  send_slack(Block.load(f"slack-alert-config/{team_name}").config)
assuming you pass in your team_name as a flow run param, but you might wanna get it from somewhere else
each team could then edit their own alert config in the UI via their teams block instance
or you could have one block with a mega
dict
and a method that loads the appropriate teams config by id
c
aha! now we're getting somewhere! going the parameter route with a
team
key could get me what I want. though I'm still left pinning for an easier way to, for example, at a later date require that all flows have some other required parameter. in essence, I want an easier way to define new flow attributes/functionality that automatically gets inherited by all my org's flows without needed to touch all those flows individually since they (like in 1.0) are using my own subclass. I don't think we have to belabor this discussion further; thanks for all the insight @Nate!