Kieran
04/06/2021, 11:44 PMKieran
04/06/2021, 11:46 PMwith Flow(
name=NAME,
schedule=CronSchedule("5 */6 * * *"),
storage=default_flow_storage,
run_config=default_flow_run_config,
state_handlers=[slack_handler],
) as flow:
# Extract
# ----------------------------------------------------------------
TMP_TABLE_NAME = generate_tmp_name(NAME) <--- return type str
search_query_total = query_total(CUSTOMER_HEADER) <--- return type int
query_offsets = generate_offsets(
search_query_total,
DEFAULT_ETL_LIMIT_SIZE
) <--- return type list
extracted_data = extract.map(
offset=query_offsets,
query_body=unmapped(CUSTOMER_SEARCH),
limit=unmapped(DEFAULT_ETL_LIMIT_SIZE),
query_header=unmapped(CUSTOMER_HEADER)
) <--- return type list
Kieran
04/06/2021, 11:47 PMextracted_data
task to map through the list of query_offsets
and keep the other variables as unmapped.Kieran
04/06/2021, 11:48 PMis_serializable(flow, True)
is returning false and the following error:
...subprocess.CalledProcessError: Command '/github/flock-prefect/.venv/bin/python /var/folders/p3/ycz3p7ls3c10bg3p2c4_60n00000gn/T/tmpkbedcynv' returned non-zero exit status 1.
Kieran
04/06/2021, 11:49 PMKevin Kho