DJ Erraballi
12/08/2020, 10:07 PM
from typing import List

from prefect import Parameter

with SRMFlow(
    'TriggeredPatientLoaderFlow',
    feature_flag='patient_upsert_events_enabled'
) as trigger_patient_loader_flow:
    client_abbrev: str = Parameter("client_abbrev", default="")
    event_source_id: int = Parameter("event_source_id", default=None)
    lock_acquired: bool = acquire_event_source_lock(client_abbrev, event_source_id)
    linked: bool = link_patient_events(client_abbrev, event_source_id, lock_acquired)
    aggregated_patients: List[PersonInputProperties] = aggregate_patients(client_abbrev, linked, event_source_id)
    bulk_activity_metadata: dict = get_bulk_activity_metadata(client_abbrev, aggregated_patients)
    loaded: bool = load_aggregated_patients(client_abbrev, aggregated_patients, bulk_activity_metadata)
    marked_complete: bool = mark_event_source_as_completed(client_abbrev, event_source_id, loaded, lock_acquired)
    lock_released: bool = release_event_source_lock(client_abbrev, event_source_id, marked_complete, lock_acquired)
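(For context on the pattern above: each step is an ordinary Prefect task, and threading one task's return value into the next is what creates the ordering edges in the flow graph. A minimal sketch; the signatures mirror the flow above, but the bodies are placeholders, not the real implementations:)

from prefect import task

# Hypothetical task definitions mirroring the flow above.
# Passing lock_acquired into downstream tasks forces them to run
# after the lock task, even if they never use the value.
@task
def acquire_event_source_lock(client_abbrev: str, event_source_id: int) -> bool:
    ...  # e.g. take an advisory lock keyed on the event source
    return True

@task
def link_patient_events(client_abbrev: str, event_source_id: int, lock_acquired: bool) -> bool:
    ...  # link raw events to patients once the lock is held
    return True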
Joe Schmid
12/09/2020, 3:20 PM
>>> task_ref = flow.get_tasks()[0]
>>> state = flow.run()
>>> state._result.value # a Flow State's Result value is its Tasks' States
{<Task: add>: <Success: "Task run succeeded.">,
<Task: add>: <Success: "Task run succeeded.">}
>>> state.result # the public property aliases the same API as above
{<Task: add>: <Success: "Task run succeeded.">,
<Task: add>: <Success: "Task run succeeded.">}
>>> state.result[task_ref]._result # a Task State's Result contains the Task's return value
<Result: 1>
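(For reference, REPL output like the above comes from a small flow with two add tasks, roughly along these lines. A sketch only, adapted from the Prefect results docs; the flow name is illustrative:)

from prefect import Flow, task

@task
def add(x, y=1):
    return x + y

with Flow("example") as flow:
    first_result = add(1)               # first <Task: add>
    second_result = add(first_result)   # second <Task: add>, downstream of the first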
So the Flow runner process will (by default) keep all task results in memory.
If you wanted to avoid holding task results in memory but still pass data (references) between tasks, you could use something like S3Result, configured either at the Flow level (persist task output in S3 for all tasks in the Flow) or at the Task level (persist output just for that task). Much more here: https://docs.prefect.io/core/advanced_tutorials/using-results.html#using-results

from prefect.engine.results import S3Result

with SRMFlow(
    'TriggeredPatientLoaderFlow',
    feature_flag='patient_upsert_events_enabled',
    result=S3Result(bucket='your-bucket')  # <---- Added this line
) as trigger_patient_loader_flow:
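(For the Task-level variant mentioned above, a minimal sketch using Prefect 0.x's results API; attaching it to aggregate_patients is just an example, since that task returns the largest payload:)

from prefect import task
from prefect.engine.results import S3Result

# Persist only this task's output to S3; other tasks keep the default behavior.
@task(result=S3Result(bucket='your-bucket'))
def aggregate_patients(client_abbrev, linked, event_source_id):
    ...  # returns the aggregated patient payload; only this output is persisted to S3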
You could leave the rest of the Flow unchanged and memory usage in the Flow runner process should drop (well, fluctuate as tasks load data from S3 into memory and then free memory when done) since the task results would just be references to files in S3.