Hi all, I am working on a flow of flows and am won...
# ask-community
Hi all, I am working on a flow of flows and am wondering if there is an easy way to pass user-provided context to the sub-flows.
Below is an example of what I am referring to. The accumulator value is accessible to the top level flow, however, when attempting to pass the context to the sub-flow, it includes all of the additional Prefect values, some of which are not serializable. Obviously in this case I could pull out the specific variable I need by its key and pass it along, but in the project I am working on, this would quickly become burdensome. Besides wrapping my values in another dictionary or using a different naming scheme and filtering, is there any other way to make the context values accessible to the sub-flows?
import prefect
from prefect import Flow, Parameter, task, unmapped
from prefect.engine.results import PrefectResult
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, get_task_run_result


@task
def plus_one(x: int) -> int:
    return x + prefect.context["accumulator"]


@task
def make_xs(xs: list[int]) -> dict[str, list[int]]:
    return {"xs": xs}


@task
def get_context():
    return prefect.context.to_dict()


@task
def add_up(xs: list[int]) -> int:
    return sum([*xs, prefect.context["accumulator"]])


if __name__ == "__main__":
    executor = LocalDaskExecutor()
    result = PrefectResult()
    with Flow("subflow", executor=executor, result=result) as subflow:
        xs = Parameter("xs")
        po = plus_one.map(xs)

    subflow.register("scratch")

    with Flow("flow", executor=executor, result=result) as flow:
        inputs = Parameter("inputs", required=True)
        xss = make_xs.map(inputs)
        context = get_context()
        ids = create_flow_run.map(
            flow_name=unmapped("subflow"),
            parameters=xss,
            context=unmapped(context)
        )
        results = get_task_run_result.map(ids, unmapped("plus_one-1"))
        sums = add_up.map(results)

    flow.register("scratch")

    with prefect.context(accumulator=1):
        flow.run(parameters={"inputs": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]})
Hey @William Jevne, I don't think there is another way here, other than using Parameters. If you are adding a lot of values to the context, you could pass the path to a file as a parameter and load the values from it in the sub-flow.
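To make the file idea a bit more concrete, here is a minimal sketch in plain Python, assuming the user-provided values are JSON-serializable and that the file is readable from wherever the sub-flow runs (for example a shared filesystem). The helper names dump_user_context and load_user_context and the file name are hypothetical, not part of Prefect. The returned path string would be passed to the sub-flow as an ordinary parameter.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def dump_user_context(values: Dict[str, Any], path: Path) -> str:
    """Write user-provided values to a JSON file and return the path as a string.

    Hypothetical helper: the returned string is what would be handed to the
    sub-flow as a regular parameter instead of the full Prefect context.
    """
    path.write_text(json.dumps(values))
    return str(path)


def load_user_context(path: str) -> Dict[str, Any]:
    """Read the values back; a task in the sub-flow would call this."""
    return json.loads(Path(path).read_text())


if __name__ == "__main__":
    # Assumption: a temp directory stands in for storage both flows can reach.
    target = Path(tempfile.mkdtemp()) / "user_context.json"
    context_path = dump_user_context({"accumulator": 1}, target)

    # In the sub-flow, a task would receive context_path as a parameter.
    user_context = load_user_context(context_path)
    print(user_context["accumulator"])
```

Only the path crosses the flow boundary here, so none of the non-serializable Prefect context values are involved.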