tom
10/08/2020, 5:42 PM
from prefect import Flow, task

def fetch_object_ids():
    # Long-running task that yields many object IDs.
    cursor = None
    while True:
        # Keep fetching and yielding object IDs until the cursor is exhausted.
        obj_ids, cursor = external_request_to_fetch_object_ids(cursor)
        for obj_id in obj_ids:
            yield obj_id
        if cursor is None:
            break

@task
def process_object(object_id):
    do_something_with_object(object_id)

with Flow("Process Objects") as flow:
    object_ids = fetch_object_ids()  # I don't want to wait here / keep this list in memory
    process_object.map(object_ids)
    send_finished_email_to_user()  # Should depend on all objects processed.
Dylan
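A minimal sketch of one way to wire the "wait for everything" dependency in Prefect 0.x, assuming fetch_object_ids is materialized into a list by a wrapper task (0.x .map() iterates a concrete list at runtime, not a lazy generator); fetch_object_ids_task and the email task body are illustrative, not code from this thread:

@task
def fetch_object_ids_task():
    # Assumption: materialize the generator, since .map() needs a
    # concrete iterable at runtime.
    return list(fetch_object_ids())

@task
def send_finished_email_to_user():
    ...  # placeholder; the real notification logic isn't shown in the thread

with Flow("Process Objects") as flow:
    object_ids = fetch_object_ids_task()
    processed = process_object.map(object_ids)
    # upstream_tasks creates a state-only dependency, so the email runs
    # only after every mapped process_object finishes.
    send_finished_email_to_user(upstream_tasks=[processed])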
tom
10/08/2020, 7:07 PM
Dylan

tom
10/08/2020, 7:08 PM
Dylan

tom
10/08/2020, 7:09 PM
Dylan
from datetime import timedelta

import pandas as pd
import pendulum
import prefect
from prefect import task

@task(
    name="Get New Record Counts",
    checkpoint=True,
    max_retries=5,
    retry_delay=timedelta(minutes=3),
    tags=["prefect_cloud_database"],
)
def get_new_record_counts(table_data, batch_size, batch_count, pg_connection_string):
    logger = prefect.context.get("logger")
    # Extract the table name & last-updated timestamp.
    document_id, table = table_data
    table["document_id"] = document_id
    if not table["last_updated"]:
        table["last_updated"] = pendulum.now().subtract(years=20).to_datetime_string()
    time_sort_field = table_time_sort_field(table["table"])
    # Get the count of new records.
    result = pd.read_sql(
        con=pg_connection_string,
        sql=f"""
            SELECT COUNT(*)
            FROM {table["schema"]}."{table["table"]}"
            WHERE {time_sort_field} > '{table["last_updated"]}';
        """,
    )
    count = result["count"][0]
    logger.info(f"{count} new records")
    # TODO take batch_count into account
    offsets = list(range(0, count, batch_size))
    table_offsets = []
    for offset in offsets:
        table_offset = table.copy()
        table_offset["offset"] = offset
        table_offsets.append(table_offset)
    logger.info(table_offsets)
    # table_offsets = [{ document_id: "", table: "", schema: "", last_updated: "timestamp", offset: 0 }]
    return table_offsets
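To illustrate how the offsets returned above could be consumed, here is a minimal sketch of a mapped downstream task; fetch_batch is a hypothetical name and the LIMIT/OFFSET query shape is an assumption, not code from this thread:

# Hypothetical downstream task: each mapped run pulls one batch of rows
# using the offset computed by get_new_record_counts.
@task
def fetch_batch(table_offset, batch_size, pg_connection_string):
    time_sort_field = table_time_sort_field(table_offset["table"])
    return pd.read_sql(
        con=pg_connection_string,
        sql=f"""
            SELECT *
            FROM {table_offset["schema"]}."{table_offset["table"]}"
            WHERE {time_sort_field} > '{table_offset["last_updated"]}'
            ORDER BY {time_sort_field}
            LIMIT {batch_size} OFFSET {table_offset["offset"]};
        """,
    )

Inside a flow this could be wired as fetch_batch.map(table_offsets, batch_size=unmapped(batch_size), pg_connection_string=unmapped(pg_connection_string)), with unmapped imported from prefect so the scalar arguments aren't mapped over.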
tom
10/08/2020, 7:15 PM