mithalee mohapatra
12/24/2020, 1:41 AM
Dylan
mithalee mohapatra
12/28/2020, 6:10 PM
Kyle Moon-Wright
12/28/2020, 6:46 PM
mithalee mohapatra
12/28/2020, 6:49 PM
Kyle Moon-Wright
12/28/2020, 6:52 PM
mithalee mohapatra
12/28/2020, 6:53 PM
Kyle Moon-Wright
12/28/2020, 7:45 PM
executor = LocalDaskExecutor(scheduler="threads", num_workers=6)
mithalee mohapatra
12/28/2020, 8:33 PM
Kyle Moon-Wright
12/28/2020, 8:43 PM
from prefect import Flow, Parameter, flatten, unmapped
from prefect.executors import LocalDaskExecutor

# get_sftp_credentials, top_level_dir, second_level_dir and third_level_dir
# are the user's own @task functions (not shown in this thread).
with Flow("Hello") as flow:
    bucket = Parameter("bucket", default="test-sandbox")
    conn = get_sftp_credentials()
    top = top_level_dir(conn, newbatchid="20201221")
    second = second_level_dir.map(unmapped(conn), sub_dir=top)
    third = third_level_dir.map(flatten(second), unmapped(bucket), unmapped(conn))

executor = LocalDaskExecutor(scheduler="processes", num_workers=6)

if __name__ == "__main__":
    # flow.register(project_name="Hello, World_mm!")
    flow.run(executor=executor)
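As a rough illustration of how the mapped tasks above fan out, here is a plain-Python sketch with no Prefect dependency. The three functions and the directory names are hypothetical stand-ins for the user's tasks (which aren't shown), and the conn argument is left out. It only mimics the shapes involved: .map over top produces a list of lists, flatten(second) turns that into one flat list, and unmapped(bucket) passes the same value to every call.

```python
# Plain-Python sketch (no Prefect) of the shape of the mapped pipeline.
# All three helpers and the paths they return are hypothetical stand-ins.
from itertools import chain
from typing import List


def top_level_dir(batch_id: str) -> List[str]:
    # Hypothetical: pretend the batch folder contains two subdirectories.
    return [f"/{batch_id}/a", f"/{batch_id}/b"]


def second_level_dir(sub_dir: str) -> List[str]:
    # Hypothetical: each top-level directory contains two children.
    return [f"{sub_dir}/x", f"{sub_dir}/y"]


def third_level_dir(path: str, bucket: str) -> str:
    # Hypothetical: return a destination label for the leaf directory.
    return f"{bucket}:{path}"


top = top_level_dir("20201221")
# Mapping over `top` yields one list per element, i.e. a list of lists.
second = [second_level_dir(d) for d in top]
# flatten(second) corresponds to chaining the inner lists together,
# so the next mapped step runs once per leaf directory.
flat = list(chain.from_iterable(second))
# unmapped(bucket) corresponds to passing the same bucket to every call.
third = [third_level_dir(p, "test-sandbox") for p in flat]

print(len(second), len(flat), len(third))  # 2 4 4
```

With two top-level directories of two children each, the third step runs four times rather than twice, which is the point of flattening before the second map.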
LocalDaskExecutor with the flow.run method, so I’ll look into this next.
mithalee mohapatra
12/28/2020, 8:48 PM
Kyle Moon-Wright
12/28/2020, 8:49 PM
run_config to the flow:
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun

flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
flow.run_config = LocalRun()
flow.run()
But this one may only be relevant when setting the .executor attribute on the flow, so it's possibly irrelevant for us here.
mithalee mohapatra
12/28/2020, 8:57 PM
Kyle Moon-Wright
12/28/2020, 9:17 PM
mithalee mohapatra
12/28/2020, 9:19 PM
Kyle Moon-Wright
12/28/2020, 9:49 PM
Using a DaskExecutor with a local cluster is very similar to using a LocalDaskExecutor with scheduler="processes". You may find it more performant in certain situations (this scheduler does a better job of managing memory), but generally they should perform equivalently for most Prefect workflows.
The DaskExecutor can execute your flow and allocate resources efficiently for your workload.
mithalee mohapatra
12/28/2020, 10:09 PM
Ben Rosen
12/29/2020, 1:05 AM
third_dir
try lowering that number and see if that helps.
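If "that number" refers to num_workers (an assumption; the thread doesn't say), the effect of lowering it can be sketched with the standard library. concurrent.futures.ThreadPoolExecutor is used here only as an analogy for LocalDaskExecutor(scheduler="threads"), and fake_sftp_call is a hypothetical stand-in for one mapped task. The sketch records how many calls are in flight at once, which never exceeds the pool size.

```python
# Standard-library analogy, not Prefect or Dask: a worker pool of size N
# never runs more than N tasks at once. Assumes "that number" = num_workers.
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_workers: int, n_tasks: int = 12) -> int:
    """Run n_tasks simulated I/O calls and return the peak number in flight."""
    lock = threading.Lock()
    state = {"in_flight": 0, "peak": 0}

    def fake_sftp_call(i: int) -> int:
        # Hypothetical stand-in for a mapped task holding an SFTP connection.
        with lock:
            state["in_flight"] += 1
            state["peak"] = max(state["peak"], state["in_flight"])
        time.sleep(0.05)  # simulate network I/O
        with lock:
            state["in_flight"] -= 1
        return i

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(fake_sftp_call, range(n_tasks)))
    # pool.map returns results in input order, like a mapped task's outputs.
    assert results == list(range(n_tasks))
    return state["peak"]


for workers in (6, 2):
    print(workers, peak_concurrency(workers))
```

Fewer workers means fewer simultaneous connections or processes at the cost of a longer total run time, which is the trade-off to watch when lowering the value.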