Andreas Tsangarides
10/21/2021, 10:32 AM
```python
for target_date in target_dates:
    f.run(parameters={"target_date": target_date}, run_on_schedule=False)
```
I am running the above locally using `LocalDaskExecutor(scheduler="processes", num_workers=12)`. The problem is that the run time grows with each iteration:
Took 15.72 sec for 2016-05-08
Took 20.59 sec for 2016-05-09
Took 32.52 sec for 2016-05-10
Took 43.89 sec for 2016-05-11
Took 55.81 sec for 2016-05-12
Took 67.15 sec for 2016-05-13
Took 77.92 sec for 2016-05-14
Took 90.59 sec for 2016-05-15
Took 105.21 sec for 2016-05-16
Took 115.50 sec for 2016-05-17
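The timings above grow by a roughly constant amount per run. A least-squares line through the ten reported values gives a slope of about 11.5 seconds per additional run. This is consistent with something accumulating across iterations and inconsistent with per-date data volume driving the cost. The numbers are copied from the log; the fitting code is only an illustrative plain-Python sketch.

```python
# Per-run durations (seconds) copied from the log above, in run order.
durations = [15.72, 20.59, 32.52, 43.89, 55.81, 67.15, 77.92, 90.59, 105.21, 115.50]


def least_squares_slope(ys):
    """Slope of the ordinary least-squares line through the points (i, ys[i])."""
    n = len(ys)
    mean_x = (n - 1) / 2
    mean_y = sum(ys) / n
    sxy = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(ys))
    sxx = sum((i - mean_x) ** 2 for i in range(n))
    return sxy / sxx


slope = least_squares_slope(durations)
print(f"~{slope:.1f} s added per run")  # prints "~11.5 s added per run"
```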
Could this be a Dask issue? Any suggestions for running the backfill locally in a different way are welcome!
(There is no back aggregation involved, so there is no reason for later runs to take longer; the volume of data is almost identical for each run. When a backfill completes and I start another one for a different set of dates, the time profile is the same: very fast at first, then progressively slower.)
Anna Geller
Andreas Tsangarides
10/21/2021, 11:31 AM
Anna Geller
Andreas Tsangarides
10/21/2021, 1:05 PM
```python
from datetime import date
from typing import NamedTuple


class SettlementPeriod(NamedTuple):
    date: date
    period: int
```
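For illustration only, here is how such a `SettlementPeriod` key could be expanded into all periods of one target date. The helper name `periods_for` and the fixed count of 48 half-hourly periods per day are assumptions (GB-style settlement periods; clock-change days, which have 46 or 50 periods, are not handled).

```python
from datetime import date
from typing import List, NamedTuple


class SettlementPeriod(NamedTuple):
    date: date
    period: int


# Assumption: 48 half-hourly settlement periods per day (clock-change days ignored).
PERIODS_PER_DAY = 48


def periods_for(target_date: date) -> List[SettlementPeriod]:
    """Hypothetical helper: every settlement period of one target date, numbered from 1."""
    return [SettlementPeriod(target_date, p) for p in range(1, PERIODS_PER_DAY + 1)]


periods = periods_for(date(2016, 5, 8))
print(periods[0])    # SettlementPeriod(date=datetime.date(2016, 5, 8), period=1)
print(len(periods))  # 48
```

Because `NamedTuple` instances are ordinary tuples, they are hashable and compare by value. Defined at module level, they are also picklable, which a process-based scheduler needs when it ships task inputs between workers.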
The flow runs fine; it just slows down when multiple runs are executed in a loop.
Andreas Tsangarides
10/21/2021, 1:25 PM
Anna Geller
Andreas Tsangarides
10/21/2021, 1:41 PM
```python
import time

for target_date in target_dates:
    start = time.time()
    f.run(parameters={"target_date": target_date}, run_on_schedule=False)
    end = time.time()
    print(f"Took {end - start:.2f} sec for {target_date.strftime('%Y-%m-%d')}")
```
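One way to check whether state accumulating in the long-lived Python process causes the slowdown is to give every date its own interpreter, so nothing carries over between runs. The sketch below assumes a hypothetical command-line entry point that runs the flow once for a single date. `build_command` and the script name `run_flow.py` are placeholders, not part of the original setup.

```python
import subprocess
import sys
import time
from datetime import date
from typing import Callable, Dict, Iterable, List


def run_isolated(
    target_dates: Iterable[date],
    build_command: Callable[[date], List[str]],
) -> Dict[date, float]:
    """Run one command per date in a fresh process; return wall-clock seconds per date."""
    timings = {}
    for target_date in target_dates:
        start = time.perf_counter()
        # subprocess.run blocks until the child exits; check=True raises on a non-zero exit code.
        subprocess.run(build_command(target_date), check=True)
        timings[target_date] = time.perf_counter() - start
        print(f"Took {timings[target_date]:.2f} sec for {target_date:%Y-%m-%d}")
    return timings


def build_command(target_date: date) -> List[str]:
    # Hypothetical script that calls f.run(parameters={"target_date": ...}) once and exits.
    return [sys.executable, "run_flow.py", target_date.isoformat()]
```

Usage would look like `run_isolated([date(2016, 5, 8), date(2016, 5, 9)], build_command)`. Using `subprocess.run` rather than `subprocess.Popen` waits for each run to finish, so dates are still processed one after another and the per-date timings stay comparable with the in-process loop.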
Anna Geller
Kevin Kho
`flow.run()`.
Andreas Tsangarides
10/21/2021, 2:58 PM
`f.run(...)`
If I do `subprocess.Popen("<shell command for executing>")`, the run time remains low...
Kevin Kho
Anna Geller
• child flow
```python
from prefect import Flow, Parameter, task


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")


with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)
```
• parent flow to trigger flow runs
```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Prefect"),
        dict(user_input="Marvin"),
        dict(user_input="World"),
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
```
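Adapted to the backfill, the list passed to `create_flow_run.map` would contain one parameter dict per target date. Below is a plain-Python sketch of building that list. `backfill_parameters` is a hypothetical helper. Passing dates as ISO strings assumes the child flow parses `target_date` back into a date; flow-run parameters are sent to the backend, so JSON-serializable values are the safer choice.

```python
from datetime import date, timedelta
from typing import Dict, List


def backfill_parameters(start: date, end: date) -> List[Dict[str, str]]:
    """Hypothetical helper: one parameter dict per day in [start, end], inclusive."""
    days = (end - start).days
    return [
        {"target_date": (start + timedelta(days=offset)).isoformat()}
        for offset in range(days + 1)
    ]


parameters = backfill_parameters(date(2016, 5, 8), date(2016, 5, 17))
print(len(parameters))  # 10
print(parameters[0])    # {'target_date': '2016-05-08'}
```

Inside the parent flow, this list would take the place of the hard-coded `parameters` list, with `flow_name` and `project_name` still wrapped in `unmapped`. With a local agent, each child run then executes in its own process instead of inside one long-lived Python loop.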
Andreas Tsangarides
10/21/2021, 3:58 PM
Anna Geller