Constantino Schillebeeckx
08/21/2023, 4:33 PM
[...] `Flow` so that we can ensure consistent behavior in all our flows when it comes to things like state_handlers, flow name, etc. What's the suggested approach for how to do that in 2.0, given that flows are now functions? At first glance, maybe doing a `partial` of your `flow` decorator would do the trick? I'm guessing others have asked, though I can't seem to find any docs/resources on it.
Nate
08/21/2023, 4:38 PM
[...] `@flow(**settings)` and `with_options`, which I would argue circumvents the need to try and subclass `Flow` for the vast majority of cases
if you need some extra functionality for a given flow, I'd probably recommend a decorator
Nate
08/21/2023, 4:40 PM
Constantino Schillebeeckx
08/21/2023, 4:54 PM
Constantino Schillebeeckx
08/21/2023, 4:55 PM
Nate
08/21/2023, 4:56 PM
Constantino Schillebeeckx
08/24/2023, 5:26 PM
class CustomFlow(Flow):
    def __init__(self, *args, alert_config, **kwargs):
        self.alert_config = alert_config
        ...
I'm wondering how we could do something similar in 2.0 - perhaps I can stuff that "alert configuration" into the Flow context?
Nate
08/24/2023, 5:28 PM
Nate
08/24/2023, 5:33 PM
[...] `Flow` class is quite complex
Constantino Schillebeeckx
08/24/2023, 5:43 PM
[...] `Flow` - I guess I'm trying to wrap my head around how I can extend a particular attribute of the `Flow` with e.g. a configuration that's specific to our Org. I'd like to do something like:
def handler(flow, flow_run, state):
    if flow_run.custom_config.is_something_custom():
        ...  # do something
elsewhere in a task:
@task
def a_task():
    if flow_run.custom_config.something_else:  # not sure how to get this or if it's possible
        ...
Nate
08/24/2023, 5:46 PM
@flow
def foo():
    pass

if some_condition:
    foo = foo.with_options(retries=1)
else:
    foo = foo.with_options(retries=42)
`retries` just being one example, you can set any of the flow settings with `with_options`
a new instance of the flow object is returned, which you can then call as needed. so for example, you could alter a flow within the context of another, use `with_options` to decide based on runtime information how the flow should be, and then call it as a subflow
Nate
08/24/2023, 5:47 PM
"extend a particular attribute of the `Flow` with e.g. a configuration that's specific to our Org"
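A minimal sketch of the `with_options` pattern described above, written against a small stand-in class so that it runs without Prefect installed. `MiniFlow`, `foo_fn`, and `parent` are hypothetical names, not Prefect APIs; the stand-in only mimics the one behavior mentioned in the thread, namely that `with_options` returns a new object and leaves the original untouched.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class MiniFlow:
    """Hypothetical stand-in for a flow object (not Prefect's Flow class)."""

    fn: Callable[..., Any]
    retries: int = 0

    def with_options(self, **options: Any) -> "MiniFlow":
        # Return a copy with the given settings overridden; self is unchanged.
        return replace(self, **options)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def foo_fn() -> str:
    return "ran foo"


foo = MiniFlow(foo_fn)


def parent(is_flaky: bool) -> Tuple[int, str]:
    # Decide at runtime how the child should be configured, then call it,
    # similar in spirit to configuring a subflow from inside another flow.
    child = foo.with_options(retries=42 if is_flaky else 1)
    return child.retries, child()
```

Because each call to `with_options` produces a fresh object, the module-level `foo` keeps its original settings no matter how many variants are derived from it.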
Constantino Schillebeeckx
08/24/2023, 5:51 PM
def handler(flow, flow_run, state):
or can I do
def handler(flow, flow_run, state, foo, bar):
or is it
def handler(flow, flow_run, state, **kwargs):
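One plain-Python way to get extra values into a handler without changing its signature is to bind them in a closure. The sketch below assumes the hook is called with exactly `flow`, `flow_run`, `state` (the first signature above); `make_failure_handler`, `send_slack`, and the channel ids are hypothetical, and the `SimpleNamespace` objects merely stand in for the real arguments.

```python
from types import SimpleNamespace
from typing import Callable, List

sent: List[str] = []  # records messages so the sketch is observable without Slack


def send_slack(channel_id: str, text: str) -> None:
    """Hypothetical stand-in for a real Slack client call."""
    sent.append(f"{channel_id}: {text}")


def make_failure_handler(channel_id: str) -> Callable:
    """Return a three-argument handler with `channel_id` bound in."""

    def handler(flow, flow_run, state) -> None:
        if state.is_failed():
            send_slack(channel_id, f"{flow.name} failed")

    return handler


# Hypothetical per-team channel ids
channels = {"DE": "some_id_de", "ML": "some_id_ml"}
handle_ml_failure = make_failure_handler(channels["ML"])

# Stand-in objects exposing only the attributes the handler reads
failed = SimpleNamespace(is_failed=lambda: True)
completed = SimpleNamespace(is_failed=lambda: False)
ml_flow = SimpleNamespace(name="ml_flow")

handle_ml_failure(ml_flow, None, failed)     # sends one message
handle_ml_failure(ml_flow, None, completed)  # sends nothing
```

The factory keeps a single handler body while each team gets its own bound configuration, so no near-duplicate handlers are needed.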
Nate
08/24/2023, 6:00 PM
Constantino Schillebeeckx
08/24/2023, 6:53 PM
# config.py
slack_channels = {"DE": "some_id", "ML": "some_id"}
and let's say I want all flows to have a `slack_on_flow_failure` event_handler:
# handlers.py
def handle_fail(flow, flow_run, state):
    if state.is_failed():  # is_failed is a method; without the call the check is always truthy
        send_slack_to_team_specific_channel()
in an ideal world 🤩 I could define flows like:
import config
import handlers

@flow(alerts=config.slack_channels, on_failure=[handlers.handle_fail])
def ml_flow():
    pass

@flow(alerts=config.slack_channels, on_failure=[handlers.handle_fail])
def de_flow():
    pass
What I like about the above is that all flows (independent of team) are configured in the same way. But it sounds like instead I have to do something like
import config
import handlers

@flow(on_failure=[handlers.handle_fail_specific_for_ml])
def ml_flow():
    pass

@flow(on_failure=[handlers.handle_fail_specific_for_de])
def de_flow():
    pass
and I would have to define nearly duplicate handlers that do the same thing but are team specific?
Nate
08/24/2023, 6:55 PM
Constantino Schillebeeckx
08/24/2023, 7:03 PM
def handler(flow: Flow, old_state: State, new_state: State):
    if new_state.is_failed():
        send_slack(flow.custom_alert_config)
I'm at a loss for how to get custom context or attributes in a state handler or a flow in 2.0
Nate
08/24/2023, 7:04 PM
Constantino Schillebeeckx
08/24/2023, 7:04 PM
Constantino Schillebeeckx
08/24/2023, 7:05 PM
Nate
08/24/2023, 7:11 PM
[...] `dict` field on a block
from prefect.blocks.core import Block

class SlackAlertConfig(Block):
    _block_type_slug = "slack-alert-config"
    config: dict
and then
def send_alert(flow, flow_run, state):
    team_name = flow_run.parameters.get("team_name")
    send_slack(Block.load(f"slack-alert-config/{team_name}").config)
assuming you pass in your `team_name` as a flow run param, but you might wanna get it from somewhere else
Nate
08/24/2023, 7:12 PM
Nate
08/24/2023, 7:13 PM
[...] `dict` and a method that loads the appropriate team's config by id
Constantino Schillebeeckx
08/24/2023, 7:29 PM
[...] `team` key could get me what I want. Though I'm still left pining for an easier way to, for example, at a later date require that all flows have some other required parameter. In essence, I want an easier way to define new flow attributes/functionality that automatically gets inherited by all my org's flows without needing to touch all those flows individually, since they (like in 1.0) are using my own subclass.
I don't think we have to belabor this discussion further; thanks for all the insight @Nate!
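The wish for org-wide behavior that every flow inherits can be approximated with a decorator factory, sketched here under stated assumptions. `make_org_flow`, `ORG_DEFAULTS`, `FLOW_TEAMS`, and `fake_flow` are hypothetical names; `fake_flow` is a stand-in that only records the settings it receives, so the block runs without Prefect. Whether a real flow decorator accepts every merged setting is not something this sketch verifies.

```python
from typing import Any, Callable, Dict

ORG_DEFAULTS: Dict[str, Any] = {"retries": 2}  # hypothetical org-wide settings
FLOW_TEAMS: Dict[str, str] = {}  # function name -> owning team


def make_org_flow(flow_decorator: Callable[..., Callable]) -> Callable[..., Callable]:
    """Build an org-wide decorator on top of any `flow_decorator(**settings)`."""

    def org_flow(*, team: str, **overrides: Any) -> Callable:
        # `team` is required and keyword-only: omitting it raises TypeError.
        settings = {**ORG_DEFAULTS, **overrides}  # per-flow values win

        def decorate(fn: Callable) -> Any:
            FLOW_TEAMS[fn.__name__] = team  # org-only metadata kept on the side
            return flow_decorator(**settings)(fn)

        return decorate

    return org_flow


def fake_flow(**settings: Any) -> Callable:
    """Stand-in decorator: attaches the settings it was given to the function."""

    def decorate(fn: Callable) -> Callable:
        fn.settings = settings
        return fn

    return decorate


org_flow = make_org_flow(fake_flow)


@org_flow(team="ML")
def ml_flow() -> str:
    return "ml"


@org_flow(team="DE", retries=5)
def de_flow() -> str:
    return "de"
```

Adding a new required setting later means changing `org_flow` once rather than touching every flow. With Prefect installed, the idea would be to pass its `flow` decorator to `make_org_flow` in place of `fake_flow`.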