# prefect-community
s
Hi all. I have a flow with the following tasks defined:
capture_ID_batch, all_capture_metadata = get_unprocessed_capture_ID_batches(snowflake_tbl, experiment_id=10, batch_size=500)
capture_loc_map = get_locations_for_captures(all_capture_metadata, s3_bucket)
# read the files from local disk given IDs, process and record proc jsons
process_batch.map(capture_ID_batch, unmapped(snowflake_tbl), unmapped(capture_loc_map), unmapped(intrazone_extractor), unmapped(dynamic_zone_finder))
The Prefect schematic shows that my first task `get_unprocessed_capture_ID_batches` is getting duplicated, and my flow eventually fails because of a Dask scheduler timeout. The last task starts running, but won't complete because the timeout kills it. I'm trying to understand a couple of things:
• Why is my first task showing up as if it's a mapped task?
• Why is the base node of that task not completing if each of the 2 instances of it is complete?
• Why is the scheduler timing out? If I run this flow with a small amount of data, it runs fine, but with a larger dataset I get this failure.
Thanks for any help you can provide!
k
Hey @Scott Aefsky, could you move the image to the thread to keep the main channel cleaner when you get a chance? What is your Dask cluster setup?
s
We are using an on-demand Dask cluster via Fargate
base_run_config = ECSRun(
    image=ECR_IMAGE,
    execution_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-execution-role',
    task_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-task-role',
    run_task_kwargs={
        "cluster": "PrefectCluster",
    },
    labels=[account],
    cpu=2048,
    memory=16384
)

def dynamic_executor():
    if prefect.context.parameters["use_fargate"] == 'true':
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,  # Must specify n_workers
            cluster_arn=fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
            cloudwatch_logs_group=f'prefect_ecs_task_log_{account}'
        )
    else:
        return LocalCluster()

base_executor = DaskExecutor(
    cluster_class=dynamic_executor
)
For this run, the `use_fargate` parameter is set to "true".
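For reference, a minimal sketch (not taken from the actual project) of how this run config and executor might be attached to a Prefect 1.x flow, together with the `use_fargate` Parameter that `dynamic_executor` reads; the flow name and the Parameter default are assumptions:

from prefect import Flow, Parameter

# Hypothetical wiring; base_run_config and base_executor are the objects defined above,
# and the flow name / Parameter default are assumptions.
with Flow("capture_processing",
          run_config=base_run_config,
          executor=base_executor) as flow:
    # dynamic_executor reads this at flow-run time via prefect.context.parameters["use_fargate"]
    use_fargate = Parameter("use_fargate", default="true")
    # ... the get_unprocessed_capture_ID_batches / process_batch.map tasks from the
    # first snippet go here ...

Since the executor callable only runs once the flow run starts, the parameter value is already in context by the time the cluster is created.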
k
Ok, so I don't have all the immediate answers, but some general thoughts:
1. If you can specify processes for FargateCluster, that may be more stable.
2. The scheduler timeout does seem to be memory related, or the scheduler is bottlenecked in some way. Remember that all the data passed to workers passes through the scheduler (I think you know this because it seems you made the workers read data directly). Still, the question is whether there is a lot being mapped through the scheduler; you may need to bump up scheduler resources.
3. If you have a sequence of tasks A->B->C and they get submitted to Dask together, and A succeeds and B succeeds but the worker dies during C, Dask can try to re-run the chain because it's not aware of Prefect's checkpointing. Cloud has version locking to stop this but Server does not. Which one are you on, and do you see workers dying?
4. If you click into the first task and view the task run page, is it really a mapped task in the UI?
s
Sorry, @Kevin Kho, I got pulled into a bunch of meetings yesterday afternoon and am just now picking this back up.
1. I'm not sure what you mean by "specify processes". Can you give me more details on this?
2. I'm not seeing any errors in the scheduler logs, but I can try bumping up its resources. Where can I do that? Does that get set through the default task definition if I have one set up?
3. We are on Cloud. I see a couple of errors in the worker logs that look like they're dying:
distributed.comm.core.CommClosedError: Comm <TCP (closed) Worker->Scheduler local=<tcp://10.0.94.23:58076> remote=<tcp://10.0.78.242:8786>> already closed.

2022-06-24T09:41:19.616-04:00	distributed.nanny - INFO - Worker closed

2022-06-24T09:41:20.397-04:00	distributed.nanny - INFO - Closing Nanny at '<tcp://10.0.94.23:46079>'

2022-06-24T09:41:20.397-04:00	distributed.dask_worker - INFO - End worker
4. If I click into the first task, I see the attached screenshot, so I think it is really a mapped task.
(Tagging my colleague @Harrison Kim for his visibility)
k
There is a lot going on lol. I will respond part by part. So on the mapped task, I got the same thing by doing this:
from prefect import Flow, task

@task(nout=2)
def abc():
    return 1, 2

@task
def bcd():
    return 1

@task
def cde():
    return 1

with Flow("..") as flow:
    x, y = abc()
    bcd(upstream_tasks=[x])
    cde(upstream_tasks=[y])

flow.register("databricks")
So it appears anything with `nout` gets rendered as mapped. Definitely not ideal, but honestly unlikely to change with the focus on 2.0.
On processes. I meant processes as opposed to threads.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True, n_workers=4)
For LocalCluster, the default is processes. For FargateCluster, it's something convoluted like:
FargateCluster(n_workers=4, threads_per_worker=1)
But I would rank this as lower priority than sorting out the worker crashes
Scheduler resources will also be defined in `FargateCluster`. It can take `scheduler_mem` and `scheduler_cpu` as cluster kwargs.
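For example, here's a sketch of how the `dynamic_executor` above could pass them; the specific CPU/memory numbers are placeholders, not tuned recommendations:

def dynamic_executor():
    if prefect.context.parameters["use_fargate"] == 'true':
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,
            scheduler_cpu=2048,    # placeholder: 2 vCPU for the scheduler
            scheduler_mem=16384,   # placeholder: 16 GB for the scheduler
            cluster_arn=fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
            cloudwatch_logs_group=f'prefect_ecs_task_log_{account}'
        )
    else:
        return LocalCluster()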
I think the workers dying is the main source of weirdness here. It looks like Dask wants to re-run the upstream dependency. It should be stopped by Prefect and loaded from cache. Honestly, it's hard to say whether:
1. the result is loaded from cache but the task is labelled as running, or
2. there is a full re-running of the task.
I think it's number 1.
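As an aside, a minimal sketch of the Prefect 1.x side of "loaded from cache": tasks need checkpointing and a result location configured so a re-submitted task can load its previous output instead of recomputing it. The task name and bucket below are placeholder assumptions:

from prefect import task
from prefect.engine.results import S3Result

# Sketch only: the bucket name and task are placeholders. With a result location
# configured (checkpointing is enabled by default when running against Cloud/Server),
# Prefect can load a finished task's stored output rather than recompute it if
# Dask resubmits the chain.
@task(result=S3Result(bucket="my-prefect-results"))
def some_expensive_task(batch):
    ...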
s
Ok, cool. I guess I don't really mind if the `nout=2` task is getting rendered twice, as long as it isn't actually getting run twice. I will add some resources to the scheduler via the FargateCluster kwargs to see if that helps. For the workers point, the weird thing is I get the error message of the scheduler timeout on the flow
[24 June 2022 9:40am]: Unexpected error: OSError('Timed out trying to connect to <tcp://3.234.183.8:8786> after 30 s')
but the workers still run for a couple more minutes. The workers seem like they're running over the full set of mapped data they're supposed to process, but can't report their status back to the scheduler. So the next set of jobs in the map can't get started. Does that make sense?
And I will keep in mind that we may need to switch from threads to processes if things continue to behave weirdly.
k
I believe that makes sense. It really looks like the scheduler is bogged down. Before you increase the scheduler memory, you may want to try limiting the amount of data that passes through it. You know what I mean, right?
s
I think so. If I'm passing a large dataframe between tasks, that data has to pass through the scheduler. So I may be better off having the first task write that dataframe to disk, and only pass a filename to the next tasks, and let them load it themselves. Is that what you're getting at?
k
Yes, you get it. Prefect just straight up copies the data needed because it's a wrapper for Dask. Imagine you have 3 workers and 9 tasks and you did:
mapped_task(unmapped(df), input_one, input_two)
df will be copied 9 times and will pass through the scheduler 9 times.
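To make the refactor concrete, here is a minimal sketch of the pattern being discussed: persist the large dataframe once and pass only its path to the mapped tasks, so the scheduler only ships a small string. The task names, the S3 path, and the pandas/parquet usage (which would need s3fs and pyarrow installed) are assumptions for illustration, not the actual tasks from the flow:

import pandas as pd
from prefect import task

# Hypothetical helpers; names, path, and file format are assumptions.
@task
def write_metadata_to_storage(df: pd.DataFrame) -> str:
    # Persist the big dataframe once; only this small string goes through the scheduler.
    path = "s3://my-bucket/tmp/all_capture_metadata.parquet"
    df.to_parquet(path)
    return path

@task
def process_batch(capture_ids, metadata_path: str):
    # Each mapped task loads the dataframe itself, straight from object storage.
    metadata = pd.read_parquet(metadata_path)
    ...

The map call would then pass `unmapped(metadata_path)` instead of an unmapped dataframe.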
s
Got it. I will refactor and try to solve for that. Thanks for the help as always, Kevin!