Ross Leung
09/14/2023, 10:48 PM2.8.6
and deployed a flow with this pseudo code:
@flow(flow_run_name="Flow run for {final_date}") # I want the name of MainFlow() to be the final_date, but I can't figure out how.
def MainFlow(preliminary_date):
final_date = preliminary_date - 2 # final_date is calculated in the function logic
SubFlows(date=final_date) # I have no problem naming my SubFlows() with the final date, but I also want it for my MainFlow()
In the example, obviously the {final_date}
will throw an error, but ideally I want the flow_run_name
to be based on some logic that depends on the preliminary_date
that is fed into the MainFlow()
. Right now, my MainFlow()
just has two random words as names, while my SubFlows()
have meaningful names based on final_date
.
Any tips will be appreciated!Nate
09/15/2023, 2:12 AMobviously theI believe you're actually very close already to your desired behavior! flows will render a template string if you pass it towill throw an error{final_date}
flow_run_name
(or task_run_name
or result_storage_key
)
In [1]: from datetime import datetime
In [2]: from prefect import flow
In [3]: @flow(flow_run_name="Flow run for {final_date}")
...: def main_flow(final_date: str):
...: # call your subflows as needed
...: pass
...:
In [4]: main_flow(final_date=str(datetime.now()))
Ross Leung
09/15/2023, 2:14 AMNate
09/15/2023, 2:17 AMfinal_date
is a named argument of my flow, but your main flow expects preliminary_date
so even if you're passing params into your flow run from somewhere else (like api call, run_deployment) then it will still work
alternatively, if you need preliminary_date
to be the kwarg of your flow, flow_run_name
will accept a callable so you can do logic on your kwargs and the return will end up as the value of flow_run_name
does that answer your question?Ross Leung
09/15/2023, 2:23 AMNate
09/15/2023, 2:35 AMflow_run_name
and accessing runtime from within that callable to return your modified datetime but one way you can definitely do it is by specifying a model for your input param like
In [1]: from pydantic import BaseModel, validator
In [2]: from datetime import datetime, timedelta
...:
...: class DynamicDateTime(BaseModel):
...: dt: datetime
...:
...: @validator("dt", pre=True)
...: def modify_dt(cls, v):
...: return v + timedelta(days=42)
...:
In [3]: from prefect import flow
In [4]: @flow(log_prints=True)
...: def foo(dt: DynamicDateTime):
...: print(dt)
...:
In [5]: foo({"dt": datetime.now()})
21:34:43.863 | INFO | prefect.engine - Created flow run 'persimmon-nightingale' for flow 'foo'
21:34:44.333 | INFO | Flow run 'persimmon-nightingale' - dt=datetime.datetime(2023, 10, 26, 21, 34, 42, 946348)
21:34:44.480 | INFO | Flow run 'persimmon-nightingale' - Finished in state Completed()
where prefect will coerce the dict into the model (and run the validator) before entering your flow contextRoss Leung
09/15/2023, 2:37 AM