Sven Teresniak
11/23/2023, 12:51 PMflow_run_name
parameter of the @flow
annotation/function. But this I cannot use because when I write a function that generates a flow run I cannot access the run context via get_run_context()
(this method is somehow blocking indefinitely, instead of giving me a flow context or throwing an exception!). But I need the run context to extract the parameters and modify it when I use the flow_run_name
with a callable. I know that I can create a string template flow_run_name='flow {parametername}'
but that's not enough because we have optional parameters. When the flow run parameter parametername
is empty/None we generate a time specific default parameter (like "last hour" or "last day"). This way we generate batch jobs that could either run scheduled and always process the "default" data or get an explicit parameter for reprocessing. Therefore I'd like to a) access the given flow run parameters, b) modify it or set it to sensible defaults and then c) modify the flow run's name to something like flow-name {modified_parameter}
-- https://github.com/PrefectHQ/prefect/issues/7463 seems not to help in my case.
pseudo code:
@flow
def export(day_param: Optional[str] = None):
day: datetime.date = parse_date(day_param) or datetime.date.today()
set_current_flow_run_name( name = f'{FLOW_RUN} for {day}' ) # <-- this I need for day not day_param!
Is this possible? I'm using Prefect 2.12 at the moment.
This is blocking
def get_name() -> str:
ctx = get_run_context()
day: datetime.date = parse_date(ctx.flow_run.parameters.get("day_param", None)) or datetime.date.today() # <-- not working
return day
@flow(flow_run_name=get_name)
def export(day_param: Optional[str] = None):
…
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by