Michael Hadorn

    7 months ago
    Question about Orion: generic tasks, flow build without running it. Details in thread.
    Hi, I'm playing around with Orion. We use Prefect Core at the moment. There we build a flow from generic tasks in two simple loops:
    • loop 1: register all tasks
    • loop 2: set the dependencies for each task
      ◦ because it's known what every task needs, we can easily set these as dependencies, since every other task already exists.
    We don't have to resolve these dependencies ourselves and we don't have to register the tasks in the correct order. Then we have the flow and can run it.
    So now these questions about Orion:
    • Can we register flows and tasks without directly running them?
    • Can we set references (or upstream tasks / wait_for) after registering tasks?
    • Is there a flow.visualize() in Orion?
    I understand the concept and benefits of using decorators; it's easy, but not that flexible. Thanks a lot for any clarification.
    Anna Geller

    7 months ago
    1. This is what deployments are for: you can create a deployment for a new flow that has never been triggered before.
    2. There is no registration any more 🙂 Orion resolves dependencies dynamically at runtime, so you may create a deployment for a flow and change the task dependencies within the code without having to create a new deployment (aka reregister).
    3. No, not atm. But as long as you run the flow, e.g. locally, you can see the visualization in your local UI without having to register the flow.
    Michael Hadorn

    7 months ago
    @Anna Geller Thanks a lot for your answer. Ah ok, I guess with the current version 2.0a10 we have to run the flow at least once before we can see the flow as a deployment (DeploymentSpec, as prefect deployment create xx does not result in the flow being displayed in the GUI). Maybe it's a bug?
    About my second question, I'll try to explain the process of building my flow:
    • I have one task (one function)
    • This task has to be executed two times (this could be a generic amount)
      ◦ e.g. task run names a, b
    • each task run (a, b) knows its own dependencies
      ◦ e.g. a depends on b
    With Prefect Core I could create all tasks and then set all references (and every reference was valid, because all tasks already existed). With Orion it seems I cannot create tasks and set the deps later. At the moment I create task run a, I cannot set wait_for to b, because b is not a task yet. How can I add deps later? I understand from your answer that there is still a way to change deps at runtime. How can I do this? I would like to avoid having to work out the call order of my tasks myself. I also tried to code this. My problem is the block at "# second: set deps according to external logic":
    from prefect import task, flow, get_run_logger
    from prefect.deployments import DeploymentSpec
    
    
    @task
    def do_stuff(part):
        logger = get_run_logger()
        logger.info(f"{part} - here I am")
    
    
    @flow(name='late_deps')
    def late_deps():
        # key = task_run-name; value = [deps]
        task_exec_config = {
            'a': ['b']      # a needs b
            , 'b': None     # b can start
        }
    
        # collection of prefect PrefectFutures with task_run-name as key to maintain references
        task_exec = {}
    
        # first: creation of all existing tasks
        for task_exec_name, task_deps_config in task_exec_config.items():
            task_exec[task_exec_name] = do_stuff.with_options(name=task_exec_name)(task_exec_name)
            # do_stuff.with_options(name=task_exec_name)(task_exec_name)
    
        # second: set deps according to external logic -> this will not work
        for task_exec_name, task_deps_config in task_exec_config.items():
            for dep in task_deps_config or []:  # None means "no deps"; iterating None would raise TypeError
                print(f'Task {task_exec_name} needs task {dep}')
    
    # if __name__ == '__main__':
    #     late_deps()
    
    
    DeploymentSpec(
        flow=late_deps,
        name="late_deps",
    )
    Anna Geller

    7 months ago
    The DeploymentSpec can be specified in a completely different file e.g. you may have a file deployments.py in your root project directory containing all deployments. Then, to change the dependencies, you simply need to commit new flow code that is referenced in the DeploymentSpec. What are late dependencies? Are you migrating from some legacy solution and retrieving dependencies from there? Can you tell a bit more about the problem you are trying to solve injecting the dependencies this way? It can be easier to rewrite it to normal Python functions and specify dependencies using wait_for
    Michael Hadorn

    7 months ago
    @Anna Geller Thanks a lot for your response. Sure.
    • We constructed something like task-collections (a set of tasks).
    • These task-collections can have dependencies on other task-collections (this is also defined in the database).
    • These collections and deps are defined in a config table.
    • We are using Prefect to set up tasks based on these task-collections.
    Now when we build the flow, we have to add these tasks and set the dependencies. The easiest way to achieve this is the two loops I described in my first post. (And we do not have to worry about the order in which tasks are added; Prefect resolves all dependencies for us.) If this is not possible with Orion, I have to resolve these dependencies myself and then call the tasks in the correct order with the wait_for argument (so that all upstream tasks have already been added).
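A minimal sketch of resolving the order yourself, assuming the same config shape as in the flow above (a task-run name mapped to a list of dependency names, or None). It relies only on the standard-library graphlib module (Python 3.9+), not on any Prefect API, and resolve_order is a hypothetical helper name.

```python
from graphlib import TopologicalSorter


def resolve_order(task_exec_config):
    """Return task-run names so that every name comes after its dependencies."""
    # None means "no dependencies", as in the config from the thread
    graph = {name: set(deps or ()) for name, deps in task_exec_config.items()}
    # static_order() raises graphlib.CycleError if the config contains a cycle
    return list(TopologicalSorter(graph).static_order())


order = resolve_order({"a": ["b"], "b": None})
print(order)  # ['b', 'a']
```

With the names in this order, each task can be called with wait_for pointing at upstream runs that already exist.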
    Anna Geller

    7 months ago
    You can always read those tasks in an Orion task and use it to dynamically construct downstream tasks. But I agree that this is not a use case Orion is designed for. Orion is dynamic and doesn't require DAGs. Having tasks and dependencies defined in some database and reading those to construct a DAG seems quite tedious. Why do you have that? Is it from some legacy orchestration system? You could simply migrate those to an Orion Workflow i.e. read it once manually and build a Python script out of it. Sorry if I'm not too helpful here
    Michael Hadorn

    7 months ago
    @Anna Geller Yes, it sounds stupid, but we have some complex ETL with entities to load (which have a type, e.g. fact, dimension, dimension with historicization, etc.). Entities of the same type are loaded identically, and that's why we created task-collections. But anyway, I guess the best approach would be to add these tasks in the correct order and use wait_for. Many thanks for your answers and your time.
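A rough sketch of the "correct order plus wait_for" idea, assuming the names are already sorted so that dependencies come first. The submit callable is a stand-in, not a Prefect function: in a real flow it might wrap the task call and pass the upstream futures as wait_for, which is an assumption and is not verified here. submit_in_order and fake_submit are hypothetical names.

```python
def submit_in_order(ordered_names, task_exec_config, submit):
    """Call submit(name, upstream) once per name and collect the results."""
    futures = {}
    for name in ordered_names:
        deps = task_exec_config.get(name) or []
        # Raises KeyError if the order is wrong and a dependency is not there yet
        upstream = [futures[dep] for dep in deps]
        futures[name] = submit(name, upstream)
    return futures


# Stand-in for a real task call; it only records what it was asked to wait for
calls = []


def fake_submit(name, wait_for):
    calls.append((name, list(wait_for)))
    return f"future-{name}"


futures = submit_in_order(["b", "a"], {"a": ["b"], "b": None}, fake_submit)
print(calls)  # [('b', []), ('a', ['future-b'])]
```

Because b is submitted before a, the future for b exists by the time a needs it, which is exactly what the two-loop approach could not guarantee.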
    Anna Geller

    7 months ago
    No, not stupid, but it may simply be a matter of finding the right abstraction. Have you thought of writing some class or function that could help generate a task based on some config-based input? Perhaps some custom abstraction could read a config and generate a task from it
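One possible shape for such an abstraction, sketched in plain Python. Everything here is hypothetical: the row layout (name and type), the loader functions, and make_loader are illustration only and are not taken from the thread or from Prefect.

```python
def load_fact(name):
    # Placeholder for the real load logic of a fact entity
    return f"loaded fact {name}"


def load_dimension(name):
    # Placeholder for the real load logic of a dimension entity
    return f"loaded dimension {name}"


# One loader per entity type; entities of the same type are loaded identically
LOADERS = {"fact": load_fact, "dimension": load_dimension}


def make_loader(row):
    """Return a zero-argument callable for one config row."""
    loader = LOADERS[row["type"]]
    name = row["name"]
    return lambda: loader(name)


rows = [
    {"name": "customer", "type": "dimension"},
    {"name": "sales", "type": "fact"},
]
loaders = {row["name"]: make_loader(row) for row in rows}
print(loaders["sales"]())  # loaded fact sales
```

Each generated callable could then be wrapped as a task inside the flow, so the config decides what is built while the flow code stays generic.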
    Michael Hadorn

    7 months ago
    Yes, that's what our task-collection classes are already doing. The only real problem is adding/calling them in the correct order. If I change this, then most of it should run in Orion. But I have to test it.