# best-practices
j
hey folks -- i understand that we cannot call tasks from within tasks. if we want to invoke a subflow within a task that's mapped, how do we do that? or perhaps -- what we really want is to kick off Flows concurrently when each execution of a mapped task completes. this is so we don't need to wait for all executions to complete to continue to the next thing, which is a totally separate Flow
Copy code
@task
def process_and_call_subflow():
    process_thing()
    flow_2()

@flow
def flow_2():
    pass

@flow
def flow_1():
    process_and_call_subflow.map()
cc @Rowdy
@Nate -- any thoughts?
n
hi @Jon - you're free to call python functions from tasks, but you cannot currently call tasks or flows from a task
> what we really want is to kick off Flows concurrently when each execution of a mapped task completes.
can you do something like this?
Copy code
from typing import Iterable, TypeVar

from prefect import flow, task
from prefect.deployments import run_deployment

T = TypeVar("T")

@task
def process_item(item) -> T:
    # do some work
    result = some_work(item)  # assumes some_work produces a dict
    run_deployment("my-flow/my-deployment", parameters=result)  # assumes my-flow accepts `result`
    return result

@flow
def processing(items) -> Iterable[T]:
    return process_item.map(items)
j
ah cool! so in this case, `run_deployment` is basically the same as calling the flow as a function, except i assume the way this is called we exit the subflow pattern / lose the parent-child relationship?
n
yeah, `run_deployment` will be treated as a vanilla python function because it's called from a task, whereas if you call it from a flow, it'll act like a subflow
but you can `emit_event` if you're on cloud and need to bookkeep / event off the act of triggering another deployment
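e.g. a rough sketch (the event name, resource id, and payload here are just made-up placeholders, not a prescribed schema):
Copy code
from prefect.events import emit_event

# hypothetical event / resource names -- use whatever makes sense for your bookkeeping
emit_event(
    event="my-app.deployment.triggered",
    resource={"prefect.resource.id": "my-app.item.123"},
    payload={"deployment": "my-flow/my-deployment", "parameters": result},
)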
🙌 1
r
got it - thanks Nate!
👍 1
j
or we could emit an event that directly triggers a flow run, yeah?
n
absolutely, though in some cases where you're literally generating the params you need for that deployment in the place you'd emit the event, it feels easiest to just pass those params to `run_deployment` directly, rather than parsing them out of the payload of the event, but that's totally case by case imo
j
agreed! thank you so much for hopping right into our Q's!
n
👍
j
@Nate -- a follow up -- want to confirm that we can simplify a bit here + gain performance by using the `DaskTaskRunner`? instead of tucking all logic under one task and `map`ping over it, we can do a simple `for` loop and call tasks/flows one-by-one under it:
Copy code
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def process_item(item) -> T:
    result = some_work(item)  # assumes some_work produces a dict
    return result

@flow
def process_2(result):
    pass

@flow(task_runner=DaskTaskRunner)
def process_1(items) -> Iterable[T]:
    for item in items:  # executes in parallel
        result = process_item(item)
        process_2(result)  # called as subflow
thx in advance for your patience 🙂 so many options here, which is great, but also slow to land on what might work best...
n
not exactly. the difference is that tasks have this `submit()` method, which `map` will call for each item in your iterable
`process_2` is a flow, so:
• it has no submit method, it will be a blocking call
• its run cannot be submitted to a task runner like dask
hold on, i might have misunderstood what you just said
actually, yeah i stand by that. because you're calling `process_2` (a blocking flow call) immediately after `process_item` (which you'd need to call .submit on if you wanted to actually use the task runner), the fact that `process_item` could leverage the task runner would sort of be hamstrung by the blocking flow immediately after
j
an outcome that would be good for us is that instead of having all logic in one task that gets mapped, we'd like to iterate over a series of tasks
Copy code
@task
def one_big_task():
    thing_1()
    thing_2()
    thing_3()

@flow
def do_in_parallel():
    one_big_task.map()

# ---------------

@task
def thing_1():
    pass

@task
def thing_2():
    pass

@task
def thing_3():
    pass

@flow
def do_in_parallel():
    for thing in things:
        thing_1()
        thing_2()
        thing_3()
        flow_a()  # ideally we can call a flow here
n
that's totally valid! it's just that when you stick a blocking flow call into your for loop, you're going to be blocking that iteration until that subflow finishes
a way around that (if you need that subflow) is to create a deployment for it and then call `run_deployment` with `timeout=0`
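e.g. something roughly like this inside the loop (the deployment name and parameters are placeholders; assumes you've already created a deployment for `flow_a`):
Copy code
from prefect.deployments import run_deployment

# instead of calling flow_a() -- returns a FlowRun immediately
# rather than blocking the iteration until the subflow run finishes
run_deployment("flow-a/my-deployment", parameters={"thing": thing}, timeout=0)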
j
yup that was my next q -- thanks!
> you're going to be blocking that iteration until that subflow finishes
here you mean that the iteration itself will be blocked, but other iterations will continue to run in parallel, yeah?
as opposed to the whole loop stopping to wait for that one iteration's flow
n
i mean it looks to me like you're basically going
Copy code
for _ in range(42):
    future = some_task.submit()  # returns with a future immediately
    your_flow()  # which might as well be: sleep(number_of_seconds_the_flow_takes)
which seems to nullify the benefit of using the non-blocking / submit interface on tasks, because each iteration is waiting for the blocking flow (unless you use `run_deployment` as mentioned)
i could be misinterpreting the question though
j
but with the `DaskTaskRunner` each iteration is running in parallel?
i.e. it is not blocking
n
> but with the `DaskTaskRunner` each iteration is running in parallel?
sorry, each iteration of what?
only tasks that you call submit on will be submitted to the dask task runner
j
Copy code
@task
def thing_1():
    pass

@task
def thing_2():
    pass

@task
def thing_3():
    pass

@flow(task_runner=DaskTaskRunner)
def do_in_parallel():
    for thing in things:
        thing_1()
        thing_2()
        thing_3()
        # since we're running in parallel w Dask,
        # waiting on this blocks only tasks in this iteration
        # not the entire loop
        flow_a()
n
it appears to me, nothing in that code is being submitted to the dask runner. nothing is ever calling submit
j
Copy code
@task
def thing_1():
    pass

@task
def thing_2():
    pass

@task
def thing_3():
    pass

@flow(task_runner=DaskTaskRunner)
def do_in_parallel():
    for thing in things:
        thing_1.submit()
        thing_2.submit()
        thing_3.submit()
        # since we're running in parallel w Dask,
        # waiting on this blocks only tasks in this iteration
        # not the entire loop
        flow_a().submit()
^^ how's that?
n
Copy code
flow_a().submit()
this goes back to my earlier point: there's no submit interface on flows, so even though `thing_x.submit()` is non-blocking, the flow call is blocking unless you do `run_deployment`
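for the record, a rough sketch of what that loop could look like (the deployment name is a placeholder; assumes a deployment exists for `flow_a`):
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def do_in_parallel(things):
    for thing in things:
        thing_1.submit()  # non-blocking, submitted to the dask task runner
        thing_2.submit()
        thing_3.submit()
        # also non-blocking: returns a FlowRun immediately instead of
        # waiting for the triggered flow run to finish
        run_deployment("flow-a/my-deployment", timeout=0)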
j
ok, so it seems with `run_deployment` we can iterate in parallel, and get a flow's result without blocking. what we lose is the subflow features in the UI?
n
`run_deployment` calls will actually be interpreted as a subflow if called from a flow like `do_in_parallel`, so it should look exactly like a subflow in the UI
and almost yes
> get a flow's result without blocking
if you call `run_deployment(..., timeout=0)` you're not going to get the final state because it will return immediately, but you can fetch the result later once you know it's done
j
> but you can fetch the result later once you know it's done
scanning the docs, this seems like we'd use something like `wait`, but that looks like it's blocking? how to achieve getting the result without blocking iteration?
n
i think you'd have to do something like holding onto all the partially hydrated `FlowRun` objects you get back from `run_deployment` (with `timeout=0`) and use the `flow_run.id` to read the finished flow run later when you know it finished, which is a tradeoff one would accept when they choose to use `timeout=0` while also needing the flow result
otherwise, you just have to wait for the flow run you trigger to finish as a blocking call (`run_deployment` or the regular flow object), since one way or another, it does need to finish before you can get the result 🙂
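roughly, that pattern could look like this (the deployment name and parameters are placeholders; exactly how you pull the return value off the state depends on your result persistence settings):
Copy code
import asyncio

from prefect import get_client
from prefect.deployments import run_deployment

def kick_off(items):
    # each call returns a partially hydrated FlowRun immediately
    return [
        run_deployment("my-flow/my-deployment", parameters={"item": item}, timeout=0)
        for item in items
    ]

async def check_later(flow_runs):
    async with get_client() as client:
        for fr in flow_runs:
            finished = await client.read_flow_run(fr.id)
            if finished.state and finished.state.is_completed():
                # with result persistence enabled you could also fetch the
                # flow's return value off of `finished.state` here
                print(finished.id, "completed")

# hypothetical usage:
# runs = kick_off(items)
# ... later, once you know they've finished ...
# asyncio.run(check_later(runs))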