# best-practices
c
Hello all! I'm currently developing some high-update-frequency pipelines with Prefect v1.0 and have run into a question. For each data capture, which runs every minute, we log to a BigQuery table whether that run succeeded. This data is the input for an hourly recapture, which tries to re-request the data for each uncaptured minute in the past 24 hours. The way we do it now is, basically, by copying and pasting the code for a single capture and using `.map`, mapping each task in the pipeline over the failed timestamps. This causes some issues: all the data for every timestamp stays in memory for the whole flow run, blowing past our resource limits, and each task takes forever to run sequentially. Talking with a coworker, we arrived at a quite elegant solution: create a capture flow run for each timestamp, which distributes the work across multiple pods and cleans up the code quite nicely. The problem is that the `create_flow_run` task does not work with `.map`; it gives me this error:
TypeError: Cannot map over unsubscriptable object of type <class 'pipelines.utils.custom.CustomFlow'>
So I'm here to ask: can someone point me in the direction of the best practice for implementing this idea? Before this, I looked fairly deep into garbage collection to try to reclaim the memory used by each piece of data, to no avail, and we need code with as low a maintenance cost as possible, so we are looking for robustness wherever we can. Thanks!
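As a rough illustration of the input to the hourly recapture described above, the sketch below computes which minutes of the past 24 hours have no successful capture. It assumes the successful runs have already been read from the BigQuery status table into minute-aligned UTC datetimes; the helper name `find_uncaptured_minutes` is hypothetical and not part of Prefect or the original pipeline.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


def find_uncaptured_minutes(
    successful: Iterable[datetime], now: datetime, hours: int = 24
) -> List[datetime]:
    """Return every minute in the last `hours` hours with no successful capture.

    Hypothetical helper: `successful` stands in for the rows read from the
    BigQuery status table, already truncated to the minute.
    """
    done = set(successful)
    # Half-open window [end - hours, end): the current, possibly still
    # running, minute is excluded.
    end = now.replace(second=0, microsecond=0)
    start = end - timedelta(hours=hours)
    missing = []
    t = start
    while t < end:
        if t not in done:
            missing.append(t)
        t += timedelta(minutes=1)
    return missing


if __name__ == "__main__":
    now = datetime(2022, 8, 1, 12, 0, 30, tzinfo=timezone.utc)
    base = now.replace(second=0)
    # Pretend every minute in the window succeeded except 11:58.
    ok = [base - timedelta(minutes=m) for m in range(1, 24 * 60 + 1) if m != 2]
    print(find_uncaptured_minutes(ok, now))
```

Each returned timestamp would then become one child capture run, which is the per-timestamp fan-out discussed in the rest of the thread.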
n
Hi @Caio Rogério Silva dos Santos, you should be able to use `.map` on `create_flow_run`, since `create_flow_run` is a task. However, it looks like you're not using the standard implementation but rather a class, `pipelines.utils.custom.CustomFlow`, and I can't offer advice without visibility into that abstraction. It's worth noting that your problem is much easier to solve in Prefect 2.0, where you can just use subflows (asynchronous if you want), doing something like:
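from typing import List

from prefect import flow, task


@task
def get_uncaught_minutes() -> List[str]:
    # Placeholder: return the timestamps of the minutes whose capture failed
    return ["yyyy-MM-dd'T'HH:mm:ss*SSSZZZZ"]


@task
def capture_minute(timestamp: str):
    # something: request and store the data for this minute
    pass


@flow
def capture_flow(timestamp: str):
    capture_minute(timestamp)


@flow
def backfill_flow():
    # Calling a @flow function from inside another flow runs it as a subflow
    for timestamp in get_uncaught_minutes():
        capture_flow(timestamp)


if __name__ == "__main__":
    backfill_flow()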
c
Hi @Nate! Thank you for the answer. This custom flow class just implements a Discord notification in case of failure; it does not override any of the default methods of the Flow class. Since posting, though, I've been able to get the first step of mapping to work, using the following syntax:
create_flow_run.map(
    unmapped(flow),
    project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
    labels=unmapped(LABELS),
    run_name=unmapped(captura_sppo_v2.name),
    parameters={"timestamp": timestamps},
)
For our other use cases this works just fine, but in this case it doesn't. I'm getting a KeyError:
Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/opt/venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 668, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
Updating to Prefect 2.0 is on our radar, but still far from happening. So, could you be so kind as to point me to a working example of the `create_flow_run` task used with `.map`? Thank you!
n
Sure thing @Caio Rogério Silva dos Santos, here is a basic example:
parent_flow.py
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run


with Flow('parent flow') as flow:
    
    uncaught_minutes = [{'timestamp': i} for i in range(10)]
    
    child_run_ids = create_flow_run.map(
        parameters=uncaught_minutes, flow_name=unmapped('child flow')
    )

if __name__ == "__main__":
    flow.run(run_on_schedule=False)
Here we're mapping over a `List[dict]`, giving each instance of the child flow its own `parameters` dictionary, so child_flow.py could look like:
from prefect import Flow, Parameter, task

@task
def capture_minute(timestamp):
    pass

with Flow('child flow') as flow:
    timestamp = Parameter('timestamp', default='')
    
    capture_minute(timestamp)
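In your example, it was trying to map `create_flow_run` over a single dictionary, `{"timestamp": timestamps}`. When using `Task.map()`, any argument that is not wrapped in `unmapped` should be an iterable, and each child receives one element of that iterable (for `create_flow_run`, usually a dict of `parameters`). That is also where the `KeyError: 0` comes from: the traceback shows Prefect looking up `upstream_state.result[i]` for each child, and a dict has no key `0`.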
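To make the mapping rule above concrete, here is a plain-Python model of it. This is not Prefect's implementation, only a simplification that mirrors the `upstream_state.result[i]` lookup visible in the traceback: every mapped argument is indexed by position, while `unmapped` values are repeated for each child. The names `Unmapped`, `simple_map` and `fake_create_flow_run` are hypothetical stand-ins.

```python
from typing import Any, Callable, Dict, List


class Unmapped:
    """Hypothetical stand-in for `unmapped`: marks a value to repeat per child."""

    def __init__(self, value: Any) -> None:
        self.value = value


def simple_map(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call `fn` once per position of the mapped (non-Unmapped) arguments.

    Assumes at least one mapped argument; this model stops at the shortest one.
    """
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    fixed = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    n = min(len(v) for v in mapped.values())
    results = []
    for i in range(n):
        # Positional lookup, like `upstream_state.result[i]` in the traceback.
        child_kwargs = {k: v[i] for k, v in mapped.items()}
        results.append(fn(**child_kwargs, **fixed))
    return results


def fake_create_flow_run(parameters: Dict[str, Any], flow_name: str) -> str:
    # Pretend to start a child run and return a readable identifier.
    return f"{flow_name}:{parameters['timestamp']}"


timestamps = ["12:01", "12:02", "12:03"]

# Works: one parameters dict per child run.
print(simple_map(fake_create_flow_run,
                 parameters=[{"timestamp": t} for t in timestamps],
                 flow_name=Unmapped("child flow")))

# Fails: the single dict is indexed as dict[0], which raises KeyError: 0.
try:
    simple_map(fake_create_flow_run,
               parameters={"timestamp": timestamps},
               flow_name=Unmapped("child flow"))
except KeyError as exc:
    print("KeyError:", exc)
```

The only change between the two calls is turning `timestamps` into `[{"timestamp": t} for t in timestamps]`, which is the same shape parent_flow.py passes as `parameters`.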
c
Thank you so much @Nate, now it's working like a charm! And so much more elegant, too. I hope we can migrate to 2.0 soon enough, but for now, you have my gratitude, sir.
n
Great to hear @Caio Rogério Silva dos Santos 🙂 let us know if you need any help along the way!