Tom Klein
08/23/2022, 2:11 PM

# Prefect 1.x imports; this snippet runs inside a `with Flow(...):` block
from prefect import Parameter, unmapped
from prefect.tasks.secrets import PrefectSecret

snow_account = PrefectSecret('SNOWFLAKE_ACCOUNT')
snow_user = PrefectSecret('SNOWFLAKE_USER')
snow_pass = PrefectSecret('SNOWFLAKE_PASSWORD')
snow_role = PrefectSecret('SNOWFLAKE_ROLE')
snow_warehouse = Parameter("SNOWFLAKE_WAREHOUSE", default="SERVICE")
query = get_fetch_accounts_query()
accounts_data = snowflake_query(
    account=snow_account,
    user=snow_user,
    password=snow_pass,
    role=snow_role,
    warehouse=snow_warehouse,
    query=query,
)
saved = save_as_csv(MAIN_INPUT_FILE_PATH, accounts_data)
historic_vertical = Parameter("HISTORIC_VERTICAL")
variant = Parameter("VARIANT")
chunks = split_csv(file=saved, rowsize=1000)
run_outputs = wrapped_predict_shell_task.map(chunks, unmapped(historic_vertical), unmapped(variant))
all we do here is:
• pull some data from Snowflake
• save it as a CSV
• split it into many smaller ones (sketched just below)
• hand the list of smaller CSVs to a (wrapped) shell task to map over
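For context, the split_csv step can be sketched roughly like this (a minimal sketch assuming pandas is available; the real task may differ):

import os
import pandas as pd
from prefect import task

@task
def split_csv(file: str, rowsize: int = 1000) -> list:
    # Read the big CSV in rowsize-row chunks and write each chunk to its
    # own file; the returned list of paths is what .map() iterates over.
    paths = []
    for i, chunk in enumerate(pd.read_csv(file, chunksize=rowsize)):
        path = f"{os.path.splitext(file)[0]}_part_{i}.csv"
        chunk.to_csv(path, index=False)
        paths.append(path)
    return paths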
we run with LocalDaskExecutor and 4 worker processes (simply the default for LocalDaskExecutor)
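Pinning that setup explicitly would look something like this (a sketch; `flow` is the Flow object the snippet above lives in, and with num_workers unset, dask picks a default based on CPU count):

from prefect.executors import LocalDaskExecutor

flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)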
• the wrapped shell task doesn't actually return anything
• the shell task itself just stores its processed results in S3 internally, without returning anything
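So the mapped task is roughly this shape (a hypothetical reconstruction; the script name and CLI arguments are assumptions, not the real code):

import subprocess
from prefect import task

@task
def wrapped_predict_shell_task(chunk_path: str, historic_vertical: str, variant: str) -> None:
    # Run the prediction script on one chunk; the script itself is assumed
    # to upload its results to S3, so nothing comes back to Prefect and
    # every mapped result is None.
    subprocess.run(
        ["python", "predict.py", chunk_path, historic_vertical, variant],
        check=True,
    )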