Carter Kwon
04/15/2021, 7:28 PM
<task functions... >
with Flow("ETL Flow", schedule=schedule, storage=Docker(registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME")), run_config=ECSRun(task_role_arn=os.getenv("TASK_ROLE_ARN"), execution_role_arn=os.getenv("EXECUTION_ROLE_ARN"))) as flow:
    DAYS_AGO = 5
    TARGET_DATE = (datetime.now() - timedelta(days=DAYS_AGO)).strftime('%Y-%m-%d')
    <use TARGET_DATE to make API calls inside tasks... >
We have a CI/CD process in place that registers our flows after they've been pushed to git. For this particular flow, TARGET_DATE should equal today's date - 5 days because the API needs a few days for the analytics to be available. I've noticed that TARGET_DATE actually ends up being date of flow registration - 5 days. Is there a way to have this code executed every time the flow is run instead of once at registration so TARGET_DATE changes every day?
Zach Angell
TARGET_DATE = (datetime.now() - timedelta(days=DAYS_AGO)).strftime('%Y-%m-%d')
gets executed at registration time, not runtime.
You'll just want to wrap that expression in a task, so the code actually executes at runtime.
@task
def get_target_date():
    return (datetime.now() - timedelta(days=DAYS_AGO)).strftime('%Y-%m-%d')

with Flow():
    ......
    TARGET_DATE = get_target_date()
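To see why the value gets frozen, here is a rough plain-Python sketch of the same difference, with no Prefect involved. The names compute_target_date, FROZEN_TARGET_DATE and run_flow, and the now parameter, are made up for illustration and are not part of Prefect or of the flow above. Module-level code stands in for registration, and a function call stands in for a task running at flow runtime.

```python
from datetime import datetime, timedelta

DAYS_AGO = 5


def compute_target_date(now=None, days_ago=DAYS_AGO):
    """Return the date `days_ago` days before `now`, formatted as YYYY-MM-DD.

    `now` is a hypothetical parameter added so the example is deterministic;
    it falls back to the current time when omitted.
    """
    now = now or datetime.now()
    return (now - timedelta(days=days_ago)).strftime('%Y-%m-%d')


# Evaluated once, when this module is loaded. This mirrors code placed
# directly in the flow block, which runs when the flow is registered.
FROZEN_TARGET_DATE = compute_target_date(now=datetime(2021, 4, 15))


def run_flow(now):
    """Stand-in for a flow run: the date is recomputed on every call,
    which mirrors moving the computation into a task."""
    return compute_target_date(now=now)


if __name__ == "__main__":
    # "Registered" on 2021-04-15, then "run" on two later days.
    for day in (datetime(2021, 4, 16), datetime(2021, 4, 20)):
        print("frozen:", FROZEN_TARGET_DATE, "| at run time:", run_flow(day))
```

The frozen value stays the same no matter when run_flow is called, while the value computed inside the function follows the run date.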
Carter Kwon
04/15/2021, 7:40 PM