Sandeep Aggarwal
05/20/2020, 1:47 PM
TypeError: Object of type 'Parameter' is not JSON serializable
Below is a sample snippet:
with Flow("sample flow") as sample_flow:
    param = Parameter("task_param")
    FlowRunTask(flow_name="next flow", parameters={"task_param": param})()
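The same kind of error can be reproduced without Prefect. The following is a minimal sketch, assuming a hypothetical stand-in class `PlaceholderParameter` (not Prefect's `Parameter`) and a hypothetical helper `try_serialize`: `json.dumps` rejects any object it has no encoder for, which is presumably what happens when a `Parameter` object sits inside the `parameters` dict given to the constructor.

```python
import json


class PlaceholderParameter:
    """Hypothetical stand-in for a build-time placeholder; not Prefect's Parameter."""

    def __init__(self, name):
        self.name = name


def try_serialize(parameters):
    """Return the JSON string, or the error message if encoding fails."""
    try:
        return json.dumps(parameters)
    except TypeError as exc:
        return f"TypeError: {exc}"


# A plain value encodes fine; a placeholder object does not.
ok = try_serialize({"task_param": 15})
failed = try_serialize({"task_param": PlaceholderParameter("task_param")})
print(ok)
print(failed)
```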
nicholas
05/20/2020, 2:00 PM
Call param.serialize() on that parameter when you pass it to the FlowRunTask.
Sandeep Aggarwal
05/20/2020, 2:42 PM
serialize is applied to parameter/task outputs. However, the downstream flow receives the serialised version of the parameter/task instead of the actual value. Below is the sample value received for the context parameter in the downstream flow.
{
    "context": {
        "name": "context",
        "slug": "context",
        "tags": [],
        "type": "prefect.core.task.Parameter",
        "default": null,
        "outputs": "typing.Any",
        "required": true,
        "__version__": "0.11.1"
    }
}
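The JSON above describes the parameter rather than carrying its value. A minimal sketch of that difference, using a hypothetical stand-in class (`PlaceholderParameter`, its `serialize` and its `resolve` are illustrative assumptions, not Prefect's implementation):

```python
class PlaceholderParameter:
    """Hypothetical stand-in; models a parameter whose value exists only at run time."""

    def __init__(self, name, default=None):
        self.name = name
        self.default = default

    def serialize(self):
        # Metadata about the placeholder itself; no runtime value is included.
        return {
            "name": self.name,
            "type": type(self).__name__,
            "default": self.default,
        }

    def resolve(self, runtime_values):
        # The actual value is looked up only when the flow runs.
        return runtime_values.get(self.name, self.default)


param = PlaceholderParameter("task_param")
metadata = param.serialize()
value = param.resolve({"task_param": 15})
print(metadata)
print(value)
```

Sending `metadata` downstream reproduces the symptom described here, while `value` is what the downstream flow actually needs.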
nicholas
05/20/2020, 2:56 PM
Pass the parameters when calling the FlowRunTask, like this:
with Flow("Parameter Getting") as flow:
    param = Parameter("task_param")
    flowRunTask = FlowRunTask(
        flow_name="next flow", project_name="Your Project",
    )
    flowRunTask(parameters={"task_param": param})
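The change here moves `parameters` from the constructor to the call. A toy sketch of why that can matter in a deferred-execution model, assuming constructor arguments are stored as-is while call-time arguments are resolved before the task runs. `Placeholder` and `ToyTask` are hypothetical names, not Prefect classes:

```python
class Placeholder:
    """Hypothetical build-time stand-in for a value known only at run time."""

    def __init__(self, name):
        self.name = name


class ToyTask:
    """Toy deferred task for illustration only; not Prefect's Task class."""

    def __init__(self, **init_kwargs):
        # Constructor arguments are stored untouched and never resolved.
        self.init_kwargs = init_kwargs
        self.call_kwargs = {}

    def __call__(self, **call_kwargs):
        # Call-time arguments are recorded so they can be resolved at run time.
        self.call_kwargs = call_kwargs
        return self

    def run(self, runtime_values):
        def resolve(obj):
            if isinstance(obj, Placeholder):
                return runtime_values[obj.name]
            if isinstance(obj, dict):
                return {key: resolve(val) for key, val in obj.items()}
            return obj

        resolved = {key: resolve(val) for key, val in self.call_kwargs.items()}
        return {**self.init_kwargs, **resolved}


param = Placeholder("task_param")
runtime = {"task_param": 15}

# Placeholder given to the constructor: it is still a placeholder at run time.
at_init = ToyTask(parameters={"task_param": param})().run(runtime)

# Placeholder given at call time: it is replaced by the runtime value.
at_call = ToyTask()(parameters={"task_param": param}).run(runtime)
print(at_call)
```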
Sandeep Aggarwal
05/20/2020, 3:14 PM
Can this be used with map as well? Sorry for not explaining my exact use case earlier. The snippet below demonstrates it:
with Flow("Parameter Getting") as flow:
    param = Parameter("task_param")
    task1 = some_task()
    flow_names = another_task()
    FlowRunTask().map(flow_names, task_args={"parameters": {"task_param": param, "task1": task1}})
I tried applying your suggestion to map, but it throws the same serialisation error. Also, the output of another task needs to be sent as a parameter to the next flow.
nicholas
05/20/2020, 3:44 PM
import prefect
from prefect import Flow, task, Parameter
from prefect.tasks.prefect.flow_run import FlowRunTask

class MappedFlowRunTask(FlowRunTask):
    def run(self, ctx):
        self.flow_name = ctx["flow_name"]
        self.parameters = ctx["parameters"]
        super(MappedFlowRunTask, self).run()

@task
def get_flow_names(x):
    return [{"flow_name": "Parameter Printing", "parameters": {"task_param": x}}]

with Flow("Parameter Getting") as flow:
    param = Parameter("task_param", default="15")
    flow_names = get_flow_names(x=param)
    flowRunTask = MappedFlowRunTask(project_name="My Project")
    flowRunTask.map(flow_names)
Sandeep Aggarwal
05/20/2020, 4:30 PM
nicholas
05/20/2020, 4:33 PM
You can map FlowRunTask out of the box; it looks like we weren't providing lists to it as we should have been. This should work:
@task
def get_flow_names():
    return ["Another Flow", "And Another Flow"]

with Flow("Parameter Getting") as flow_1:
    param = Parameter("task_param", default="15")
    flow_names = get_flow_names()
    flowRunTask = FlowRunTask(project_name="Community Support Flows")
    flowRunTask.map(flow_name=flow_names, parameters=[dict(some_param_2=param)])
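As a plain-Python illustration of providing lists to a mapped call, here is a minimal sketch that pairs each flow name with the parameter dict at the same position. It assumes mapped arguments are combined element-wise, which is an assumption about the mapping behaviour and is not confirmed in this thread; `build_runs` is a hypothetical helper, not part of Prefect.

```python
def build_runs(flow_names, parameters):
    """Pair each flow name with the parameter dict at the same position."""
    return [
        {"flow_name": name, "parameters": params}
        for name, params in zip(flow_names, parameters)
    ]


flow_names = ["Another Flow", "And Another Flow"]
shared = {"some_param_2": "15"}

# Repeat the shared dict so both lists have one entry per flow.
runs = build_runs(flow_names, [shared] * len(flow_names))

# zip stops at the shorter input, so a one-element list yields a single run.
short = build_runs(flow_names, [shared])
print(runs)
```

Under that element-wise assumption, the parameter list needs one entry per flow name for every flow to receive its parameters.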
Sandeep Aggarwal
05/21/2020, 10:36 AM