# ask-community
**Eddie Atkinson:**
Hey folks, I haven't been able to figure this one out by reading the docs / threads here. I am trying to process data for sites in a date range one day at a time, with a task running for each combination of date and `site_id`. My current solution uses `map` to process the data for each site individually, but does so for the entire range at once. The complication is that the date range is specified as a parameter, while the `site_ids` are the output of a task. This is the code as it currently is:
```python
from prefect import Flow, Parameter, unmapped
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

with Flow(
    FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=STORAGE,
    state_handlers=[failure_handler, success_handler],
) as flow:
    client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
    start_date = Parameter("start_date", default=None)
    end_date = Parameter("end_date", default=None)
    force = Parameter("force", default=False)

    start_date, end_date = set_date_params(start_date, end_date)

    site_ids = get_site_ids(client_name)

    fetch_raw_data_task = fetch_raw_data.map(
        client_name=unmapped(client_name),
        site_id=site_ids,
        start_date=unmapped(start_date),
        end_date=unmapped(end_date),
        force=unmapped(force),
    )

    process_raw_data.map(
        client_name=unmapped(client_name),
        site_id=site_ids,
        start_date=unmapped(start_date),
        end_date=unmapped(end_date),
        force=unmapped(force),
        upstream_tasks=[fetch_raw_data_task],
    )


schedule = Schedule(clocks=[CronClock("55 23 * * *")])

flow.schedule = schedule
flow.register(project_name=PROJECT_NAME)
```
Hopefully this code demonstrates the logic I want to implement (though I am aware that it won't work as written):
```python
import pandas as pd

from prefect import Flow, Parameter, unmapped
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

with Flow(
    FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=STORAGE,
    state_handlers=[failure_handler, success_handler],
) as flow:
    client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
    start_date = Parameter("start_date", default=None)
    end_date = Parameter("end_date", default=None)
    force = Parameter("force", default=False)

    start_date, end_date = set_date_params(start_date, end_date)

    site_ids = get_site_ids(client_name)
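    # NOTE: this loop runs at flow *build* time, when start_date and
    # end_date are still unresolved task outputs rather than concrete
    # dates, so pd.date_range cannot iterate over them here.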
    for date in pd.date_range(start_date, end_date):
        fetch_raw_data_task = fetch_raw_data.map(
            client_name=unmapped(client_name),
            site_id=site_ids,
            start_date=unmapped(date),
            end_date=unmapped(date),
            force=unmapped(force),
        )

        process_raw_data.map(
            client_name=unmapped(client_name),
            site_id=site_ids,
            start_date=unmapped(date),
            end_date=unmapped(date),
            force=unmapped(force),
            upstream_tasks=[fetch_raw_data_task],
        )


schedule = Schedule(clocks=[CronClock("55 23 * * *")])

flow.schedule = schedule
flow.register(project_name=PROJECT_NAME)
```
The only solution I have thought of is to create another task which combines the list of `site_ids` and the date range so that there is an entry for each combination, and then to map over that list (sketched below).
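A minimal sketch of that cross-product idea, assuming Prefect 1.x; `build_combinations` is a hypothetical helper name, and `nout=2` lets the task return two parallel lists that `map` can zip together:
```python
import pandas as pd
from prefect import task, unmapped

@task(nout=2)
def build_combinations(site_ids, start_date, end_date):
    # One entry per (site_id, date) combination, returned as two
    # parallel lists so .map() can zip them element-wise.
    dates = pd.date_range(start_date, end_date)
    pairs = [(site_id, date) for site_id in site_ids for date in dates]
    return [s for s, _ in pairs], [d for _, d in pairs]

# Inside the flow context:
# combo_site_ids, combo_dates = build_combinations(site_ids, start_date, end_date)
# fetch_raw_data.map(
#     client_name=unmapped(client_name),
#     site_id=combo_site_ids,
#     start_date=combo_dates,
#     end_date=combo_dates,
#     force=unmapped(force),
# )
```
Returning parallel lists rather than a list of tuples keeps each mapped argument a flat list, which is the shape `map` expects.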
**Kevin Kho:**
Hey @Eddie Atkinson, you can't loop like this based on `task` output, because these loops are evaluated at build time and the `task` only resolves to a value at runtime. Your suggestion is a good one, I think. If you need it to be sequential, you can put the `pd.date_range` call and for loop inside your `fetch_raw_data` task instead, but I assume you don't want to go that route because there is no observability for each `date`, `site_id` combination?
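For reference, a minimal sketch of that sequential alternative, assuming Prefect 1.x; `fetch_one_day` is a hypothetical helper for a single-day fetch. The whole range becomes one task run per site, so per-date observability is lost:
```python
import pandas as pd
from prefect import task

def fetch_one_day(client_name, site_id, date, force):
    """Hypothetical helper: fetch one (site_id, date) slice."""
    ...

@task
def fetch_raw_data(client_name, site_id, start_date, end_date, force):
    # Inside the task body the parameter values are concrete,
    # so the date range can be iterated here.
    for date in pd.date_range(start_date, end_date):
        fetch_one_day(client_name, site_id, date, force)
```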
**Eddie Atkinson:**
Hi again @Kevin Kho, you're absolutely right about my aim. I want to retain observability over each `date`, `site_id` combination so that my Slack logs are as granular as possible (as well as enabling me to re-run data processing jobs for individual days). I will give my solution a crack; if anyone else has any suggestions, I would love to hear them.
**Kevin Kho:**
I suspect your approach is the right one here, because each of those combinations has to be its own task.