Thread
#prefect-community
    William Jevne

    8 months ago
    Hi all, I am working on a flow of flows and am wondering if there is an easy way to pass user-provided context to the sub-flows.
    Below is an example of what I am referring to. The accumulator value is accessible to the top-level flow. However, when I attempt to pass the context to the sub-flow, it includes all of the additional Prefect values, some of which are not serializable. In this case I could obviously pull out the specific variable I need by its key and pass it along, but in the project I am working on this would quickly become burdensome. Besides wrapping my values in another dictionary, or using a different naming scheme and filtering, is there any other way to make the context values accessible to the sub-flows?
    import prefect
    from prefect import Flow, Parameter, task, unmapped
    from prefect.engine.results import PrefectResult
    from prefect.executors import LocalDaskExecutor
    from prefect.tasks.prefect import create_flow_run, get_task_run_result
    
    
    @task
    def plus_one(x: int) -> int:
        return x + prefect.context["accumulator"]
    
    
    @task
    def make_xs(xs: list[int]) -> dict[str, list[int]]:
        return {"xs": xs}
    
    
    @task
    def get_context():
        return prefect.context.to_dict()
    
    
    @task
    def add_up(xs: list[int]) -> int:
        return sum([*xs, prefect.context["accumulator"]])
    
    
    if __name__ == "__main__":
        executor = LocalDaskExecutor()
        result = PrefectResult()
        with Flow("subflow", executor=executor, result=result) as subflow:
            xs = Parameter("xs")
            po = plus_one.map(xs)
    
        subflow.register("scratch")
    
        with Flow("flow", executor=executor, result=result) as flow:
            inputs = Parameter("inputs", required=True)
            xss = make_xs.map(inputs)
            context = get_context()
            ids = create_flow_run.map(
                flow_name=unmapped("subflow"),
                parameters=xss,
                context=unmapped(context)
            )
            results = get_task_run_result.map(ids, unmapped("plus_one-1"))
            sums = add_up.map(results)
    
        flow.register("scratch")
    
        with prefect.context(accumulator=1):
            flow.run(parameters={"inputs": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]})
    Kevin Kho

    8 months ago
    Hey @William Jevne, I don't think there is another way here, other than using Parameters. If you are adding a lot of values to the context, you could give a path to a file and load them in that way.
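    A minimal sketch of the file-based idea, in plain Python with no Prefect calls. The helper names save_user_context and load_user_context and the file name user_context.json are hypothetical, and it assumes the values are JSON-serializable and that the sub-flow can read the same filesystem as the parent flow:

```python
import json
import tempfile
from pathlib import Path


def save_user_context(values: dict, directory: str) -> str:
    """Write user-provided values to a JSON file and return its path as a string."""
    # Hypothetical file name; any location both flows can read would do.
    path = Path(directory) / "user_context.json"
    path.write_text(json.dumps(values))
    return str(path)


def load_user_context(path: str) -> dict:
    """Read the values back from the JSON file at the given path."""
    return json.loads(Path(path).read_text())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Only the user-provided values are written, not the full Prefect context.
        context_path = save_user_context({"accumulator": 1}, tmp)
        # context_path is a plain string, so it is itself serializable.
        loaded = load_user_context(context_path)
        print(loaded["accumulator"])
```

    Because the path is just a string, it could be included in the parameters passed to the sub-flow, and a task in the sub-flow could call the loader instead of reading prefect.context. This only works if both flow runs can reach the file.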