Thread
#prefect-community
    s

    Sandeep Aggarwal

    2 years ago
    Is there a way to pass on parameters/task outputs from current task to another task when using FlowRunTask? I get below error when trying to do the same:
    TypeError: Object of type 'Parameter' is not JSON serializable
    Below is a sample snippet:
    with Flow("sample flow") as sample_flow:
        param = Parameter("task_param")
    
        FlowRunTask(flow_name="next flow", parameters={"task_param": param})()
    nicholas

    nicholas

    2 years ago
    Hi @Sandeep Aggarwal! I believe to make this work, you'll want to call
    param.serialize()
    on that parameter when you pass it to the
    FlowRunTask
    s

    Sandeep Aggarwal

    2 years ago
    @nicholas Thanks for quick response. The parent flow works smoothly when
    serialize
    is applied to parameter/task outputs, However, in downstream flow, serialised version of parameter/task is received instead of actual value. Below is the sample value received for
    context
    parameter in downstream flow.
    {
      "context": {
        "name": "context",
        "slug": "context",
        "tags": [],
        "type": "prefect.core.task.Parameter",
        "default": null,
        "outputs": "typing.Any",
        "required": true,
        "__version__": "0.11.1"
      }
    }
    nicholas

    nicholas

    2 years ago
    Ah right you are, one sec...
    Ok sorry about that! What you can do instead is pass the parameters arg to the run call of
    FlowRunTask
    like this:
    with Flow("Parameter Getting") as flow:
        param = Parameter("task_param")
    
        flowRunTask = FlowRunTask(
            flow_name="next flow", project_name="Your Project",
        )
    
        flowRunTask(parameters={"task_param": param})
    s

    Sandeep Aggarwal

    2 years ago
    ok let me give it a try
    Is this supposed to work with
    map
    as well? Sorry for not explaining my exact usecase earlier. Below snippet demonstrates the exact usecase:
    with Flow("Parameter Getting") as flow:
        param = Parameter("task_param")
        task1 = some_task()
        flow_names = another_task()
    
        FlowRunTask().map(flow_names, task_args={"parameters":{"task_param": param, "task1": task1}})
    I tried applying your suggestion to map, but it throws same serialisation error at me. Also, there is output from another task that needs to be sent as parameter to next flow.
    nicholas

    nicholas

    2 years ago
    Hm let me take a look at that, I've never tried using flow run task with map
    Ok so I couldn't get the flow run task to work out of the box with mapping, but subclassing it did the trick, you could try something like this:
    import prefect
    from prefect import Flow, task, Parameter
    from prefect.tasks.prefect.flow_run import FlowRunTask
    
    class MappedFlowRunTask(FlowRunTask):
        def run(self, ctx):
            self.flow_name = ctx["flow_name"]
            self.parameters = ctx["parameters"]
            super(MappedFlowRunTask, self).run()
    
    @task
    def get_flow_names(x):
        return [{"flow_name": "Parameter Printing", "parameters": {"task_param": x}}]
    
    with Flow("Parameter Getting") as flow:
        param = Parameter("task_param", default="15")
    
        flow_names = get_flow_names(x=param)
    
        flowRunTask = MappedFlowRunTask(project_name="My Project")
    
        flowRunTask.map(flow_names)
    s

    Sandeep Aggarwal

    2 years ago
    This worked flawlessly. Thanks a lot @nicholas 👍
    nicholas

    nicholas

    2 years ago
    Just took another look at this @Sandeep Aggarwal and you can use the
    FlowRunTask
    out of the box, it looks like we weren't providing lists to it as we should have been. This should work:
    @task
    def get_flow_names():
        return ["Another Flow", "And Another Flow"]
    
    with Flow("Parameter Getting") as flow_1:
        param = Parameter("task_param", default="15")
    
        flow_names = get_flow_names()
    
        flowRunTask = FlowRunTask(project_name="Community Support Flows")
    
        flowRunTask.map(flow_name=flow_names, parameters=[dict(some_param_2=param)])
    (and should simplify the code a bit)
    s

    Sandeep Aggarwal

    2 years ago
    Thanks again @nicholas. This works exactly as I wanted it to. 🙂