Eddie Atkinson
08/23/2021, 5:06 AM
`map` to process the data for each site individually, but does so for the entire date range at once. The complication is that the date range is specified as a parameter, while the site_ids are the output of a task that runs.
Eddie Atkinson
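08/23/2021, 5:07 AM
```python
from prefect import Flow, Parameter, unmapped
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# FLOW_NAME, RUN_CONFIG, STORAGE, PROJECT_NAME, DEFAULT_CLIENT_NAME, the state
# handlers and the tasks used below are defined elsewhere (not shown).

with Flow(
    FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=STORAGE,
    state_handlers=[failure_handler, success_handler],
) as flow:
    client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
    start_date = Parameter("start_date", default=None)
    end_date = Parameter("end_date", default=None)
    force = Parameter("force", default=False)
    # Unpacking a task result like this assumes set_date_params is
    # declared with @task(nout=2)
    start_date, end_date = set_date_params(start_date, end_date)
    site_ids = get_site_ids(client_name)

    # Map over site_ids only: every site receives the same (whole) date range
    fetch_raw_data_task = fetch_raw_data.map(
        client_name=unmapped(client_name),
        site_id=site_ids,
        start_date=unmapped(start_date),
        end_date=unmapped(end_date),
        force=unmapped(force),
    )
    process_raw_data.map(
        client_name=unmapped(client_name),
        site_id=site_ids,
        start_date=unmapped(start_date),
        end_date=unmapped(end_date),
        force=unmapped(force),
        upstream_tasks=[fetch_raw_data_task],
    )

# Standard 5-field cron: run daily at 23:55
schedule = Schedule(clocks=[CronClock("55 23 * * *")])
flow.schedule = schedule
flow.register(project_name=PROJECT_NAME)
```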
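To get one mapped run per day and per site without a build-time loop, the list of days has to be computed at runtime, once the date parameters and `get_site_ids` have resolved. The helper below is a hypothetical, plain-Python sketch of that expansion (the name `expand_date_site_pairs` is not from the thread); in a Prefect 1 flow it would be assumed to be wrapped with `@task(nout=2)`.

```python
from datetime import date, timedelta
from itertools import product


def expand_date_site_pairs(start_date, end_date, site_ids):
    """Return two parallel lists with one entry per (day, site_id) combination.

    Hypothetical helper: meant to run inside a task, at runtime, so the
    real dates and site ids are available.
    """
    n_days = (end_date - start_date).days + 1  # inclusive of both ends, like pd.date_range
    days = [start_date + timedelta(days=i) for i in range(n_days)]
    pairs = list(product(days, site_ids))  # every day combined with every site
    dates = [d for d, _ in pairs]
    sites = [s for _, s in pairs]
    return dates, sites


dates, sites = expand_date_site_pairs(date(2021, 8, 1), date(2021, 8, 3), ["a", "b"])
print(len(dates))  # 6
```

Inside the flow, `sites` would then be passed as `site_id` and `dates` as both `start_date` and `end_date` to `fetch_raw_data.map(...)` and `process_raw_data.map(...)`, so each mapped child covers a single day for a single site and can be retried on its own.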
Eddie Atkinson
08/23/2021, 5:08 AM
```python
import pandas as pd
from prefect import Flow, Parameter, unmapped
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

with Flow(
    FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=STORAGE,
    state_handlers=[failure_handler, success_handler],
) as flow:
    client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
    start_date = Parameter("start_date", default=None)
    end_date = Parameter("end_date", default=None)
    force = Parameter("force", default=False)
    start_date, end_date = set_date_params(start_date, end_date)
    site_ids = get_site_ids(client_name)

    # Attempt: one pair of mapped tasks per day in the range.
    # This Python loop runs when the flow is built, while start_date and
    # end_date are still tasks rather than concrete dates.
    for date in pd.date_range(start_date, end_date):
        fetch_raw_data_task = fetch_raw_data.map(
            client_name=unmapped(client_name),
            site_id=site_ids,
            start_date=unmapped(date),
            end_date=unmapped(date),
            force=unmapped(force),
        )
        process_raw_data.map(
            client_name=unmapped(client_name),
            site_id=site_ids,
            start_date=unmapped(date),
            end_date=unmapped(date),
            force=unmapped(force),
            upstream_tasks=[fetch_raw_data_task],
        )

# Standard 5-field cron: run daily at 23:55
schedule = Schedule(clocks=[CronClock("55 23 * * *")])
flow.schedule = schedule
flow.register(project_name=PROJECT_NAME)
```
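A toy model of why a loop like this cannot work. This is plain Python, not Prefect's actual classes: `DeferredValue` is a hypothetical stand-in for a task result, which is only a placeholder while the flow is being built and only holds a real value once the flow runs. Any code that needs the concrete value, such as a Python `for` loop, therefore has to run at runtime, i.e. inside a task.

```python
class DeferredValue:
    """Hypothetical stand-in for a task result: it has a value only at run time."""

    def __init__(self, compute):
        self._compute = compute  # function that produces the value when the flow runs

    def run(self):
        return self._compute()


# "Build time": we only hold a placeholder, not the actual date.
start_date = DeferredValue(lambda: "2021-08-01")

try:
    for item in start_date:  # a Python loop needs a real iterable right now
        pass
    build_time_loop_ok = True
except TypeError:  # 'DeferredValue' object is not iterable
    build_time_loop_ok = False

# "Run time": the value finally exists, so looping is possible from here on.
resolved = start_date.run()
print(build_time_loop_ok, resolved)  # False 2021-08-01
```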
Eddie Atkinson
08/23/2021, 5:09 AM
Kevin Kho
task output because these loops are evaluated at build time, while the task only resolves to a value at runtime. Your suggestion is a good one, I think. If you need it to be sequential, you can move the `pd.date_range` code and the for loop inside your `fetch_raw_data` task instead, but I assume you don't want to go that route because you would lose observability for each `date`, `site_id` combination?
Eddie Atkinson
08/23/2021, 5:18 AM
`date`, `site_id` combination so that my Slack logs are as granular as possible (and so that I can re-run data processing jobs for individual days). I will give my solution a crack; if anyone else has any suggestions, I would love to hear them.
Kevin Kho