# ask-community
s
hi all My flow looks something like this:
with Flow(FLOW_NAME) as flow:

    event_dates = get_event_dates()

    s3_keys = generate_s3_key.map(suffix=event_dates)
    event_file_data = unload_data_to_s3.map(s3_keys, event_dates)

    update_log = update_log.map(event_dates, upstream_tasks=[event_file_data])

    update_snowflake = update_snowflake.map(s3_keys, event_dates, upstream_tasks=[update_log])
The problem is that when I schedule this in Cloud (local agent), the run doesn't seem to terminate even though all the mapped tasks complete. Am I missing something?
a
There are three things that look a bit suspicious:
1. You sometimes pass multiple arguments to map, e.g. in unload_data_to_s3 and update_snowflake. I think you would need to pass s3_keys as the mapped argument and pass the other argument as unmapped (from prefect import unmapped).
2. When you pass data dependencies between tasks, you don't need to set upstream_tasks.
3. To actually leverage parallelism, you would need to attach a Dask executor, e.g. flow.executor = LocalDaskExecutor().
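To make point 1 concrete, here is a plain-Python sketch of the two argument styles. It deliberately does not import Prefect: `simulate_map` and `Unmapped` are hypothetical stand-ins, written on the assumption that Prefect 1.x `.map` pairs up equal-length iterables element by element and hands an `unmapped` value unchanged to every mapped run. The sample dates and key layout are made up.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: marks a value as 'do not iterate'."""

    def __init__(self, value: Any) -> None:
        self.value = value


def simulate_map(fn: Callable[..., Any], *args: Any) -> List[Any]:
    """Call fn once per element of the mapped arguments.

    Mapped arguments are paired by position (like zip); Unmapped
    arguments are passed whole to every call.
    """
    mapped = [a for a in args if not isinstance(a, Unmapped)]
    if not mapped:
        raise ValueError("at least one argument must be mapped")
    n = len(mapped[0])
    if any(len(m) != n for m in mapped):
        raise ValueError("mapped arguments must have equal length")
    results = []
    for i in range(n):
        call_args = [a.value if isinstance(a, Unmapped) else a[i] for a in args]
        results.append(fn(*call_args))
    return results


def unload(s3_key: str, event_date: Any) -> str:
    # Placeholder body: just show which arguments each call received.
    return f"{s3_key} <- {event_date}"


event_dates = ["2021-01-01", "2021-01-02"]  # made-up sample data
s3_keys = [f"events/{d}.csv" for d in event_dates]

# Both arguments mapped: each call gets one key and its matching date.
paired = simulate_map(unload, s3_keys, event_dates)

# Date list wrapped as unmapped: each call gets one key and the whole list.
broadcast = simulate_map(unload, s3_keys, Unmapped(event_dates))
```

If each run should only see its own date, the paired form is the behaviour to compare against; wrapping a value as unmapped fits things that are identical for every run, such as a bucket name.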
s
1. Yes, this works locally, mind, but I think I'll try something different. Maybe combine them into a list of tuples?
2. Yes, but in this case I don't want to run `update_log` and `update_snowflake` unless the data got unloaded, and `event_file_data` doesn't return anything. Perhaps I could make that return the tuple; that might do the job.
3. Yep, will do :)
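The tuple idea could look roughly like this. It is a plain-Python sketch with no Prefect imports, so the list comprehensions stand in for `.map` calls and the function bodies are placeholders; in the real flow each function would be a task. The only point is that `unload_data_to_s3` returns the `(s3_key, event_date)` pair, so the later steps consume its output and cannot run for a date whose unload never produced a result.

```python
from typing import List, Tuple

Pair = Tuple[str, str]  # (s3_key, event_date)


def generate_s3_key(suffix: str) -> str:
    # Placeholder key layout, assumed for illustration only.
    return f"events/{suffix}.csv"


def unload_data_to_s3(pair: Pair) -> Pair:
    # The real unload would happen here; returning the pair gives
    # downstream steps a value to depend on.
    return pair


def update_log(pair: Pair) -> Pair:
    s3_key, event_date = pair
    # The real log update would happen here; pass the pair along.
    return pair


def update_snowflake(pair: Pair) -> str:
    s3_key, event_date = pair
    # The real Snowflake load would happen here.
    return f"loaded {s3_key} for {event_date}"


event_dates: List[str] = ["2021-01-01", "2021-01-02"]  # made-up sample data

# One tuple per date, so every step maps over a single argument.
pairs = [(generate_s3_key(d), d) for d in event_dates]
unloaded = [unload_data_to_s3(p) for p in pairs]
logged = [update_log(p) for p in unloaded]
loaded = [update_snowflake(p) for p in logged]
```

Because each step takes the previous step's return value, the ordering comes from the data itself and no explicit upstream_tasks should be needed.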
a
Nice 👍 LMK if you have any open questions regarding this.