# prefect-community
s
Hi! I have a mapped task with 21 children and a task concurrency limit of 5, but only 2 child tasks are scheduled at a time. What might be the cause?
a
Are you running this on Dask? How are your executor and storage configured? What agent are you using? It could be a mix of concurrency limits and the capacity set on the executor.
s
I am running LocalDaskExecutor; it is running on K8s.
a
Can you share more details on how exactly it's configured? If you could share a full small flow example, I could try to reproduce it to investigate.
s
Cloud <- K8s Agent -> K8sExecutor
Mapped tasks like t6, t7, t8 only run 2 child tasks at a time.
a
Can you share the simplified definition of LakeviewRunJobTask? Do you have a params argument defined as a list on the init or run method? Something doesn't seem right with this class. I recall you also had an issue before where it was generating double logs.
s
Copy code
from typing import Any, Dict, Optional

from prefect import Task
from prefect.engine import signals
from prefect.utilities.tasks import defaults_from_attrs


class LakeviewRunJobTask(Task):
    AUTH_ENDPOINT = "auth.api.aqfer.net"
    LAKEVIEW_ENDPOINT = "lakeview.api.aqfer.net"
    ACCESS_TOKEN = "ACCESS_TOKEN"
    REFRESH_TOKEN = "REFRESH_TOKEN"

    def __init__(
        self,
        cid: str,
        job: str,
        params: Optional[Dict[Any, Any]] = None,
        cluster: Optional[str] = None,
        concurrent: bool = False,
        poll_interval: int = 300,
        skip: bool = False,
        **kwargs
    ):
        self.cid = cid
        self.job = job
        self.params = params
        self.cluster = cluster
        self.concurrent = concurrent
        self.poll_interval = poll_interval
        self.skip = skip
        super().__init__(**kwargs)

    @defaults_from_attrs("cid", "job", "cluster", "concurrent", "params", "poll_interval", "skip")
    def run(
        self,
        cid: str = None,
        job: str = None,
        cluster: str = None,
        concurrent: bool = False,
        params: Optional[Dict[Any, Any]] = None,
        poll_interval: int = None,
        skip: bool = False,
    ):
        if skip:
            raise signals.SKIP()
        self.execution_id = self._create_job(cid, job, params, cluster, concurrent)
        self._get_job(cid, job, poll_interval)
        rj = self._get_metrics(cid, job)
        return rj
a
I also wonder what resources you defined on the Kubernetes job_template. Perhaps the pod doesn't have enough resources to run more than two tasks in parallel on a LocalDaskExecutor?
s
I gave 300m CPU and 300Mi memory.
a
We could also approach it the other way around: if you try a much simpler flow with mapping, e.g.:
Copy code
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def generate_random_numbers():
    return list(range(1, 200))


@task
def add_one(x):
    return x + 1


@task(log_stdout=True)
def print_results(res):
    print(res)


with Flow("mapping", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
and you assign the same settings, including:
• the same executor
• the same run config
• the same concurrency limit on the mapped task
Does it still lead to the same error on this simple flow? If so, then it's an issue with resource allocation on the Kubernetes pod side. If not, then it may be an issue with the definition of the mapped task (LakeviewRunJobTask).
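For reference, a minimal sketch of what "the same settings" could look like on that simple flow; the tag name "lakeview" and the "k8s" label here are placeholders (not from the original flow), and the Cloud task-concurrency limit of 5 would be attached to that tag separately in the UI or via the API:
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun


@task
def generate_numbers():
    return list(range(1, 200))


@task(tags=["lakeview"])  # same tag your task concurrency limit is set on
def add_one(x):
    return x + 1


with Flow(
    "mapping",
    executor=LocalDaskExecutor(),              # same executor
    run_config=KubernetesRun(labels=["k8s"]),  # same run config
) as flow:
    add_one.map(generate_numbers())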
I think the resources you allocated may not be enough for your job. Can you try allocating more explicitly on your KubernetesRun? E.g.:
Copy code
with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["k8s"],
            cpu_request=4,
            memory_request="4Gi",
        ),
) as flow:
s
Ok
a
By looking at the flow visualization, I also see that the issue with parallelism occurs only after the reduce step. And currently your reduce step is doing nothing (or did you redact it for privacy?). I wonder if perhaps this might be an issue. Do you need this reduce step? And do t7 and t8 have to run after t6, or could t6 + t7 + t8 run in parallel?
s
When I made t6 an upstream for t7 by defining t7.upstream_tasks=[t6], t7 was not starting, so I introduced a dummy task named reduce.
Actually I want to run t7-1 after t6-1, t7-2 after t6-2, and so on, but t7-1 is not getting any inputs from t6-1. I just want to run t7-1 after t6-1.
a
I was wondering if changing the structure this way + allocating more resources on the pod could help with parallelism:
Copy code
with Flow(FLOW_NAME) as flow:
    event_month = Parameter("event_month", default=None)
    a = compute_input_params(event_month)
    t1 = collate_monthly_crossdevice(params=a)
    t2 = collate_quarterly_crossdevice(params=a, upstream_tasks=[t1])
    t3 = import_adform_lotame(params=a)
    arg_map = generate_arg_map(a["event_month"], a["prev_three_months"])
    t4 = collate_monthly_lotame.map(params=arg_map)
    t5 = collate_quarterly_lotame_cookie.map(params=arg_map)
    t6 = collate_quarterly_lotame_mobile.map(params=arg_map)
    t7 = collate_quarterly_crdlotcookie.map(params=arg_map)
    t8 = collate_quarterly_crdlotmobile.map(params=arg_map)
but if t6 must run before t7 and t8, it complicates things 😅
I think you can try this out with various KubernetesRun and LocalDaskExecutor configurations. You can adjust the executor settings this way:
Copy code
# Use 8 threads
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# Use 8 processes
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
s
Sure, I will try
How can I define the upstream here?
Copy code
t6 = collate_quarterly_lotame_mobile.map(params=arg_map)
a
this should work:
Copy code
t6 = collate_quarterly_lotame_mobile.map(params=arg_map, upstream_tasks=[r2])
k
I would first try defining num_workers, because sometimes Prefect picks up a default of 2 even if you have more cores.
s
@Anna Geller @Kevin Kho I changed the flow as per Anna's suggestion and increased num_workers. Concurrency is fine now. But one of the mapped tasks is stuck, and the downstream of the stuck task failed with the error "At least one upstream state has an unmappable result". t5 is stuck.
k
When you do the consecutive mapping, is the number of elements changing? Do you want all of t4 to run before t5?
Can you try upstream_tasks=[unmapped(t4)]?
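In context, that would look something like this (a self-contained sketch with placeholder task names, not the real flow; unmapped(t4) makes every t5 child wait for all of t4):
from prefect import task, Flow, unmapped


@task
def t4_job(country):
    return country


@task
def t5_job(country):
    return country


with Flow("unmapped-upstream-sketch") as flow:
    countries = list(range(21))  # stand-in for the 21 country codes
    t4 = t4_job.map(countries)
    # unmapped(t4) adds the whole reduced t4 as an upstream dependency,
    # so every t5 child waits for all 21 t4 children to finish first.
    t5 = t5_job.map(countries, upstream_tasks=[unmapped(t4)])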
s
I have to run the jobs for 21 countries; each country is independent of the others. I am passing the country as the map elements. t5 and t6 of a country depend on t4 of that country, so t4, t5, and t6 each have 21 elements. Maybe I am using map in the wrong place. Can you suggest an alternative for this use case?
k
That sounds like the right setup to me, but it looks like one of your tasks might be returning something not iterable and you are trying to map over that. Can you try logging the outputs, or the length of the outputs, to be sure as you run your flow?
s
I am not passing the result of t4 to t5 or t6. I am just creating the dependency with upstream_tasks=.
With upstream_tasks=[unmapped(t4)], the flow is working fine.
k
Oh really? Then I guess just add the unmapped values as upstream tasks, because there is no data dependency.
s
t5 and t6 don't understand the t4 result. Can I just pass it as a dummy variable so the dependency will be created?
I don't want t5 and t6 to wait for the whole of t4 to complete.
k
You can try that. Just note it will be mapped 1:1: if t4 has 10 elements and t5 has 10 elements, passing it like that makes each t5 element dependent on the corresponding upstream t4 element.
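For illustration, a minimal sketch of that 1:1 behavior (placeholder task names, not the real flow):
from prefect import task, Flow


@task
def t4_job(country):
    return country


@task
def t5_job(country):
    return country


with Flow("mapped-upstream-sketch") as flow:
    countries = list(range(21))  # stand-in for the 21 country codes
    t4 = t4_job.map(countries)
    # Passing the mapped t4 directly as an upstream creates a 1:1 dependency:
    # t5 child i waits only for t4 child i, not for all of t4.
    t5 = t5_job.map(countries, upstream_tasks=[t4])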
s
ok