# ask-community
Hi all. Trying to do a backfill on one of my flows. The flow takes in a DateTime parameter and all subsequent tasks use it to crawl/transform data, so I do something like:
```python
for target_date in target_dates:
    f.run(parameters={"target_date": target_date}, run_on_schedule=False)
```
I am running the above locally using a `LocalDaskExecutor(scheduler="processes", num_workers=12)`. The problem is that the run time grows...
```
Took 15.72 sec for 2016-05-08
Took 20.59 sec for 2016-05-09
Took 32.52 sec for 2016-05-10
Took 43.89 sec for 2016-05-11
Took 55.81 sec for 2016-05-12
Took 67.15 sec for 2016-05-13
Took 77.92 sec for 2016-05-14
Took 90.59 sec for 2016-05-15
Took 105.21 sec for 2016-05-16
Took 115.50 sec for 2016-05-17
```
Would that be a Dask issue? Any suggestions for running the backfill locally in a different way are welcome! (There is no back-aggregation involved, so there is no reason for later runs to take longer, and the volume/size of data is almost identical for each run. When a backfill completes and I start another one for a different set of dates, the time profile is similar: very fast -> slower.)
Hi @Andreas Tsangarides do you happen to have an example flow I could use to reproduce this issue?
Hi @Anna Geller! The crawler uses an api_key... so can we maybe mock up some of the steps? I tried to do my best.
@Andreas Tsangarides thank you so much for sharing your flow! The first thing I noticed is that you are mapping over a tuple. I don't think this is really supported; at least there are some open issues about it. @Kevin Kho, please correct me if I'm wrong here. There is this open issue https://github.com/PrefectHQ/prefect/issues/4824 and this one https://github.com/PrefectHQ/prefect/issues/4839
I am actually using a NamedTuple (if that makes any difference 😛)
```python
from datetime import date
from typing import NamedTuple


class SettlementPeriod(NamedTuple):
    date: date
    period: int
```
The flow runs fine; it just slows down when multiple runs are executed in a loop.
I also configured the flow to map over a list of ints, and the behaviour is the same.
I see. Will look into that. Can you share how you time it?
thanks Anna! 😄
```python
import time

for target_date in target_dates:
    start = time.time()
    f.run(parameters={"target_date": target_date}, run_on_schedule=False)
    end = time.time()
    print(f"Took {end - start:.2f} sec for {target_date.strftime('%Y-%m-%d')}")
```
thx, just wanted to cross-check. this makes sense.
Not sure what would cause this. The LocalDaskExecutor is just a multiprocessing pool, so it doesn't really have unmanaged-memory issues over time. If I had to guess, maybe fewer and fewer of the executor's workers are being used over time; I don't have a clue why, other than that resources may not be freed up after the runs. I think this might be better when you run through an agent instead of `flow.run()`, though.
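If the slowdown does come from worker processes lingering between runs, the stdlib analogue of the fix is to make sure each iteration's pool is explicitly closed before the next one starts. A minimal sketch, not Prefect code; `work` is a hypothetical stand-in for a task:

```python
import multiprocessing as mp
import time


def work(x):
    # hypothetical stand-in for a crawl/transform task
    return x * x


if __name__ == "__main__":
    for i in range(3):
        start = time.perf_counter()
        # the context manager calls terminate()/join() on exit, so this
        # iteration's worker processes cannot linger into the next one
        with mp.Pool(processes=4) as pool:
            results = pool.map(work, range(1000))
        print(f"iteration {i}: {time.perf_counter() - start:.2f} sec")
```

With the pool torn down each iteration, the per-iteration time should stay roughly flat instead of growing.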
hmmmmm... warning: this is an ugly hack... it must indeed be something with multiprocessing. In the `for date in dates...` loop, if instead of `f.run(...)` I do
```python
subprocess.Popen("<shell command for executing>")
```
the run time remains low...
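A slightly tidier variant of the same workaround is to launch each date's run as its own interpreter and wait for it, so every run gets a fresh executor that is fully torn down on exit. A sketch, assuming a hypothetical `run_backfill.py` script that calls `f.run(...)` for a single date:

```python
import subprocess
import sys

# run_backfill.py is a hypothetical script that runs the flow once, e.g.
# f.run(parameters={"target_date": ...}, run_on_schedule=False)
dates = ["2016-05-08", "2016-05-09", "2016-05-10"]
for target_date in dates:
    # subprocess.run blocks until the child exits, so dates still run
    # sequentially, but executor state cannot accumulate across runs
    result = subprocess.run(
        [sys.executable, "run_backfill.py", "--target-date", target_date]
    )
    print(target_date, "exit code:", result.returncode)
```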
I think what’s happening is that the previous executors are not cleared up when the next ones are created. I think you can restructure this to all be 1 flow run and it might be more efficient. Take in the date and the range of items you want to process and then map over them with the LocalDaskExecutor? Or do you really need sequential execution by date?
@Andreas Tsangarides if you want to test this flow nicely with the backend, you can create multiple flow runs with different parameters this way:
• a dummy child flow that corresponds to your actual flow, whose timing you want to test for various parameters
```python
from prefect import Flow, Parameter, task


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")


with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)
```
• a parent flow to trigger the flow runs
```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect"),
                  dict(user_input="Marvin"),
                  dict(user_input="World"),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
```
Thanks both! Will give it a shot! Out of curiosity, is there a way to create a flow of flows using Prefect Core only (without relying on flows registered with a server)?
No, unfortunately the child flow must be registered with the backend.