    Eddie Atkinson

    1 year ago
    Hey folks, I haven’t been able to figure this one out by reading the docs / threads here. I am trying to process data for sites in a date range one day at a time, with a task running for each combination of date and site_id. My current solution uses map to process the data for each site individually, but it does so for the entire range at once. The complication is that the date range is specified as a parameter, while the site_ids are the output of a task that runs within the flow.
    This is the code as it currently stands:
    with Flow(
        FLOW_NAME,
        run_config=RUN_CONFIG,
        storage=STORAGE,
        state_handlers=[failure_handler, success_handler],
    ) as flow:
        client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
        start_date = Parameter("start_date", default=None)
        end_date = Parameter("end_date", default=None)
        force = Parameter("force", default=False)
    
        start_date, end_date = set_date_params(start_date, end_date)
    
        site_ids = get_site_ids(client_name)
    
        fetch_raw_data_task = fetch_raw_data.map(
            client_name=unmapped(client_name),
            site_id=site_ids,
            start_date=start_date,
            end_date=end_date,
            force=unmapped(force),
        )
    
        process_raw_data.map(
            client_name=unmapped(client_name),
            site_id=site_ids,
            start_date=start_date,
            end_date=end_date,
            force=unmapped(force),
            upstream_tasks=[fetch_raw_data_task],
        )
    
    
    schedule = Schedule(clocks=[CronClock("55 23 * * *")])
    
    flow.schedule = schedule
    flow.register(project_name=PROJECT_NAME)
    Hopefully this code demonstrates the logic I want to implement (though I am aware it won’t work as written):
    with Flow(
        FLOW_NAME,
        run_config=RUN_CONFIG,
        storage=STORAGE,
        state_handlers=[failure_handler, success_handler],
    ) as flow:
        client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
        start_date = Parameter("start_date", default=None)
        end_date = Parameter("end_date", default=None)
        force = Parameter("force", default=False)
    
        start_date, end_date = set_date_params(start_date, end_date)
    
        site_ids = get_site_ids(client_name)
        for date in pd.date_range(start_date, end_date):
            fetch_raw_data_task = fetch_raw_data.map(
                client_name=unmapped(client_name),
                site_id=site_ids,
                start_date=date,
                end_date=date,
                force=unmapped(force),
            )
    
            process_raw_data.map(
                client_name=unmapped(client_name),
                site_id=site_ids,
                start_date=date,
                end_date=date,
                force=unmapped(force),
                upstream_tasks=[fetch_raw_data_task],
            )
    
    
    schedule = Schedule(clocks=[CronClock("55 23 * * *")])
    
    flow.schedule = schedule
    flow.register(project_name=PROJECT_NAME)
    The only solution I have thought of is to create another task which combines the list of site_ids and the date range so that there is one entry per combination, and then to map over that combined list. Something along the lines of the sketch below.
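    Roughly, that combination task might look like the following sketch (the name build_site_date_pairs and the use of nout=2 are illustrative assumptions, not code from the flow above):
    import pandas as pd
    from prefect import task

    # Hypothetical helper, not part of the original flow: build the cross
    # product of site_ids and dates so that each (site_id, date) pair
    # becomes its own mapped task run.
    @task(nout=2)
    def build_site_date_pairs(site_ids, start_date, end_date):
        dates = pd.date_range(start_date, end_date)
        pairs = [(site_id, date) for site_id in site_ids for date in dates]
        mapped_site_ids = [site_id for site_id, _ in pairs]
        mapped_dates = [date for _, date in pairs]
        return mapped_site_ids, mapped_dates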
    Kevin Kho

    1 year ago
    Hey @Eddie Atkinson, you can’t loop like this based on task output because these loops are evaluated during build time and the task resolves to a value during runtime. Your suggestion is a good one I think. If you need it to be sequential, you can use the pd.date_range code and for loop inside your fetch_raw_data task instead, but I assume you don’t want to go that route because there is no observability for each date, site_id combination?
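    For reference, the "loop inside the task" alternative described here would look roughly like the sketch below (fetch_one_day is a hypothetical placeholder for the per-day logic, not a function from the original flow):
    import pandas as pd
    from prefect import task

    @task
    def fetch_raw_data(client_name, site_id, start_date, end_date, force):
        # Iterate over each day of the range inside a single task run.
        # Prefect then reports only one state for the whole batch,
        # so per-day observability is lost.
        for date in pd.date_range(start_date, end_date):
            fetch_one_day(client_name, site_id, date, force)  # hypothetical per-day helper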
    Eddie Atkinson

    1 year ago
    Hi again @Kevin Kho, you’re absolutely right about my aim. I want to retain observability over each date, site_id combination so that my Slack logs are as granular as possible (as well as enabling me to re-run data processing jobs for individual days). I will give my solution a crack; if anyone else has any suggestions I would love to hear them.
    Kevin Kho

    1 year ago
    I suspect your approach is the right one here because each of those combinations has to be its own task.
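    For illustration only, the combined mapping could then be wired into the flow roughly as follows, reusing the hypothetical build_site_date_pairs task from the earlier sketch (a sketch of the approach discussed, not the final code):
    with Flow(
        FLOW_NAME,
        run_config=RUN_CONFIG,
        storage=STORAGE,
        state_handlers=[failure_handler, success_handler],
    ) as flow:
        client_name = Parameter("client_name", default=DEFAULT_CLIENT_NAME)
        start_date = Parameter("start_date", default=None)
        end_date = Parameter("end_date", default=None)
        force = Parameter("force", default=False)

        start_date, end_date = set_date_params(start_date, end_date)

        site_ids = get_site_ids(client_name)

        # One entry per (site_id, date) combination, so each pair gets its own task run.
        mapped_site_ids, mapped_dates = build_site_date_pairs(site_ids, start_date, end_date)

        fetch_raw_data_task = fetch_raw_data.map(
            client_name=unmapped(client_name),
            site_id=mapped_site_ids,
            start_date=mapped_dates,
            end_date=mapped_dates,
            force=unmapped(force),
        )

        process_raw_data.map(
            client_name=unmapped(client_name),
            site_id=mapped_site_ids,
            start_date=mapped_dates,
            end_date=mapped_dates,
            force=unmapped(force),
            upstream_tasks=[fetch_raw_data_task],
        )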