# best-practices
Hello all! I'm currently developing some high update frequency pipelines with Prefect v1.0 and have stumbled on a question. For each data capture that runs every minute, we log to a BigQuery table whether that run was successful or not. This data is used as input for an hourly recapture, which tries to request the data for each uncaptured minute in the past 24 hours. The way we do it now is, basically, by copying and pasting the code for a single capture and using `.map`, mapping each task in the pipeline to one of the failed timestamps. This causes some issues, as all the data for each timestamp remains in memory during the flow run, blowing past our resource limits and taking forever because the tasks run sequentially. Talking with a coworker, we arrived at a quite elegant solution: create a capture flow run for each timestamp, distributing the work across multiple pods and cleaning up the code quite nicely. The problem is that the task `create_flow_run` does not work with `.map`, as it's giving me the error:
TypeError: Cannot map over unsubscriptable object of type <class 'pipelines.utils.custom.CustomFlow'>
So I'm here to ask: can someone point me in the direction of the best practice for implementing this idea? Before this, I tried looking fairly deep into garbage collection to reclaim the memory used by each piece of data, to no avail. We need to produce code with as low a maintenance cost as possible and are looking for robustness where we can. Thanks!
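For readers following along, the recapture input described above (every minute in the past 24 hours without a successful run) can be sketched in plain Python. This is only an illustration under assumptions: the function name `find_uncaptured_minutes` and the idea of holding the successful timestamps in a set are hypothetical, and the real pipeline reads them from BigQuery.

```python
from datetime import datetime, timedelta
from typing import List, Set


def find_uncaptured_minutes(
    captured: Set[datetime], now: datetime, lookback_hours: int = 24
) -> List[datetime]:
    """Return every whole minute in the lookback window with no successful capture."""
    # Truncate to the minute so comparisons line up with the per-minute runs.
    end = now.replace(second=0, microsecond=0)
    start = end - timedelta(hours=lookback_hours)
    total_minutes = lookback_hours * 60
    # The window is [start, end): the minute currently in progress is excluded.
    window = (start + timedelta(minutes=i) for i in range(total_minutes))
    return [minute for minute in window if minute not in captured]
```

The returned list is what would then be fanned out, one timestamp per capture run.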
Hi @Caio Rogério Silva dos Santos, you should be able to use `.map` by virtue of `create_flow_run` being a task. However, it doesn't look like you're using the standard implementation, but rather a class `CustomFlow`, where I can't offer advice without visibility into that abstraction. It's worth noting that your problem is much easier to solve in Prefect 2.0, where you can just use subflows (asynchronous if you want), doing something like:
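from typing import Any, List
from prefect import flow, task

@task
def get_uncaught_minutes() -> List[str]:
    return ["yyyy-MM-dd'T'HH:mm:ss*SSSZZZZ"]  # placeholder timestamp format

@task
def capture_minute(timestamp: Any):
    ...  # capture logic for a single minute

@flow
def capture_flow(timestamp: Any):
    capture_minute(timestamp)

@flow
def backfill_flow():
    for timestamp in get_uncaught_minutes():
        capture_flow(timestamp)  # each call runs as a subflow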
Hi @Nate! Thank you for the answer. This custom flow class just implements a notification on Discord in case of failure; it does not override any of the default methods of the `Flow` class. Since posting, I've been able to get the first step of mapping to work, using the following syntax:
            parameters={"timestamp": timestamps},
For the other use cases we have, this works just fine, but in this case, it doesn't. I'm getting a key error:
Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
  File "/opt/venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 668, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
Updating to Prefect 2.0 is on our radar, but still far from happening. So, could you be so kind as to refer me to a functioning use of the `create_flow_run` task with `.map`? Thank you!
sure thing @Caio Rogério Silva dos Santos, here is a basic example:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow('parent flow') as flow:
    # One parameters dict per child flow run
    uncaught_minutes = [{'timestamp': i} for i in range(10)]
    child_run_ids = create_flow_run.map(
        parameters=uncaught_minutes, flow_name=unmapped('child flow')
    )

if __name__ == "__main__":
    flow.run()
where we're mapping over a list of dictionaries, giving each instance of the child its own `parameters` dictionary, so `child flow` could be like:
from prefect import Flow, Parameter, task

@task
def capture_minute(timestamp):
    ...  # capture logic for a single minute

with Flow('child flow') as flow:
    timestamp = Parameter('timestamp', default='')
    capture_minute(timestamp)
In your example, it was trying to map `create_flow_run` over a single dictionary, `{"timestamp": timestamps}`. When using `.map`, any params that are not `unmapped` should be an iterable, where each child receives one element of that iterable (usually a dict of `parameters` in the case of `create_flow_run`).
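To make the difference between the two shapes concrete, here is a small plain-Python sketch that does not need Prefect installed. The helper name `to_mapped_parameters` and the sample timestamps are hypothetical. It shows why positional indexing into a single dict raises the same kind of `KeyError` seen in the traceback, and how to build the one-dict-per-child list instead.

```python
from typing import Any, Dict, List


def to_mapped_parameters(timestamps: List[Any]) -> List[Dict[str, Any]]:
    """Turn a list of timestamps into one parameters dict per child flow run."""
    return [{"timestamp": ts} for ts in timestamps]


# Hypothetical sample data, standing in for the failed timestamps.
timestamps = ["2022-01-01T00:00:00", "2022-01-01T00:01:00"]

# Shape that failed: a single dict. Mapping indexes the mapped value by
# position (0, 1, ...), and this dict has no key 0.
single_dict = {"timestamp": timestamps}
try:
    single_dict[0]
except KeyError as exc:
    print(f"KeyError: {exc}")

# Shape that works: a list, so element i is handed to child run i.
mapped_parameters = to_mapped_parameters(timestamps)
print(mapped_parameters[0])
```

In a Prefect 1.0 flow where the timestamps only exist at runtime, a helper like this would presumably need to be wrapped with `@task` so that it runs on the upstream task's result before `create_flow_run.map` consumes it.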
Thank you so much @Nate, now it's working like a charm! And so much more elegant too. I hope we can migrate to 2.0 soon enough, but for now, you have my gratitude, sir
Great to hear @Caio Rogério Silva dos Santos 🙂 let us know if you need any help along the way!