Suresh R
02/02/2022, 10:45 AM
Anna Geller
Suresh R
02/02/2022, 11:06 AM
Anna Geller
Suresh R
02/02/2022, 11:16 AM
t6,t7,t8 only runs 2 child tasks at a time.
Anna Geller
Suresh R
02/02/2022, 12:04 PM
from typing import Any, Dict, Optional

from prefect import Task
from prefect.engine import signals
from prefect.utilities.tasks import defaults_from_attrs


class LakeviewRunJobTask(Task):
    AUTH_ENDPOINT = "auth.api.aqfer.net"
    LAKEVIEW_ENDPOINT = "lakeview.api.aqfer.net"
    ACCESS_TOKEN = "ACCESS_TOKEN"
    REFRESH_TOKEN = "REFRESH_TOKEN"

    def __init__(
        self,
        cid: str,
        job: str,
        params: Optional[Dict[Any, Any]] = None,
        cluster: Optional[str] = None,
        concurrent: bool = False,
        poll_interval: int = 300,
        skip: bool = False,
        **kwargs
    ):
        self.cid = cid
        self.job = job
        self.params = params
        self.cluster = cluster
        self.concurrent = concurrent
        self.poll_interval = poll_interval
        self.skip = skip
        super().__init__(**kwargs)

    @defaults_from_attrs("cid", "job", "cluster", "concurrent", "params", "poll_interval", "skip")
    def run(
        self,
        cid: Optional[str] = None,
        job: Optional[str] = None,
        cluster: Optional[str] = None,
        concurrent: bool = False,
        params: Optional[Dict[Any, Any]] = None,
        poll_interval: Optional[int] = None,
        skip: bool = False,
    ):
        # Skip this task (and mark it as skipped) when requested
        if skip:
            raise signals.SKIP()
        # Submit the job, poll until it finishes, then return its metrics
        self.execution_id = self._create_job(
            cid, job, params, cluster, concurrent)
        self._get_job(cid, job, poll_interval)
        rj = self._get_metrics(cid, job)
        return rj
Anna Geller
Suresh R
02/02/2022, 12:05 PM
Anna Geller
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def generate_random_numbers():
    return list(range(1, 200))


@task
def add_one(x):
    return x + 1


@task(log_stdout=True)
def print_results(res):
    print(res)


with Flow("mapping", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
and you assign the same settings including:
• the same executor
• the same run config
• the same concurrency limit on the mapped task
Does it still lead to the same error on this simple flow? If so, then it’s an issue with resource allocation on the Kubernetes pod side. If not, then it may be an issue with the definition of the mapped task (LakeviewRunJobTask).
with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        cpu_request=4,
        memory_request="4Gi",
    ),
) as flow:
Suresh R
02/02/2022, 12:15 PM
Anna Geller
Suresh R
02/02/2022, 12:21 PM
Anna Geller
with Flow(FLOW_NAME) as flow:
    event_month = Parameter("event_month", default=None)
    a = compute_input_params(event_month)
    t1 = collate_monthly_crossdevice(params=a)
    t2 = collate_quarterly_crossdevice(params=a, upstream_tasks=[t1])
    t3 = import_adform_lotame(params=a)
    arg_map = generate_arg_map(a["event_month"], a["prev_three_months"])
    t4 = collate_monthly_lotame.map(params=arg_map)
    t5 = collate_quarterly_lotame_cookie.map(params=arg_map)
    t6 = collate_quarterly_lotame_mobile.map(params=arg_map)
    t7 = collate_quarterly_crdlotcookie.map(params=arg_map)
    t8 = collate_quarterly_crdlotmobile.map(params=arg_map)
but if t6 must have before, it complicates things 😅
# Use 8 threads
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
# Use 8 processes
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
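For intuition on why the worker count caps how many mapped children run at once, here is a minimal sketch that uses Python's standard `concurrent.futures.ThreadPoolExecutor` as a stand-in for the executor's thread pool. It is not Prefect or Dask code, and `peak_concurrency` and `child_task` are hypothetical names made up for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_workers: int, num_tasks: int) -> int:
    """Run num_tasks dummy tasks on a pool and return the most that overlapped."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def child_task(_: int) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # pretend to do some work
        with lock:
            running -= 1

    # The pool never runs more than num_workers tasks at the same time
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(child_task, range(num_tasks)))
    return peak


if __name__ == "__main__":
    print(peak_concurrency(num_workers=2, num_tasks=8))
    print(peak_concurrency(num_workers=8, num_tasks=8))
```

With 2 workers, at most 2 of the 8 dummy tasks can overlap, no matter how many cores the machine has; raising the worker count raises that ceiling.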
Suresh R
02/02/2022, 12:41 PM
t6 = collate_quarterly_lotame_mobile.map(params=arg_map)
Anna Geller
t6 = collate_quarterly_lotame_mobile.map(params=arg_map, upstream_tasks=[r2])
Kevin Kho
num_workers because sometimes Prefect picks up a default of 2 even if you have more cores
Suresh R
02/02/2022, 5:25 PM
num_workers. Concurrency is fine now.
But one of the mapped tasks is stuck, and the task downstream of the stuck task failed with the error "At least one upstream state has an unmappable result". t5 is stuck.
Kevin Kho
upstream_tasks=[unmapped(t4)]
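As a rough mental model of what marking an input as unmapped does, here is a toy sketch in plain Python. It is not Prefect's implementation; `Unmapped` and `run_mapped` are hypothetical names for this illustration, and the assumption is simply that mapped inputs are split element by element while an unmapped input is handed whole to every child.

```python
from typing import Any, Callable, List


class Unmapped:
    """Marks a value that should be passed whole to every child call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def run_mapped(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call fn once per element of the mapped inputs (toy model only)."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    for name, value in mapped.items():
        # A mapped input must be indexable, otherwise it cannot be split up
        if not isinstance(value, (list, tuple)):
            raise TypeError(f"{name!r} has an unmappable value: {value!r}")
    if not mapped:
        raise ValueError("at least one argument must be mapped over")
    n_children = min(len(v) for v in mapped.values())
    results = []
    for i in range(n_children):
        call_kwargs = {
            k: (v.value if isinstance(v, Unmapped) else v[i])
            for k, v in kwargs.items()
        }
        results.append(fn(**call_kwargs))
    return results


if __name__ == "__main__":
    print(run_mapped(lambda x, offset: x + offset, x=[1, 2, 3], offset=Unmapped(10)))
```

In this toy model, passing `offset=10` without the wrapper raises an error because `10` cannot be split per child, which is loosely analogous to the "unmappable result" message above.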
Suresh R
02/02/2022, 5:33 PM
Kevin Kho
Suresh R
02/02/2022, 5:47 PM
upstream_tasks=. upstream_tasks=[unmapped(t4)], flow is working fine.
Kevin Kho
Suresh R
02/02/2022, 5:50 PM
Kevin Kho
Suresh R
02/02/2022, 5:59 PM