Hi All, I used to be able to run .map() across mul...
# ask-community
n
Hi All, I used to be able to run .map() across multiple tasks in prefect 2. This seems to bug out and request I start using .wait() in 3. Unfortunately this slows down the job significantly. Is there any easy way to do this that isn't nested mapped tasks?
Copy code
print(accounts_date_df.to_string())
geneva_data_dfs: ASPrefectGenevaUtil_dataclasses.GenevaExtractData = (
    geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map(
        update_row=accounts_date_df.itertuples(),
        geneva_tax_lot_args=unmapped(geneva_args),
        type_of_pull=type_of_pull,
    )
)
geneva_data_tranform: pd.DataFrame = (
    geneva_transform_tasks.transform_geneva_extract_data.map(
        geneva_data_dfs, type_of_pull=type_of_pull
    )
)
geneva_load_tasks.load_geneva_extract_data.map(
    geneva_data_tranform, type_of_pull=type_of_pull
)
n
hi @Nathan Low - can you explain what you mean by this?
This seems to bug out and request I start using .wait() in 3.
the task semantics should not have changed here, and broadly, executing tasks is faster in 3.x. the only difference is that you must resolve terminal futures that you're not explicitly `return`ing
n
So I'll get an error and the prefect exception will request I use .wait(). Give me a sec and I'll post the error message here. Have to email myself to get past the company firewall
I was thinking maybe I could try wait_for but that seems to cause crashes in some flows, haven't pinpointed a reason for that yet.
So here's the error message I get:
Task run failed with exception: TaskRunTimeoutError('Scope timed out after 1800.0 second(s).') - Retry 1/2 will start 30 second(s) from now
And the request from prefect:
I'm on version 3.4.19 if that helps
It doesn't wait 30 minutes to raise the error, seems to error out after 1 minute
n
yea i suspect somewhere terminal futures are not being resolved
the only difference is that you must resolve terminal futures that you're not explicitly `return`ing
you can read more about this here or watch this, where i talk about this exact thing
if you have a minimal piece of code that reproduces this warning in a way that's unexpected, feel free to share it
n
Thanks for looking, here's the full flow that makes the error. I'm not doing any submit() calls before the first map, so I think all futures should be collected right?
Copy code
# flow
@flow(
    name="Geneva Extract PreBacktest Daily Loader",
    description="The loads the specified type of data to the table needed before the backtest daily.",
    version="0.1",
    log_prints=True,
    flow_run_name=get_pre_backtest_flow_name,
)
def extract_geneva_data_pre_backtest_run(
    safe_time_start: str = "",  # time is not json serializable.
    safe_time_end: str = "",
    type_of_pull: str = "",
    accounts_date_df: Any = None,  # pd.DataFrame = None,
    geneva_args: dict = {},
):
    logger = get_run_logger()
    (
        safe_time_start_datetime,
        safe_time_end_datetime,
    ) = geneva_extract_tasks.generate_safe_times(
        safe_time_start=safe_time_start, safe_time_end=safe_time_end
    ) 

    run_sub_flow = geneva_extract_tasks.check_for_safe_time(
        safe_time_start_datetime=safe_time_start_datetime,
        safe_time_end_datetime=safe_time_end_datetime,
    )
    if run_sub_flow:
        if accounts_date_df is None:
            accounts_date_df = (
                ASPrefectGenevaUtilSQLPulls.get_t_minus_x_account_details(x=1)
            )
    else:
        logger.info("Flow is running outside of the safe times to run. Aborting.")
    if run_sub_flow:
        if geneva_args is None:
            geneva_args = geneva_extract_tasks.generate_geneva_fixed_args()
        print("Here are the accounts and dates to be loaded")
        print(accounts_date_df.to_string())
        geneva_data_dfs: ASPrefectGenevaUtil_dataclasses.GenevaExtractData = (
            geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map(
                update_row=accounts_date_df.itertuples(),
                geneva_tax_lot_args=unmapped(geneva_args),
                type_of_pull=type_of_pull,
            )
        )
        geneva_data_tranform: pd.DataFrame = (
            geneva_transform_tasks.transform_geneva_extract_data.map(
                geneva_data_dfs, type_of_pull=type_of_pull
            )
        )
        # load_complete =
        loaded_data_done = geneva_load_tasks.load_geneva_extract_data.map(
            geneva_data_tranform, type_of_pull=type_of_pull
        )
    else:
        print("Cannot run as it is outside of the safe time.")
    print("Finished Processing Dates and Funds requested.")
Seems like it's failing on geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map
Your video example with the astronauts seems pretty similar, unsure what I'm doing wrong
n
if you're passing futures into another task, they will automatically be resolved for you just like 2.x. the only change is that terminal futures that you do not `return` must be resolved via `.wait()` or `.result()`
for example, `loaded_data_done` at the bottom. `loaded_data_done` contains a list of future objects. you'd need to chain `.wait()` or `.result()` on the end of that, or pass `loaded_data_done` into the `wait()` utility, or else when the scope of the flow exits, the futures will be garbage collected
if you have a minimal piece of code
seems pretty similar, unsure what I'm doing wrong
ideally, minimal means one that can be run end to end as is and is not coupled to your geneva use case, which will make it easier to understand the structural issue with either your code or the library
n
Gotcha, it's going to take a while to find an MRE that causes this same error. If `geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map` is waiting for geneva_args, and geneva_args could be either a dict or a PrefectFuture because I'm using
Copy code
if geneva_args is None:
    geneva_args = geneva_extract_tasks.generate_geneva_fixed_args()
would that cause Prefect to get confused with its mapping? Should I make geneva_args a dict by using
Copy code
geneva_args = geneva_extract_tasks.generate_geneva_fixed_args.submit().result
? Would that work?
n
without seeing the definitions of these tasks it's hard to say (hence suggesting an MRE) but in general this
Copy code
some_task.submit().result
would refer to the bound `result` method on a `PrefectFuture`, whereas
Copy code
some_task.submit().result()
would refer to the result associated with that future
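The attribute-versus-call distinction can be seen with any future-like object. The sketch below uses the standard library's `concurrent.futures` as a stand-in, not Prefect's own API; `make_args` and its return value are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def make_args() -> dict:
    # Hypothetical stand-in for a task that builds an arguments dict.
    return {"fund": "demo"}


with ThreadPoolExecutor() as pool:
    future = pool.submit(make_args)
    bound_method = future.result  # no parentheses: the method object itself
    value = future.result()       # with parentheses: blocks, then returns the dict

print(callable(bound_method))
print(type(value).__name__)
```

Passing `bound_method` where a dict is expected hands over a callable rather than the data, which is why the parentheses matter.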
n
Thanks, looks like `loaded_data_done` without the wait at the end was the cause. Putting the wait at the end seemed to make the chain happy. Thanks!