Thread
#prefect-community
    Stephen Herron

    8 months ago
    Hi all, my flow looks something like this:
    with Flow(FLOW_NAME) as flow:
    
        event_dates = get_event_dates()
    
        s3_keys = generate_s3_key.map(suffix=event_dates)
        event_file_data = unload_data_to_s3.map(s3_keys, event_dates)
    
        update_log = update_log.map(event_dates, upstream_tasks=[event_file_data])
    
        update_snowflake = update_snowflake.map(s3_keys, event_dates, upstream_tasks=[update_log])
    The problem is that when I schedule this in Cloud (with a local agent), the run doesn’t seem to terminate even though all the mapped tasks complete. Am I missing something?
    Anna Geller

    8 months ago
    There are three things which look a bit suspicious:
    1. You sometimes pass multiple arguments to map, e.g. in unload_data_to_s3 and update_snowflake. I think you would need to pass s3_keys as the mapped argument, and the other argument would need to be passed as unmapped (from prefect import unmapped).
    2. When you pass data dependencies between tasks, you don't need to set upstream_tasks.
    3. To actually leverage parallelism, you would need to attach a Dask executor, e.g. flow.executor = LocalDaskExecutor()
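    Roughly like this, as a quick sketch reusing your task names (the bucket keyword is a made-up constant argument just to show where unmapped would go, and the task definitions are assumed to exist as in your flow):

    from prefect import Flow, unmapped
    from prefect.executors import LocalDaskExecutor

    with Flow(FLOW_NAME) as flow:
        event_dates = get_event_dates()

        s3_keys = generate_s3_key.map(suffix=event_dates)

        # mapped arguments provide one item per child run; a constant like the
        # hypothetical bucket below is passed whole to every child via unmapped(...)
        event_file_data = unload_data_to_s3.map(
            s3_keys, event_dates, bucket=unmapped("my-bucket")
        )

        logged = update_log.map(event_dates, upstream_tasks=[event_file_data])
        update_snowflake.map(s3_keys, event_dates, upstream_tasks=[logged])

    # without an executor attached, mapped children run sequentially;
    # LocalDaskExecutor runs them in parallel on the local agent
    flow.executor = LocalDaskExecutor()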
    Stephen Herron

    8 months ago
    1. Yes, this does work locally, mind, but I think I'll try something different, maybe combining them into a list of tuples?
    2. Yes, but in this case I don't want to run update_log and update_snowflake unless the data actually got unloaded, and event_file_data doesn't return anything. Perhaps I could make it return the tuple; that might do the job (sketch below).
    3. Yep, will do 😃
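    Something like this, maybe (rough sketch only; the task bodies are placeholders, and FLOW_NAME, get_event_dates and generate_s3_key stay as they are in the flow above):

    from prefect import Flow, task

    @task
    def unload_data_to_s3(s3_key, event_date):
        # ... unload the data for event_date to s3_key ...
        # returning the pair turns the downstream links into real data dependencies
        return s3_key, event_date

    @task
    def update_log(unloaded):
        s3_key, event_date = unloaded
        # ... record that the unload for event_date completed ...
        return unloaded

    @task
    def update_snowflake(unloaded):
        s3_key, event_date = unloaded
        # ... load the file at s3_key into Snowflake ...

    with Flow(FLOW_NAME) as flow:
        event_dates = get_event_dates()
        s3_keys = generate_s3_key.map(suffix=event_dates)

        unloaded = unload_data_to_s3.map(s3_keys, event_dates)
        logged = update_log.map(unloaded)
        update_snowflake.map(logged)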
    Anna Geller

    8 months ago
    Nice 👍 LMK if you have any open questions regarding this.