#prefect-community

    Martim Lobao

    7 months ago
    is there a way i can avoid the superfluous List and Dict tasks in my DAG? this is taken from a barely modified example on prefect’s blog (code in thread)
    from typing import List
    
    from prefect import Flow, Parameter, task
    from prefect.engine.results import LocalResult
    from prefect.tasks.prefect import create_flow_run, get_task_run_result
    from prefect.executors import LocalExecutor
    
    
    @task(result=LocalResult(), log_stdout=True)
    def create_some_data(length: int):
        print(length)
        return list(range(length))
    
    @task(result=LocalResult(), log_stdout=True)
    def minimum(data: List[int]) -> int:
        new = data[0] + 2
        print("new", new)
        return new
    
    with Flow("child", executor=LocalExecutor()) as child_flow:
        data_size = Parameter("data_size", default=5)
        data = create_some_data(data_size, task_args=dict(slug="foobar"))
        minimum(data, task_args=dict(slug="min"))
    
    @task(log_stdout=True)
    def transform_and_show(data: List[int]) -> List[int]:
        print(f"Got: {data!r}")
        new_data = [x + 1 for x in data]
        print(f"Created: {new_data!r}")
        return new_data
    
    with Flow("parent", executor=LocalExecutor()) as parent_flow:
        child_run_id = create_flow_run(
            flow_name=child_flow.name, parameters=dict(data_size=10)
        )
        child_min = get_task_run_result(child_run_id, "min")
        new_child_run_id = create_flow_run(
            flow_name=child_flow.name, parameters=dict(data_size=child_min)
        )
        child_data = get_task_run_result(new_child_run_id, "foobar")
    
        transform_and_show(child_data)
    Anna Geller

    7 months ago
    check out this page - you can basically create an intermediary task to avoid those extra tasks such as List or Dict

    Martim Lobao

    7 months ago
    thanks @Anna Geller! i’m still a little confused why the List exists at all, but it looks like the following change cleans everything up 🙂
    @task
    def get_flow_parameters(data_size):
        return dict(data_size=data_size)
    
    with Flow("parent", executor=LocalExecutor()) as parent_flow:
        child_run_id = create_flow_run(
            flow_name=child_flow.name, parameters=get_flow_parameters(data_size=10)
        )
        child_min = get_task_run_result(child_run_id, "min")
        new_child_run_id = create_flow_run(
            flow_name=child_flow.name, parameters=get_flow_parameters(data_size=child_min)
        )
        child_data = get_task_run_result(new_child_run_id, "foobar")
    
        transform_and_show(child_data)
    it looks like this is because child_min isn’t an int as i was expecting but rather a <class 'prefect.tasks.core.function.FunctionTask'> instead
    Anna Geller

    7 months ago
    True. When you pass it to downstream tasks, the task's result will be an int at runtime, but while the flow is being defined it is a FunctionTask, you're correct.
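
    To illustrate the point above, here is a minimal toy sketch in plain Python. It is not Prefect's implementation: ToyTask and as_task are hypothetical names invented for this example, and it assumes only that a deferred value has to be wrapped in some collecting node before it can sit inside a plain dict. It shows why an inline dict(data_size=child_min) may pull an extra Dict node into the graph, while a helper task such as get_flow_parameters does not.

```python
class ToyTask:
    """Toy stand-in for a deferred task (hypothetical, NOT Prefect's code)."""

    def __init__(self, name, fn, *upstream):
        self.name = name
        self.fn = fn
        # Every input is coerced into a task so it can be a node in the graph.
        self.upstream = [as_task(u) for u in upstream]

    def run(self):
        # Values only exist here, at run time.
        return self.fn(*(u.run() for u in self.upstream))

    def graph_names(self):
        # Names of this node and everything upstream of it, depth first.
        names = [self.name]
        for u in self.upstream:
            names.extend(u.graph_names())
        return names


def as_task(value):
    """Wrap a plain value so it can take part in the toy graph."""
    if isinstance(value, ToyTask):
        return value
    if isinstance(value, dict) and any(isinstance(v, ToyTask) for v in value.values()):
        # A dict holding a deferred value cannot be built yet, so a
        # collecting "Dict" node is created to assemble it at run time.
        keys = list(value)
        return ToyTask("Dict", lambda *vals: dict(zip(keys, vals)), *value.values())
    return ToyTask("Constant", lambda: value)


# At build time child_min is a task object, not an int.
child_min = ToyTask("min", lambda: 2)

# Inline dict: an extra "Dict" node appears between the two tasks.
inline = ToyTask("create_flow_run", lambda params: params, {"data_size": child_min})

# Helper task: the deferred value is passed directly, so no wrapper is needed.
helper = ToyTask("get_flow_parameters", lambda n: {"data_size": n}, child_min)
via_helper = ToyTask("create_flow_run", lambda params: params, helper)

print(inline.graph_names())
print(via_helper.graph_names())
```

    Both variants produce the same parameters when run; only the shape of the graph differs, which matches the cleanup seen with get_flow_parameters in the thread.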