Jon
10/12/2023, 4:34 PM
```python
@task
def process_and_call_subflow(item):
    process_thing(item)
    flow_2()

@flow
def flow_2():
    pass

@flow
def flow_1():
    process_and_call_subflow.map(items)
```
Jon
10/12/2023, 4:42 PM
Jon
10/13/2023, 4:05 PM
Nate
10/13/2023, 4:09 PM
```python
from typing import Iterable, TypeVar

from prefect import flow, task
from prefect.deployments import run_deployment

T = TypeVar("T")

@task
def process_item(item) -> T:
    # do some work
    result = some_work(item)  # assumes some_work produces a dict
    run_deployment("my-flow/my-deployment", parameters=result)  # assumes my-flow accepts `result`
    return result

@flow
def processing(items) -> Iterable[T]:
    return process_item.map(items)
```
Jon
10/13/2023, 4:14 PM
run_deployment is basically the same as calling the flow as a function, except i assume the way this is called we exit the subflow pattern / lose the parent-child relationship?
Nate
10/13/2023, 4:15 PM
run_deployment will be treated as a vanilla python function because it's called from a task, whereas if you call it from a flow, it'll act like a subflow
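A minimal sketch of that difference, assuming Prefect 2.x and a pre-existing deployment named `my-flow/my-deployment` (placeholder names):
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_from_task():
    # called from a task: behaves like a plain python call, and the
    # triggered flow run is not attached to this run as a child
    run_deployment("my-flow/my-deployment")

@flow
def trigger_from_flow():
    # called from a flow: the triggered flow run is linked to this
    # flow run, so it shows up like a subflow in the UI
    run_deployment("my-flow/my-deployment")
```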
Nate
10/13/2023, 4:16 PM
there's also emit_event if you're on cloud and need to bookkeep / event off the act of triggering another deployment
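For reference, a sketch of what emitting such an event might look like (the event name, resource id, and payload here are made up):
```python
from prefect.events import emit_event

emit_event(
    event="my-org.deployment.triggered",  # hypothetical event name
    resource={"prefect.resource.id": "my-flow/my-deployment"},
    payload={"parameters": {"x": 1}},  # whatever you want to bookkeep
)
```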
Rowdy
10/13/2023, 4:16 PM
Jon
10/13/2023, 4:18 PM
Nate
10/13/2023, 4:22 PM
i'd probably pass parameters to run_deployment directly, rather than parsing them out of the payload of the event, but that's totally case by case imo
Jon
10/13/2023, 4:23 PM
Nate
10/13/2023, 4:26 PM
Jon
10/16/2023, 4:44 PM
DaskTaskRunner?
instead of tucking all logic under one task and `map`ping over it, we can do a simple for loop and call tasks/flows one-by-one under it:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def process_item(item) -> T:
    result = some_work(item)  # assumes some_work produces a dict
    return result

@flow
def process_2(result):
    pass

@flow(task_runner=DaskTaskRunner())
def process_1(items) -> Iterable[T]:
    for item in items:  # executes in parallel
        result = process_item(item)
        process_2(result)  # called as subflow
```
Jon
10/16/2023, 4:47 PM
Nate
10/16/2023, 4:47 PM
tasks have a submit() method, which map will call for each item in your iterable
process_2 is a flow, so:
• it has no submit method, it will be a blocking call
• its run cannot be submitted to a task runner like dask
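Concretely, a minimal sketch of the blocking/non-blocking difference (assuming Prefect 2.x):
```python
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_subflow():
    return 2

@flow
def parent():
    future = my_task.submit()  # non-blocking: returns a PrefectFuture right away
    value = my_subflow()       # blocking: runs to completion before the next line
    result = future.result()   # block only when the task's value is needed
```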
Nate
10/16/2023, 4:49 PM
Nate
10/16/2023, 4:51 PM
since you're making process_2 a blocking flow call immediately after process_item (which you'd need to call .submit() on if you wanted to actually use the task runner), the fact that process_item could leverage the task runner would sort of be hamstrung by the blocking flow immediately after
Jon
10/16/2023, 4:52 PM
```python
@task
def one_big_task():
    thing_1()
    thing_2()
    thing_3()

@flow
def do_in_parallel():
    one_big_task.map(items)

# ---------------

@task
def thing_1():
    pass

@task
def thing_2():
    pass

@task
def thing_3():
    pass

@flow
def do_in_parallel():
    for thing in things:
        thing_1()
        thing_2()
        thing_3()
        flow_a()  # ideally we can call a flow here
```
Nate
10/16/2023, 4:53 PM
Nate
10/16/2023, 4:53 PM
Jon
10/16/2023, 4:53 PM
Jon
10/16/2023, 5:41 PM
> you're going to be blocking that iteration until that subflow finishes
here you mean that the iteration itself will be blocked, but other iterations will continue to run in parallel, yeah?
Jon
10/16/2023, 5:41 PM
Nate
10/16/2023, 6:02 PM
```python
for _ in range(42):
    future = some_task.submit()  # returns with a future immediately
    your_flow()  # which might as well be: sleep(number_of_seconds_the_flow_takes)
```
which seems to nullify the benefit of using the non-blocking / submit interface on tasks, because each loop is waiting for the blocking flow (unless you use run_deployment as mentioned)
Nate
10/16/2023, 6:02 PM
Jon
10/16/2023, 6:06 PM
but with the DaskTaskRunner each iteration is running in parallel?
Jon
10/16/2023, 6:06 PM
Nate
10/16/2023, 6:08 PM
> but with the DaskTaskRunner each iteration is running in parallel?
sorry, each iteration of what?
Nate
10/16/2023, 6:09 PM
Jon
10/16/2023, 6:10 PM
```python
@task
def thing_1():
    pass

@task
def thing_2():
    pass

@task
def thing_3():
    pass

@flow(task_runner=DaskTaskRunner())
def do_in_parallel():
    for thing in things:
        thing_1()
        thing_2()
        thing_3()
        # since we're running in parallel w Dask,
        # waiting on this blocks only tasks in this iteration,
        # not the entire loop
        flow_a()
```
Nate
10/16/2023, 6:10 PM
Jon
10/16/2023, 6:11 PM
```python
@task
def thing_1():
    pass

@task
def thing_2():
    pass

@task
def thing_3():
    pass

@flow(task_runner=DaskTaskRunner())
def do_in_parallel():
    for thing in things:
        thing_1.submit()
        thing_2.submit()
        thing_3.submit()
        # since we're running in parallel w Dask,
        # waiting on this blocks only tasks in this iteration,
        # not the entire loop
        flow_a().submit()
```
Jon
10/16/2023, 6:11 PM
Nate
10/16/2023, 6:12 PM
> flow_a().submit()
this goes back to my earlier point: there's no submit interface on flows, so even though thing_x.submit() is non-blocking, the flow call is blocking unless you do run_deployment
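A sketch of the non-blocking version of that loop, assuming `flow_a` has a deployment (the name `flow-a/default` is a placeholder) and that `thing_1`/`thing_2`/`thing_3` are the tasks from above:
```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def do_in_parallel(things):
    for thing in things:
        thing_1.submit()
        thing_2.submit()
        thing_3.submit()
        # trigger flow_a's deployment without blocking the loop;
        # timeout=0 makes run_deployment return a FlowRun immediately
        run_deployment("flow-a/default", timeout=0)
```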
Jon
10/16/2023, 6:15 PM
so with run_deployment we can iterate in parallel, and get a flow's result without blocking. what we lose is the subflow features in the UI?
Nate
10/16/2023, 6:17 PM
the run you trigger is still linked to do_in_parallel, so it should look exactly like a subflow in the UI
and almost yes
> get a flow's result without blocking
if you call run_deployment(..., timeout=0) you're not going to get the final state because it will return immediately, but you can fetch the result later once you know it's done
Jon
10/16/2023, 6:23 PM
Nate
10/16/2023, 8:55 PM
you can keep the FlowRun objects you get back from run_deployment (with timeout=0) and use the flow_run.id to read the finished flow run later when you know it finished, which is a tradeoff one would accept when they choose to use timeout=0 while also needing the flow result
otherwise, you just have to wait for the flow run you trigger to finish as a blocking call (run_deployment or the regular flow object), since one way or another, it does need to finish before you can get the result 🙂
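Putting that together, a hedged sketch of the fire-now, collect-later pattern (assuming Prefect 2.x; `my-flow/my-deployment` is a placeholder):
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment

async def wait_for_final_state(flow_run_id):
    # poll the API until the triggered flow run reaches a final state
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                return flow_run.state
            await asyncio.sleep(5)

flow_run = run_deployment("my-flow/my-deployment", timeout=0)  # returns immediately
final_state = asyncio.run(wait_for_final_state(flow_run.id))
```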