William Jevne
01/14/2022, 5:24 PMWilliam Jevne
01/14/2022, 5:28 PMaccumulator
value is accessible to the top level flow, however, when attempting to pass the context to the sub-flow, it includes all of the additional Prefect values, some of which are not serializable. Obviously in this case I could pull out the specific variable I need by its key and pass it along, but in the project I am working on, this would quickly become burdensome. Besides wrapping my values in another dictionary or using a different naming scheme and filtering, is there any other way to make the context values accessible to the sub-flows?
import prefect
from prefect import Flow, Parameter, task, unmapped
from prefect.engine.results import PrefectResult
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, get_task_run_result
@task
def plus_one(x: int) -> int:
return x + prefect.context["accumulator"]
@task
def make_xs(xs: list[int]) -> dict[str, list[int]]:
return {"xs": xs}
@task
def get_context():
return prefect.context.to_dict()
@task
def add_up(xs: list[int]) -> int:
return sum([*xs, prefect.context["accumulator"]])
if __name__ == "__main__":
executor = LocalDaskExecutor()
result = PrefectResult()
with Flow("subflow", executor=executor, result=result) as subflow:
xs = Parameter("xs")
po = plus_one.map(xs)
subflow.register("scratch")
with Flow("flow", executor=executor, result=result) as flow:
inputs = Parameter("inputs", required=True)
xss = make_xs.map(inputs)
context = get_context()
ids = create_flow_run.map(
flow_name=unmapped("subflow"),
parameters=xss,
context=unmapped(context)
)
results = get_task_run_result.map(ids, unmapped("plus_one-1"))
sums = add_up.map(results)
flow.register("scratch")
with prefect.context(accumulator=1):
flow.run(parameters={"inputs": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]})
Kevin Kho