Maity
08/07/2023, 1:09 AM
# create an api request for each self link
# Fan out one api_get task run per employee "self" link.
api_results = api_get.map([link["self"] for link in employee_self_links])
@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
)
def write(employee: Response):
    """Persist one employee-detail API response.

    The response body is decoded with ``.json()`` and handed to
    ``write_to_file_system.fn`` with the destination
    ``raw/employees/details/<id>.json``, where ``<id>`` is the record's
    ``"id"`` field. The task is configured with ``retries=3`` and an
    exponential backoff (``backoff_factor=10``) between attempts.
    """
    # Keep the decoded dict under its own name instead of rebinding the
    # Response-typed parameter.
    record = employee.json()
    # NOTE(review): `.fn` presumably calls the undecorated function of a
    # Prefect task, i.e. it runs inline inside this task run — confirm.
    write_to_file_system.fn(f"raw/employees/details/{record['id']}.json", record)
# Persist every fetched employee record: one `write` task run per API response.
storage_results = write.map(employee=api_results)
from prefect import unmapped
def batch(lst, n):
    """Yield successive slices of *lst* of length *n*; the last may be shorter.

    Args:
        lst: A sliceable sequence (list, tuple, str, ...).
        n: Batch size; must be a positive integer.

    Yields:
        Consecutive slices ``lst[i:i + n]`` that together cover *lst* in order.

    Raises:
        ValueError: If *n* is less than 1 (raised when iteration starts).
            Without this check a negative *n* silently yields nothing,
            dropping every item, and ``n == 0`` fails inside ``range()``.
    """
    if n < 1:
        raise ValueError(f"batch size must be >= 1, got {n!r}")
    for start in range(0, len(lst), n):
        yield lst[start:start + n]
# Assuming employee_self_links is available; each item must have a "self" entry.
employee_links = [link["self"] for link in employee_self_links]
api_results = []
# Process the links in batches of 10. `.map()` only submits the task runs and
# returns futures right away, so each batch is waited on before the next one is
# submitted; otherwise every batch would be in flight at once and the batching
# would not limit concurrency at all.
for batch_links in batch(employee_links, 10):
    batch_results = api_get.map(batch_links)
    for future in batch_results:
        future.wait()
    # Keep the (now finished) futures; downstream `.map` calls resolve them.
    api_results.extend(batch_results)
@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
)
def write(employee: Response):
    """Decode one employee response and save it under its employee id.

    The JSON payload goes to ``raw/employees/details/<id>.json`` through
    ``write_to_file_system.fn``. Retries: up to 3, with exponential backoff
    (``backoff_factor=10``).
    """
    payload = employee.json()
    employee_id = payload["id"]
    target_path = f"raw/employees/details/{employee_id}.json"
    write_to_file_system.fn(target_path, payload)
# Save each batched API result to storage via one `write` task run apiece.
storage_results = write.map(employee=api_results)
Matt Klein
08/16/2023, 3:55 PM
Maity
08/25/2023, 4:21 AM
@flow
def ingest_cost_center(batch_size: int = 10):
    """Fetch the details of every cost center and write each one to storage.

    Cost-center IDs come from the listing returned by
    ``ingest_cost_center_listing()``. For each ID, ``api_get`` is mapped over
    ``v2/companies/<API_COMPANY>/config/cost-centers/<id>``. Each result is
    written with ``write_to_file_system`` to
    ``raw/cost_centers/details/<id>.json``.

    Work proceeds in batches of ``batch_size``. Each batch's writes are
    requested with ``return_state=True`` so the batch completes before the
    next one is submitted.

    Args:
        batch_size: Number of cost centers fetched and written per batch.
            Defaults to 10, the previously hard-coded value. Must be >= 1.

    Returns:
        The states of all ``write_to_file_system`` runs, in cost-center order.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    # get all the cost centers
    cost_centers_listings = ingest_cost_center_listing()

    # extract the IDs from each cost center
    cost_center_ids = [
        cc["id"]
        for cc_response in cost_centers_listings
        for cc in cc_response["cost_centers"]
    ]

    results = []
    for start in range(0, len(cost_center_ids), batch_size):
        batch_ids = cost_center_ids[start:start + batch_size]

        # One API path and one storage destination per cost center, so each
        # detail lands in its own file. A single constant destination string
        # would be passed to every mapped run, and all writes would overwrite
        # the same path.
        batch_paths = [
            f"v2/companies/{API_COMPANY}/config/cost-centers/{cc_id}"
            for cc_id in batch_ids
        ]
        batch_dests = [
            f"raw/cost_centers/details/{cc_id}.json" for cc_id in batch_ids
        ]

        # for each API path, execute the API
        cost_centers_responses = api_get.map(batch_paths)
        # NOTE(review): the raw api_get results are written directly here,
        # while the employee `write` task calls `.json()` before
        # write_to_file_system. Confirm write_to_file_system accepts responses.
        writes = write_to_file_system.map(
            batch_dests,
            cost_centers_responses,
            # return_state to force await until the batch is done before doing the next batch
            return_state=True,
        )
        results += writes
    return results