Nathan Low
10/20/2025, 1:56 PMprint(accounts_date_df.to_string())
geneva_data_dfs: ASPrefectGenevaUtil_dataclasses.GenevaExtractData = (
geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map(
update_row=accounts_date_df.itertuples(),
geneva_tax_lot_args=unmapped(geneva_args),
type_of_pull=type_of_pull,
)
)
geneva_data_tranform: pd.DataFrame = (
geneva_transform_tasks.transform_geneva_extract_data.map(
geneva_data_dfs, type_of_pull=type_of_pull
)
)
geneva_load_tasks.load_geneva_extract_data.map(
geneva_data_tranform, type_of_pull=type_of_pull
)Nate
10/20/2025, 2:10 PMThis seems to bug out and request I start using .wait() in 3.the task semantics should not have changed here, and broadly executing tasks is faster in 3.x the only difference is that you must resolve terminal futures that you're not explicitly `return`ing
Nathan Low
10/20/2025, 2:11 PMNathan Low
10/20/2025, 2:12 PMNathan Low
10/20/2025, 2:21 PMNathan Low
10/20/2025, 2:21 PMNate
10/20/2025, 2:27 PMthe only difference is that you must resolve terminal futures that you're not explicitly `return`ingyou can read more about this here or watch where i talk about this exact thing if you have a minimal piece of code that reproduces this warning in a way that's unexpected, feel free to share it
Nathan Low
10/20/2025, 2:40 PMNathan Low
10:38 AM (0 minutes ago)
to me
# flow
@flow(
name="Geneva Extract PreBacktest Daily Loader",
description="The loads the specified type of data to the table needed before the backtest daily.",
version="0.1",
log_prints=True,
flow_run_name=get_pre_backtest_flow_name,
)
def extract_geneva_data_pre_backtest_run(
safe_time_start: str = "", # time is not json serializable.
safe_time_end: str = "",
type_of_pull: str = "",
accounts_date_df: Any = None, # pd.DataFrame = None,
geneva_args: dict = {},
):
logger = get_run_logger()
(
safe_time_start_datetime,
safe_time_end_datetime,
) = geneva_extract_tasks.generate_safe_times(
safe_time_start=safe_time_start, safe_time_end=safe_time_end
)
run_sub_flow = geneva_extract_tasks.check_for_safe_time(
safe_time_start_datetime=safe_time_start_datetime,
safe_time_end_datetime=safe_time_end_datetime,
)
if run_sub_flow:
if accounts_date_df is None:
accounts_date_df = (
ASPrefectGenevaUtilSQLPulls.get_t_minus_x_account_details(x=1)
)
else:
logger.info("Flow is running outside of the safe times to run. Aborting.")
if run_sub_flow:
if geneva_args is None:
geneva_args = geneva_extract_tasks.generate_geneva_fixed_args()
print("Here are the accounts and dates to be loaded")
print(accounts_date_df.to_string())
geneva_data_dfs: ASPrefectGenevaUtil_dataclasses.GenevaExtractData = (
geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map(
update_row=accounts_date_df.itertuples(),
geneva_tax_lot_args=unmapped(geneva_args),
type_of_pull=type_of_pull,
)
)
geneva_data_tranform: pd.DataFrame = (
geneva_transform_tasks.transform_geneva_extract_data.map(
geneva_data_dfs, type_of_pull=type_of_pull
)
)
# load_complete =
loaded_data_done = geneva_load_tasks.load_geneva_extract_data.map(
geneva_data_tranform, type_of_pull=type_of_pull
)
else:
print("Cannot run as it is outside of the safe time.")
print("Finished Processing Dates and Funds requested.")Nathan Low
10/20/2025, 2:44 PMNathan Low
10/20/2025, 2:44 PMNate
10/20/2025, 2:47 PMreturn must be resolved via .wait() or .result()
for example, loaded_data_done at the bottom. loaded_data_done contains a list of future objects. you'd need to chain .wait() or .result() on the end of that, or pass loaded_data_done into the wait() utility or else when the scope of the flow exits, the futures will be garbage collected
if you have a minimal piece of code
seems pretty similar, unsure what I'm doing wrongideally minimal means one that's E2E can be run as is / not coupled to what your geneva use case is, which will make it easier to understand the structural issue with either your code or the library
Nathan Low
10/20/2025, 2:52 PMif geneva_args is None:
geneva_args = geneva_extract_tasks.generate_geneva_fixed_args()
would that cause Prefect to get confused with its mapping? should I make geneva_args a dict by using
geneva_args = geneva_extract_tasks.generate_geneva_fixed_args.submit().result
? Would that work?Nate
10/20/2025, 2:54 PMsome_task.submit().result
would refer to the bound result method on a PrefectFuture, whereas
some_task.submit().result()
would refer to the result associated with that futureNathan Low
10/20/2025, 2:59 PM