Hi, I have task A and task B, both mapped and A is...
# ask-community
e
Hi, I have task A and task B, both mapped and A is the upstream task of B. Is there a way of having the successful children task in A move on to B regardless of the state of other children tasks in A?
k
What executor are you using?
e
daskexecutor
k
This should execute depth first. Can I see your code?
e
With Flow() as flow:
    # inputs is a list      Output = taskA_and_taskB.map(inputs, unmapped(prop)) @task() Def taskA_and_taskB(input, prop):     outputA = taskA(input, prop)     outputB = taskB(outputA, prop)
Something like this, what I want to accomplish is having Target for taskA and taskB, so we could skip rerunning A when B fails
k
I am confused why A would rerun if B fails:
Copy code
with Flow(...) as flow:
    a = task_a.map(inputs)
    task_b.map(a)
If your setup if like this, it will execute in a depth first way and then is B fails and you retry, the target from a should be retrieved for it. Or am I missing something?
e
if set up in your way, will the start of task_b be conditioning on the finish of all children task_a or on the finish of any task_a?
k
If A is 10 items and B is 10 items, B1 is dependent on A1 and B2 is dependent on A2, but only on DaskExecutor
e
got you, on a relevant topic, say if i also have 10 dask workers, 1 for each item. is there a way to know which worker the item is sent to?
k
I dont think so but there is a way to intentionally send them to a certain worker
Read the last sentence before the
args
here . I present to you the least known Prefect feature lol
e
ah i think i got it, so basically we could assign a label to the identical workers by for instance --resources "label=1", and send certain tasks to certain workers
is it just
@task(tags="dask-resource:index=1")
?
k
I think so but I’ve never used
e
ok, time for some testings. thanks!
hi, a follow-up question, for some reason, prefect no longer creates target file in /.prefect, it worked fine before. anything i missed?
k
Nothing has changed around this in forever. Is your
checkpoint=False
inside the task? This is for a run with an agent right?
e
ah i thought adding environment variable has made local run work, it turned out it didn't
k
Ah did you figure it out?
Ah
Copy code
export PREFECT__FLOWS__CHECKPOINTING=true
should work
e
ok, let me try