Daniel Mak
10/19/2022, 8:20 AM
date into my flow as I would need to use that as a parameter for my pipeline?
E.g. the flow runs on Wed 19 Oct 2022 23:00. The date parameter 19Oct2022 is passed as a parameter into the flow and will be used in the ETL script.
Ryan Peden
10/19/2022, 11:52 AM
import pendulum
from prefect import flow, get_run_logger


# Note: the default value is computed once, when this module is loaded.
@flow(name="my flow")
def my_flow(start_date=f"{pendulum.today('UTC'):%d%b%Y}"):
    logger = get_run_logger()
    logger.info(f"Running on {start_date}")


if __name__ == "__main__":
    my_flow()
If you run that, you'll see output similar to:
06:59:35.716 | INFO | prefect.engine - Created flow run 'agate-marten' for flow 'my flow'
06:59:35.813 | INFO | Flow run 'agate-marten' - Running on 19Oct2022
06:59:35.830 | INFO | Flow run 'agate-marten' - Finished in state Completed()
Pendulum is one of Prefect's dependencies, so you wouldn't need to install anything extra. You can also do the same thing with Python's built-in datetime, but it would be a bit more verbose.
Using a parameter with a default value this way gives you the ability to override the parameter if you ever need to, but provides a sensible default that should (hopefully) give what you need most of the time. You could then pass the date into tasks and subflows as needed for your ETL pipeline.
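For reference, here is a minimal sketch of the built-in datetime version. It is written as a plain function so it runs without Prefect installed; in a real flow you would put the @flow decorator on my_flow. The helper name default_start_date is made up for this example. It also uses None as the default and fills in the date when the function is called, so the value is not fixed at the moment the module is imported.

```python
from datetime import datetime, timezone
from typing import Optional


def default_start_date(now: Optional[datetime] = None) -> str:
    """Format a UTC timestamp like 19Oct2022 (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    # %b is the abbreviated month name; it depends on the process locale.
    return now.strftime("%d%b%Y")


def my_flow(start_date: Optional[str] = None) -> str:
    # Resolve the default at call time rather than at import time.
    if start_date is None:
        start_date = default_start_date()
    return start_date


if __name__ == "__main__":
    print(my_flow())             # today's UTC date
    print(my_flow("19Oct2022"))  # explicit override
```

Passing start_date explicitly still overrides the default, the same as in the Pendulum version.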
You can also read both the expected and actual start dates from the flow run context, like so:
from prefect import flow, get_run_logger
from prefect.context import get_run_context


@flow(name="my flow")
def my_flow():
    logger = get_run_logger()
    context = get_run_context()
    expected_start_date = f"{context.flow_run.expected_start_time:%d%b%Y}"
    actual_start_date = f"{context.flow_run.start_time:%d%b%Y}"
    logger.info(f"Expected to run on {expected_start_date}")
    logger.info(f"Actually running on {actual_start_date}")


if __name__ == "__main__":
    my_flow()
Which results in output like:
07:13:32.949 | INFO | prefect.engine - Created flow run 'bald-heron' for flow 'my flow'
07:13:33.059 | INFO | Flow run 'bald-heron' - Expected to run on 19Oct2022
07:13:33.059 | INFO | Flow run 'bald-heron' - Actually running on 19Oct2022
07:13:33.075 | INFO | Flow run 'bald-heron' - Finished in state Completed()
Will either of these help you accomplish your goal?
Daniel Mak
10/20/2022, 2:17 AM
if __name__ == "__main__":
    # <I pass in my parameters from a config file>
    today_date_str = kwargs.get("today_date_str")
    query_parameters = get_query_parameters(config, **kwargs)
    # <execute flow here>
Ryan Peden
10/20/2022, 1:23 PM
@flow(validate_parameters=False)
def my_flow(today_date_str: str, query_parameters: dict[str, str]):
    ...
Or if you'd like to keep the validation but mark the params as optional to keep Pydantic happy:
from typing import Optional

from prefect import flow


# validate_parameters defaults to True, so validation stays on here.
@flow
def my_flow(today_date_str: Optional[str] = None, query_parameters: Optional[dict[str, str]] = None):
    ...
That way, you're not forced to use the parameters but still get validation, so if, for example, you tried to call the flow with an int as your today_date_str, validation would fail.
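Type validation only checks that today_date_str is a string, not that it looks like 19Oct2022. If you want to catch a badly formatted date coming out of the config file, one option is a small check at the top of the flow. This is just a sketch using the standard library; check_date_str is a hypothetical helper, not part of Prefect, and it assumes the %d%b%Y format from earlier in the thread.

```python
from datetime import datetime
from typing import Optional

DATE_FORMAT = "%d%b%Y"  # e.g. 19Oct2022, as used earlier in the thread


def check_date_str(today_date_str: Optional[str]) -> Optional[str]:
    """Return the value unchanged if it is None or a well-formed date string."""
    if today_date_str is None:
        return None  # the parameter is optional
    if not isinstance(today_date_str, str):
        raise TypeError("today_date_str must be a string or None")
    # strptime raises ValueError if the string does not match the format.
    datetime.strptime(today_date_str, DATE_FORMAT)
    return today_date_str


if __name__ == "__main__":
    print(check_date_str("19Oct2022"))
    try:
        check_date_str("2022-10-19")
    except ValueError as exc:
        print(f"Rejected: {exc}")
```

You could call this as the first line inside my_flow, before the date is handed to any tasks.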