    Brett Papineau

    3 years ago
    Hello. I'm investigating whether we can use Prefect as our job (task) scheduler in a function-driven data warehouse. This investigation comes after determining that the significant workarounds we would need for Airflow are unsustainable debt. On preliminary investigation I've noticed a few problems and have a few questions, if you have the time. I will be putting them in a topic reply.
    Problems (so far):
    1. Prefect seems to have issues once you balloon the number of ‘nodes’ in the DAG to 10k and beyond. Since we were planning on having the system do row-by-row functional computation starting with a large batch, this becomes problematic. Basically there seems to be a large delay when building the context and starting the run. This is easily replicable by just making a 10000-length list constant and sending it to a task.map function. I understand we can partially work around this by reducing the number of strict tasks through grouping.
    Related Questions:
    A. Is there a way to turn off automatic collection/constant task generation?
    B. Is the approach a different executor? I attempted dask distributed and it was much slower.
    C. Is there a setting/config somewhere?
    2. Dask distributed workers are slow in a large DAG. This brings back the reminder that airflow takes 5-10s to even begin a task. We thought the problems from 1 might be handled better if we massively distribute the workload; however, dask distributed seems to heavily favor a single worker at any time and obviously took much longer on simpler tasks.
    Related Questions:
    A. I assume you can section off pieces of a flow to run on distributed and others not. I’m not sure if this is against practice or anything, but it comes into play with my questions related to flows later. My general attempt was nesting a flow inside the @task function to change the executor on things we expect to take some time to complete.
    General Questions:
    1. Tasks
    A. It seems like you cannot do nested tasks in prefect. Is there something akin to from prefect.core.flow import current_flow and then def task_name(flow=current_flow):? Or rather a way to give flow context downstream, like passing flow=flow to each subsequent task? Or is the suggested design entirely based on nested flows?
    B. Turning off non-explicit tasks. As above, we will probably be supplying data outside of prefect in appropriate locations and do not want it to suddenly mess with execution as the DAG grows.
    C. When the granularity becomes a row-by-row handler it seems unlikely we will make use of the cache. In terms of execution time savings, is there a way to disable it?
    2. Flows
    A. Because we want everything to be functional and able to start from any point, I have been using the context manager of new flows in functions with a @task decorator and passing flow=<context_manager_task_name> to tasks further in. I’m not sure if this is the right approach, but it doesn’t keep state or context upstream: if I capture results from main_flow.run(), it only shows down to the first nested flow. I saw that there was a conditional.merge() functionality but not necessarily one for nested flows.
    B. Configuration to run particular tasks at any stage if necessary. Below I outline a general overview of what the first system might look like. I believe this might imply we supply the first input before the main flow. Am I limited to just command line arg parsing?
    3. Parameters/Config
    A. It seems like we would likely use prefect.context whenever available over parameters, so that downstream flows can inherit configurations from parent flows. Likewise, can we supply downstream flow context with something like with prefect.context(current_flow.context), or am I just taking the wrong approach to everything?
General Idea of what kind of early flow we have in mind: Given parameters extract data from source(s) -> for each source in parallel use extraction to calculate change data capture -> handle changes individually but do not execute (through .map returning all) -> wrap all changes in a transaction block -> execute transaction block -> after completion of all sources report results to slack. In addition to this we plan on extensive metadata/logging throughout the process, but that comes later.
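    As a rough illustration of the batch -> row -> batch shape described above, here is a minimal sketch in plain Python with no Prefect calls. Everything in it is an assumption made for illustration: the snapshot format (dicts keyed by primary key), the helper names capture_changes, handle_change and wrap_in_transaction, and the placeholder statement strings, which are not real SQL. In a real flow each function would presumably become a task, with handle_change mapped over the changes.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


def capture_changes(old: Dict[int, Row], new: Dict[int, Row]) -> List[dict]:
    """Diff two snapshots keyed by primary key into update/insert/delete changes."""
    changes: List[dict] = []
    for key, row in new.items():
        if key not in old:
            changes.append({"op": "insert", "key": key, "row": row})
        elif old[key] != row:
            changes.append({"op": "update", "key": key, "row": row})
    for key in old:
        if key not in new:
            deleted: Optional[Row] = None
            changes.append({"op": "delete", "key": key, "row": deleted})
    return changes


def handle_change(change: dict) -> str:
    """Turn one change into a placeholder statement string; nothing is executed here."""
    return f"{change['op'].upper()} key={change['key']}"


def wrap_in_transaction(statements: List[str]) -> List[str]:
    """Wrap the per-row statements in a single transaction block."""
    return ["BEGIN"] + statements + ["COMMIT"]


# Hypothetical snapshots of one source: previous extraction vs. current extraction.
old = {1: {"name": "a"}, 2: {"name": "b"}, 3: {"name": "c"}}
new = {1: {"name": "a"}, 2: {"name": "B"}, 4: {"name": "d"}}

changes = capture_changes(old, new)                      # batch -> rows
statements = [handle_change(c) for c in changes]         # per-row handling, not executed
block = wrap_in_transaction(statements)                  # rows -> batch
```

    Executing the block against the target database and reporting to Slack are left out on purpose, since both depend on details not covered in this thread.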
    A short toy script I was using to try to figure out what the structure may look like.
    import sys

    from prefect import Flow, task


    @task
    def produce_input() -> list:
        return [i for i in range(10000)]
    
    
    @task
    def other_flow():
        with Flow('other flow') as other_flow:
            input_other = produce_input(flow=other_flow)
            
        other_state = other_flow.run()
        
        return other_state
    
    @task
    def nested_flow():
        with Flow('nested flow') as nested_flow:
            input_nested = produce_input(flow=nested_flow)
            other_state = other_flow(flow=nested_flow)
            other_state2 = other_flow(flow=nested_flow)
        
        nested_state = nested_flow.run()
        
        return nested_state
    
    if __name__ == '__main__':
        with Flow('testing') as test_flow:
            nested_state = nested_flow(flow=test_flow)
        
        test_state = test_flow.run() # Only aware of nested_flow
        
        test_flow.visualize() # Only aware of nested_flow
        
        sys.exit(0)
    Chris White

    3 years ago
    Hi @Brett Papineau! Interesting thoughts. Let me try to address a few of your questions here:
    - Is there a reason you aren’t using Prefect Parameters? It sounds like that’s what you mean when you refer to “non-explicit tasks”, and your use of context is a smell for Parameters to me; you could easily have a single config type parameter containing a large configuration object. In fact, Flow Parameters are actually put into context during the run.
    - One of our earliest partners actually had a flow with 13k+ memory-intensive tasks and it ran just fine, so I’m not sure about the 10k limit you’re referring to.
    - Is there a reason you need to represent each row’s computation as a Prefect Task? Why not batch the row processing?
    - With regards to your sub-flow questions, this is actually interesting timing: we are currently having some early conversations about supporting such a pattern. I’d be super interested to better understand your use case so we can keep it in mind during our design. Would you mind sending me an email at chris at prefect dot io so we can maybe dig a little deeper into your requirements?
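    The batching suggestion can be sketched in plain Python as follows. This is only an illustration under assumptions: chunked and process_batch are hypothetical helper names, the batch size of 500 is arbitrary, and the per-row work (adding one) is a stand-in. The idea is that a scheduler would then map over 20 batches rather than 10,000 individual rows.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(rows: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `rows`, each at most `size` items long."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield list(rows[start:start + size])


def process_batch(batch: List[int]) -> List[int]:
    """Stand-in for the real per-row work, applied to a whole batch at once."""
    return [row + 1 for row in batch]


rows = list(range(10000))
batches = list(chunked(rows, 500))                  # 20 work units instead of 10,000
results = [process_batch(b) for b in batches]       # one call per batch
```

    A batch function like this could still return one outcome per row if row-level reporting is needed; whether that meets a given auditing requirement is a separate design question.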
    Brett Papineau

    3 years ago
    We plan on moving from batch 'fake' Change Data Capture to real CDC hence we split the granularity down to a single row/change. It might seem odd we do batch -> row -> batch. We will eventually strip out the first batch for real CDC, and the last batch is due to suggested use of transaction blocks on our target database Amazon Redshift. Redshift hates singular inserts/updates but loves block transactions.
    I'm not saying prefect has a problem running the 10k tasks, the problem is the ~2-5 minute delay when initializing both the flow and before the first task in the flow is executed after flow.run(). If we plan on moving any pieces of the data warehouse to realtime / low latency we don't want prefect to become the bottleneck.
    Chris White

    3 years ago
    Interesting interesting - I’ve never observed a 2-5 minute delay as you describe before, and I’m not sure what would cause behavior like that -> the first task is submitted to the executor almost instantaneously
    Brett Papineau

    3 years ago
    The reason for each row being its own task is interweaving testing/audits/key lookups on a per-row level. We want to elevate any and all data that results from the job scheduler execution, including success states from something as minuscule as processing a single row. I realize that with the current batch setup we might potentially reach well over 150 million tasks in a single flow run (if we converted the current data warehouse to the new system).
    Chris White

    3 years ago
    Thanks for the additional context! I’ll admit your use case is somewhat unusual for prefect (CDC + almost realtime latency requirements); I’m curious what features of Prefect drew you to it?
    Brett Papineau

    3 years ago
    Here's an example script:
    import time
    
    from prefect import Flow, task
    
    @task
    def plus_ten(i: int):
        return i+10
    
    delay_input = [i for i in range(10000)]
    
    if __name__ == '__main__':
        start_time = time.time()
        with Flow('test delay') as delay_flow:
            delay_input_plus_ten = plus_ten.map(delay_input)
        
        init_time = time.time()
        print(init_time - start_time)
        
        delay_flow.run()
        
        end_time = time.time()
        print(end_time - init_time)
    It took 20s to build with the context manager and 60s to execute. If you instead build it with a task that returns delay_input, it results in 10k fewer (constant) tasks and near-instant completion. My earlier 2-5 minute timing window might've been from 100k testing, which is realistic as the first extractor we ran has 176k rows.
    We are wavering between event-driven and function-driven formulas for our next iteration of the data warehouse. Many, many implementations of function-driven utilize airflow in some format. We determined the workarounds that would be necessary, which we had identified prior to finding prefect (and which are also in your docs), to be unsustainable debt for the system. The only constraint we are trying to keep right now is having the system on python.
    Chris White

    3 years ago
    Ah gotcha! I understand the delays you’re seeing. For tasks such as delay_input I’d recommend explicitly representing it as a task using prefect.tasks.core.constants.Constant; if you add:
    from prefect.tasks.core.constants import Constant
    
    delay_input = Constant([i for i in range(10000)])
    to your example script you’ll gain a lot of efficiency and speed
    Brett Papineau

    3 years ago
    As a note on CDC with prefect / real time: we will likely identify which pipelines are capable of scheduled DAG/flow runs every ~5-10 mins, rather than being actually real-time. Right now data warehouse v1 is on a 24-hour total extraction and total rebuild schedule, and a goal going forward is to reduce that time and allow pieces to be rebuilt on command.
    Ok, that tip helps some. Init is now nothing and execution is 1/6th. Something to note when I continue the investigation later. My assumption was that the constant task creation would be the same regardless of whether I explicitly described it as such or not.
    Chris White

    3 years ago
    yea that’s fair - our auto-generation of tasks goes very granular