Scott Aefsky
06/23/2022, 5:27 PM

capture_ID_batch, all_capture_metadata = get_unprocessed_capture_ID_batches(snowflake_tbl, experiment_id=10, batch_size=500)
capture_loc_map = get_locations_for_captures(all_capture_metadata, s3_bucket)
# read the files from local disk given the IDs, process them, and record processed JSONs
process_batch.map(capture_ID_batch, unmapped(snowflake_tbl), unmapped(capture_loc_map), unmapped(intrazone_extractor), unmapped(dynamic_zone_finder))
The Prefect schematic shows that my first task `get_unprocessed_capture_ID_batches` is getting duplicated, and my flow is eventually failing because of a Dask scheduler timeout.
The last task starts running, but won't complete because the timeout kills it. I'm trying to understand a couple of things:
• Why is my first task showing up as if it's a mapped task?
• Why is the base node of that task not completing if each of the 2 instances of it is complete?
• Why is the scheduler timing out? If I run this flow with a small amount of data, it runs fine, but with a larger dataset I get this failure.
Thanks for any help you can provide!

Kevin Kho
06/23/2022, 5:33 PM

Scott Aefsky
06/23/2022, 5:35 PM

base_run_config = ECSRun(
    image=ECR_IMAGE,
    execution_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-execution-role',
    task_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-task-role',
    run_task_kwargs={
        "cluster": "PrefectCluster",
    },
    labels=[account],
    cpu=2048,
    memory=16384
)
def dynamic_executor():
    if prefect.context.parameters["use_fargate"] == 'true':
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,  # Must specify n_workers
            cluster_arn=fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
            cloudwatch_logs_group=f'prefect_ecs_task_log_{account}'
        )
    else:
        return LocalCluster()

base_executor = DaskExecutor(
    cluster_class=dynamic_executor
)
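(For completeness, a minimal sketch of how a run config and executor like these would typically be wired onto the flow in Prefect 1.x; the flow name and project name below are placeholders, not taken from this thread.)

from prefect import Flow

with Flow("capture-processing") as flow:  # placeholder flow name
    pass  # the tasks from the first snippet would go here

# attach the ECS run config and the Dask executor defined above
flow.run_config = base_run_config
flow.executor = base_executor
flow.register(project_name="my-project")  # placeholder project name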
Kevin Kho
06/23/2022, 5:45 PM

Scott Aefsky
06/24/2022, 1:54 PM

distributed.comm.core.CommClosedError: Comm <TCP (closed) Worker->Scheduler local=<tcp://10.0.94.23:58076> remote=<tcp://10.0.78.242:8786>> already closed.
2022-06-24T09:41:19.616-04:00 distributed.nanny - INFO - Worker closed
2022-06-24T09:41:20.397-04:00 distributed.nanny - INFO - Closing Nanny at '<tcp://10.0.94.23:46079>'
2022-06-24T09:41:20.397-04:00 distributed.dask_worker - INFO - End worker
4. If I click into the first task, I see the attached screenshot, so I think it is really a mapped task.

Kevin Kho
06/24/2022, 2:23 PM

from prefect import Flow, task

@task(nout=2)
def abc():
    return 1, 2

@task
def bcd():
    return 1

@task
def cde():
    return 1

with Flow("..") as flow:
    x, y = abc()
    bcd(upstream_tasks=[x])
    cde(upstream_tasks=[y])

flow.register("databricks")
So it appears anything with `nout` gets rendered as mapped. Definitely not ideal, but honestly unlikely to change with the focus on 2.0.

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True, n_workers=4)
For local cluster, the default is processes. For FargateCluster, it’s something convoluted like:
FargateCluster(n_workers=4, threads_per_worker=1)
But I would rank this as lower priority than sorting out the worker crashes.

FargateCluster can take scheduler_mem and scheduler_cpu as cluster kwargs.
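(A minimal sketch of what that might look like in the dynamic_executor from earlier, reusing the same names from that snippet and assuming dask_cloudprovider's FargateCluster; the scheduler_cpu/scheduler_mem values are illustrative only.)

import prefect
from dask.distributed import LocalCluster
from dask_cloudprovider.aws import FargateCluster  # import path may differ by dask_cloudprovider version

def dynamic_executor():
    if prefect.context.parameters["use_fargate"] == 'true':
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,
            scheduler_cpu=2048,   # 2 vCPU for the scheduler task (illustrative value)
            scheduler_mem=8192,   # 8 GB for the scheduler task (illustrative value)
            cluster_arn=fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
            cloudwatch_logs_group=f'prefect_ecs_task_log_{account}'
        )
    else:
        return LocalCluster()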
Scott Aefsky
06/24/2022, 2:34 PM

That's fine if the `nout=2` task is getting rendered twice, as long as it isn't actually getting run twice.
I will add some resources to the scheduler via the FargateCluster kwargs to see if that helps.
For the workers point, the weird thing is I get the error message of the scheduler timeout on the flow
[24 June 2022 9:40am]: Unexpected error: OSError('Timed out trying to connect to <tcp://3.234.183.8:8786> after 30 s')
but the workers still run for a couple more minutes. The workers seem like they're running over the full set of mapped data they're supposed to process, but can't report their status back to the scheduler. So the next set of jobs in the map can't get started. Does that make sense?
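(The "after 30 s" in that error appears to match Dask's default connect timeout, distributed.comm.timeouts.connect. A minimal sketch of raising it, assuming the Dask config can be set in the image or flow environment; the 60s value is only an example.)

import dask

# bump the Dask connect timeout from its 30s default (value is illustrative)
dask.config.set({"distributed.comm.timeouts.connect": "60s"})

# equivalently, as an environment variable on the scheduler/worker/flow containers:
#   DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=60s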
Kevin Kho
06/24/2022, 2:38 PM

Scott Aefsky
06/24/2022, 2:43 PM

Kevin Kho
06/24/2022, 2:45 PM

mapped_task(unmapped(df), input_one, input_two)
df will be copied 9 times and pass through the scheduler 9 times.
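(A minimal sketch of one way to avoid shipping a large object through the scheduler once per mapped child: stash it somewhere the workers can reach, S3 here, and pass only the key unmapped. The loc_map_s3_key parameter and the JSON-on-S3 layout are assumptions, not something from this thread.)

import json
import boto3
from prefect import task, unmapped

@task
def process_batch(batch, snowflake_tbl, loc_map_s3_key, s3_bucket):
    # each mapped child pulls the large location map down itself instead of
    # receiving it as an unmapped argument via the Dask scheduler
    # (assumption: the map was written to S3 as JSON beforehand)
    obj = boto3.client("s3").get_object(Bucket=s3_bucket, Key=loc_map_s3_key)
    capture_loc_map = json.loads(obj["Body"].read())
    # ... process the batch using capture_loc_map ...
    return len(batch)

# in the flow, only the small key string travels through the scheduler:
# process_batch.map(capture_ID_batch, unmapped(snowflake_tbl),
#                   unmapped(loc_map_s3_key), unmapped(s3_bucket))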
Scott Aefsky
06/24/2022, 2:47 PM