Brett Papineau
07/29/2019, 10:29 PM
from prefect.core.flow import current_flow
and then in def task_name(flow=current_flow):? Or rather, is there a way to give flow context downstream, like passing flow=flow to each subsequent task? Or is the suggested design entirely based on nested flows?
B. Turning off non-explicit tasks. As above, we will probably be supplying data from outside Prefect in appropriate locations and do not want it to unexpectedly interfere with execution as the DAG grows.
C. When the granularity becomes a row-by-row handler, it seems unlikely we will make use of the cache. In terms of execution-time savings, is there a way to disable it?
2. Flows
A. Because we want everything to be functional and able to start from any point, I have been using the context manager for new flows inside functions with a @task decorator, passing flow=<context_manager_task_name> to tasks further in. I’m not sure if this is the right approach, but it doesn’t keep state or context upstream: if I capture results from main_flow.run(), it only shows down to the first nested flow. I saw that there was a conditional merge() functionality, but not necessarily one for nested flows.
B. Configuration to run particular tasks at any stage if necessary. Below I outline a general overview of what the first system might look like. I believe this implies we supply the first input before the main flow. Am I limited to just command-line arg parsing?
3. Parameters/Config
A. It seems like we would likely use prefect.context over Parameters whenever available, so that downstream flows can inherit configurations from parent flows. Likewise, can we supply downstream flow context with something like with prefect.context(current_flow.context):, or am I just taking the wrong approach to everything?
General idea of the kind of early flow we have in mind: given parameters, extract data from source(s) -> for each source in parallel, use the extraction to calculate change data capture -> handle changes individually but do not execute (through .map returning all) -> wrap all changes in a transaction block -> execute the transaction block -> after completion of all sources, report results to Slack. In addition to this we plan on extensive metadata/logging throughout the process, but that comes later.

import sys
from prefect import task, Flow

@task
def produce_input() -> list:
    return [i for i in range(10000)]

@task
def other_flow():
    with Flow('other flow') as other_flow:
        input_other = produce_input(flow=other_flow)
    other_state = other_flow.run()
    return other_state

@task
def nested_flow():
    with Flow('nested flow') as nested_flow:
        input_nested = produce_input(flow=nested_flow)
        other_state = other_flow(flow=nested_flow)
        other_state2 = other_flow(flow=nested_flow)
    nested_state = nested_flow.run()
    return nested_state

if __name__ == '__main__':
    with Flow('testing') as test_flow:
        nested_state = nested_flow(flow=test_flow)
    test_state = test_flow.run()  # Only aware of nested_flow
    test_flow.visualize()         # Only aware of nested_flow
    sys.exit(0)
Chris White
07/29/2019, 10:54 PM
- a config-type parameter containing a large configuration object. In fact, Flow Parameters are actually put into context during the run
- one of our earliest partners actually had a flow with 13k+ memory-intensive tasks and it ran just fine, so I’m not sure about the 10k limit you’re referring to
- Is there a reason you need to represent each row’s computation as a Prefect Task? Why not batch the row processing?
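The batching idea above can be sketched in plain Python (the chunk helper, batch size, and process_batch handler are illustrative, not part of Prefect's API); each batch, rather than each row, would then be one element passed to a mapped Prefect task:

```python
from typing import Iterator, List


def chunk(rows: List[int], size: int) -> Iterator[List[int]]:
    """Yield successive fixed-size batches from rows (the last may be smaller)."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


# Hypothetical row handler applied to a whole batch at once,
# so 10,000 rows become 10 mapped tasks instead of 10,000.
def process_batch(batch: List[int]) -> List[int]:
    return [row + 10 for row in batch]


batches = list(chunk(list(range(10000)), 1000))
results = [process_batch(b) for b in batches]
```

Under this sketch, plus_ten.map(batches) would run over 10 inputs rather than 10,000, trading per-row task visibility for far less scheduling overhead.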
- with regards to your sub-flow questions, this is actually interesting timing --> we are currently having some early conversations about supporting such a pattern. I’d be super interested to better understand your use case so we can keep it in mind during our design --> would you mind sending me an email at chris at prefect dot io
so we can maybe dig a little deeper into your requirements?

Brett Papineau
07/29/2019, 10:59 PM

Chris White
07/29/2019, 11:04 PM

Brett Papineau
07/29/2019, 11:04 PM

Chris White
07/29/2019, 11:14 PM

Brett Papineau
07/29/2019, 11:14 PM
import time
from prefect import task, Flow

@task
def plus_ten(i: int):
    return i + 10

delay_input = [i for i in range(10000)]

if __name__ == '__main__':
    start_time = time.time()
    with Flow('test delay') as delay_flow:
        delay_input_plus_ten = plus_ten.map(delay_input)
    init_time = time.time()
    print(init_time - start_time)
    delay_flow.run()
    end_time = time.time()
    print(end_time - init_time)
It took 20s to build with the context manager and 60s to execute. If you build it returning delay_input as a task, it results in 10k fewer (constant) tasks and near-instant completion. My timing window might've been from 100k testing, which is realistic, as the first extractor we ran had 176k rows.

Chris White
07/29/2019, 11:22 PM
re: delay_input — I’d recommend explicitly representing it as a task using prefect.tasks.core.constants.Constant; if you add:
from prefect.tasks.core.constants import Constant
delay_input = Constant([i for i in range(10000)])
to your example script, you’ll gain a lot of efficiency and speed

Brett Papineau
07/29/2019, 11:23 PM

Chris White
07/29/2019, 11:31 PM