Caio Rogério Silva dos Santos
08/16/2022, 7:23 PM
We currently use `.map`, mapping each task in the pipeline over the list of failed timestamps. This creates a couple of problems: all the data for every timestamp stays in memory for the whole flow run, blowing past our resource limits, and running each task sequentially takes forever.
Talking with a coworker, we arrived at a fairly elegant solution: create a capture flow run for each timestamp, distributing the work across multiple pods and cleaning up the code quite nicely. The problem is that the `create_flow_run` task does not work with `.map`; it gives me the error:
`TypeError: Cannot map over unsubscriptable object of type <class 'pipelines.utils.custom.CustomFlow'>`
So I'm here to ask: can someone point me in the direction of the best practice for implementing this idea? Before this, I dug fairly deep into garbage collection, trying to reclaim the memory used by each piece of data, but to no avail. We need code with as low a maintenance cost as possible and are looking for robustness wherever we can get it.
Thanks!
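(For reference, the pattern described above, where every task in the flow is mapped over the full list of failed timestamps and all intermediate results are held for the whole run, would look roughly like the following Prefect 1.x sketch; the task names are hypothetical and not taken from the thread.)
```python
from prefect import Flow, task


@task
def get_failed_timestamps():
    ...  # timestamps whose captures need to be redone


@task
def capture(timestamp):
    ...  # download the raw data for one timestamp


@task
def transform(raw):
    ...  # process the captured data for one timestamp


with Flow("backfill") as flow:
    timestamps = get_failed_timestamps()
    # every mapped result stays in memory for the entire flow run,
    # which is what exhausts resources when the timestamp list is large
    raw = capture.map(timestamps)
    transform.map(raw)
```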
Nate
08/16/2022, 8:55 PM
You should be able to use `.map` on `create_flow_run` by virtue of `create_flow_run` being a task. However, it doesn't look like you're using the standard implementation, but rather a `class 'pipelines.utils.custom.CustomFlow'`, and I can't offer advice without visibility into that abstraction.
It's worth noting that your problem is much easier to solve in Prefect 2.0, where you can just use subflows (asynchronous if you want), doing something like:
```python
from typing import Any, List
from prefect import flow, task


@task
def get_uncaught_minutes() -> List[str]:
    # placeholder: the timestamps that still need to be captured
    return ["yyyy-MM-dd'T'HH:mm:ss*SSSZZZZ"]


@task
def capture_minute(timestamp: Any):
    # something
    pass


@flow
def capture_flow(timestamp: Any):
    capture_minute(timestamp)


@flow
def backfill_flow():
    # one subflow run per uncaught minute
    for timestamp in get_uncaught_minutes():
        capture_flow(timestamp)
```
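(Presumably, if the backfill should also fan the subflow runs out concurrently rather than looping over them one at a time, the asynchronous variant Nate alludes to might look roughly like this; a sketch only, reusing the names from the example above rather than anything taken verbatim from the thread.)
```python
import asyncio
from typing import List

from prefect import flow, task


@task
async def capture_minute(timestamp: str):
    ...  # capture logic for a single minute


@flow
async def capture_flow(timestamp: str):
    await capture_minute(timestamp)


@flow
async def backfill_flow(timestamps: List[str]):
    # launch one subflow run per timestamp and wait for all of them
    await asyncio.gather(*(capture_flow(t) for t in timestamps))


if __name__ == "__main__":
    asyncio.run(backfill_flow(["2022-08-16T19:23:00+00:00"]))
```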
Caio Rogério Silva dos Santos
08/16/2022, 10:11 PM
```python
create_flow_run.map(
    unmapped(flow),
    project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
    labels=unmapped(LABELS),
    run_name=unmapped(captura_sppo_v2.name),
    parameters={"timestamp": timestamps},
)
```
For the other use cases we have, this works just fine, but in this case it doesn't. I'm getting a KeyError:
```
Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/opt/venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 668, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
```
Is there something wrong with how I'm calling the `create_flow_run` task with `.map`? Thank you!
Nate
08/16/2022, 11:12 PM
Here's a minimal `parent_flow.py`:
```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow('parent flow') as flow:
    uncaught_minutes = [{'timestamp': i} for i in range(10)]
    child_run_ids = create_flow_run.map(
        parameters=uncaught_minutes, flow_name=unmapped('child flow')
    )

if __name__ == "__main__":
    flow.run(run_on_schedule=False)
```
where we're mapping over a `List[dict]`, giving each instance of the child flow its own `parameters` dictionary, so `child_flow.py` could look like:
```python
from prefect import Flow, Parameter, task

@task
def capture_minute(timestamp):
    pass

with Flow('child flow') as flow:
    timestamp = Parameter('timestamp', default='')
    capture_minute(timestamp)
```
It looks like you're trying to map `create_flow_run` over a single dictionary, `{"timestamp": timestamps}`. When using `Task.map()`, any params that are not `unmapped` should be an `Iterable`, where each child receives one element of that iterable (usually a dict of `parameters` in the case of `create_flow_run`).
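(Applying that to the earlier snippet, the fix would presumably be to build one `parameters` dict per timestamp before mapping. A rough sketch under that assumption; `flow`, `constants`, `LABELS`, `captura_sppo_v2`, and `timestamps` are the identifiers from the snippet above, not anything verified here.)
```python
from prefect import unmapped
from prefect.tasks.prefect import create_flow_run

# one parameters dict per timestamp, so .map receives an Iterable
# and each child flow run gets exactly one element of it
mapped_parameters = [{"timestamp": t} for t in timestamps]

create_flow_run.map(
    unmapped(flow),
    project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
    labels=unmapped(LABELS),
    run_name=unmapped(captura_sppo_v2.name),
    parameters=mapped_parameters,
)
```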