Is there a way to pass on parameters/task outputs ...
# prefect-community
s
Is there a way to pass on parameters/task outputs from current task to another task when using FlowRunTask? I get below error when trying to do the same:
TypeError: Object of type 'Parameter' is not JSON serializable
Below is a sample snippet:
from prefect import Flow, Parameter
from prefect.tasks.prefect.flow_run import FlowRunTask

with Flow("sample flow") as sample_flow:
    param = Parameter("task_param")

    FlowRunTask(flow_name="next flow", parameters={"task_param": param})()
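The TypeError quoted above is the error Python's json module raises for any object it does not know how to encode. A minimal sketch, using a hypothetical Placeholder class as a stand-in for a build-time Parameter (this is not Prefect code, and the point at which Prefect serializes the parameters dict is an assumption):

```python
import json


class Placeholder:
    """Hypothetical stand-in for a build-time object such as a Parameter."""

    def __init__(self, name):
        self.name = name


def try_dump(parameters):
    """Return the JSON string, or the error message if encoding fails."""
    try:
        return json.dumps(parameters)
    except TypeError as exc:
        return f"TypeError: {exc}"


# A literal value encodes without trouble.
literal_result = try_dump({"task_param": 15})

# An arbitrary object inside the dict cannot be encoded.
placeholder_result = try_dump({"task_param": Placeholder("task_param")})
```

The second call fails for the same reason as the snippet above: the dict holds an object describing a value rather than the value itself.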
n
Hi @Sandeep Aggarwal! I believe to make this work, you'll want to call param.serialize() on that parameter when you pass it to the FlowRunTask.
s
@nicholas Thanks for the quick response. The parent flow runs smoothly when serialize is applied to parameters/task outputs. However, the downstream flow receives the serialised version of the parameter/task instead of the actual value. Below is the sample value received for the context parameter in the downstream flow.
{
  "context": {
    "name": "context",
    "slug": "context",
    "tags": [],
    "type": "prefect.core.task.Parameter",
    "default": null,
    "outputs": "typing.Any",
    "required": true,
    "__version__": "0.11.1"
  }
}
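The output above contains only a description of the parameter (name, type, default), not the value supplied at run time. A rough sketch of that distinction, using a hypothetical ParameterSketch class loosely modelled on a few of the fields shown above (it is an illustration only and does not reproduce Prefect's Parameter class):

```python
class ParameterSketch:
    """Hypothetical stand-in: holds a definition, not a runtime value."""

    def __init__(self, name, default=None, required=True):
        self.name = name
        self.default = default
        self.required = required

    def serialize(self):
        # Describes the definition only; no runtime value is involved.
        return {
            "name": self.name,
            "slug": self.name,
            "default": self.default,
            "required": self.required,
        }

    def resolve(self, supplied):
        # The actual value only exists once a run supplies it.
        if self.name in supplied:
            return supplied[self.name]
        return self.default


context_param = ParameterSketch("context")
description = context_param.serialize()
runtime_value = context_param.resolve({"context": {"user": "sandeep"}})
```

Passing the description downstream hands over the definition, which matches what the downstream flow received here.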
n
Ah right you are, one sec...
Ok sorry about that! What you can do instead is pass the parameters arg to the run call of FlowRunTask, like this:
with Flow("Parameter Getting") as flow:
    param = Parameter("task_param")

    flowRunTask = FlowRunTask(
        flow_name="next flow", project_name="Your Project",
    )

    flowRunTask(parameters={"task_param": param})
s
ok let me give it a try
Is this supposed to work with map as well? Sorry for not explaining my exact use case earlier. The snippet below demonstrates it:
with Flow("Parameter Getting") as flow:
    param = Parameter("task_param")
    task1 = some_task()
    flow_names = another_task()

    FlowRunTask().map(flow_names, task_args={"parameters":{"task_param": param, "task1": task1}})
I tried applying your suggestion to map, but it throws the same serialisation error at me. Also, there is output from another task that needs to be sent as a parameter to the next flow.
n
Hm let me take a look at that, I've never tried using flow run task with map
Ok, so I couldn't get the flow run task to work out of the box with mapping, but subclassing it did the trick. You could try something like this:
import prefect
from prefect import Flow, task, Parameter
from prefect.tasks.prefect.flow_run import FlowRunTask

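class MappedFlowRunTask(FlowRunTask):
    def run(self, ctx):
        # Each mapped element carries its own flow name and parameters.
        # Pass them as keyword arguments instead of mutating the shared
        # task instance, and return the parent's result.
        return super().run(
            flow_name=ctx["flow_name"], parameters=ctx["parameters"]
        )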

@task
def get_flow_names(x):
    return [{"flow_name": "Parameter Printing", "parameters": {"task_param": x}}]

with Flow("Parameter Getting") as flow:
    param = Parameter("task_param", default="15")

    flow_names = get_flow_names(x=param)

    flowRunTask = MappedFlowRunTask(project_name="My Project")

    flowRunTask.map(flow_names)
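For the original use case (several downstream flows, each receiving both a parameter and another task's output), the list returned by get_flow_names could be built by a small helper. A plain-Python sketch; build_run_specs and the sample values are hypothetical and not part of Prefect:

```python
def build_run_specs(flow_names, shared_parameters):
    """Return one {"flow_name", "parameters"} dict per flow name.

    Each spec gets its own copy of the parameters so that mapped runs
    do not share a mutable dict.
    """
    return [
        {"flow_name": name, "parameters": dict(shared_parameters)}
        for name in flow_names
    ]


# Sample values standing in for a resolved Parameter and a task output.
specs = build_run_specs(
    ["Parameter Printing", "Another Flow"],
    {"task_param": "15", "task1": "output of some_task"},
)
```

Inside the flow, a task like get_flow_names could accept the parameter and the upstream task's output as arguments and return the result of this helper, so both arrive already resolved.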
s
This worked flawlessly. Thanks a lot @nicholas 👍
n
Just took another look at this @Sandeep Aggarwal and you can use the FlowRunTask out of the box; it looks like we weren't providing lists to it as we should have been. This should work:
@task
def get_flow_names():
    return ["Another Flow", "And Another Flow"]

with Flow("Parameter Getting") as flow_1:
    param = Parameter("task_param", default="15")

    flow_names = get_flow_names()

    flowRunTask = FlowRunTask(project_name="Community Support Flows")

    flowRunTask.map(flow_name=flow_names, parameters=[dict(some_param_2=param)])
(and should simplify the code a bit)
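If every mapped run should receive its own parameters dict, one option is to build the two lists with equal length before mapping. A plain-Python sketch; build_mapped_arguments is a hypothetical helper, and the sketch assumes mapped arguments are paired by position, which the thread does not spell out:

```python
def build_mapped_arguments(flow_names, shared_parameters):
    """Return (flow_names, parameters) as two lists of equal length."""
    parameters = [dict(shared_parameters) for _ in flow_names]
    return list(flow_names), parameters


names, params = build_mapped_arguments(
    ["Another Flow", "And Another Flow"],
    {"some_param_2": "15"},
)

# Position-by-position pairing of the two lists.
pairs = list(zip(names, params))
```

Keeping the lists the same length makes the pairing explicit, so it is clear which parameters go to which flow.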
s
Thanks again @nicholas. This works exactly as I wanted it to. 🙂