Marwan Sarieddine

01/20/2021, 1:48 PM
Hi folks, it seems I am facing a bug when trying to pass parameters returned from a task to StartFlowRun. I just want to make sure I am doing this correctly; please see the example in the thread.
Here is the starter-flow:
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun


@task
def get():
    return 2


with Flow("starter-flow") as flow:
    x = get()
    StartFlowRun(
        flow_name="simple-flow",
        project_name="demo-flows",
        parameters={"x": x},
    )
Here is the simple-flow that should get started with a parameter x:
from prefect import Flow, Parameter, task


@task
def add(x):
    return x


x = Parameter(name="x")
with Flow("simple-flow") as flow:
    add(x)
Here is the error I get when I run starter-flow:
Unexpected error: TypeError('Object of type FunctionTask is not JSON serializable')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 450, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 165, in run
    flow_run_id = client.create_flow_run(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1094, in create_flow_run
    res = self.graphql(create_mutation, variables=dict(input=inputs))
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 302, in graphql
    params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
  File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type FunctionTask is not JSON serializable

Spencer

01/20/2021, 1:58 PM
You have to pass the parameters into the run of the Task, not the constructor. Something like this:
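from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun


@task
def get():
    return 2


# Constants go to the constructor; runtime values go to the call below.
start_flow = StartFlowRun(project_name="demo-flows")

with Flow("starter-flow") as flow:
    x = get()
    start_flow(
        flow_name="simple-flow",
        parameters={"x": x},
    )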

Marwan Sarieddine

01/20/2021, 2:03 PM
@Spencer thank you - will give that a try
Thanks, it works now, and it makes sense to me given that dynamically evaluated values should be passed to task.run. Note I just passed parameters to start_flow.__call__ (I kept flow_name and project_name at initialization given they are hardcoded constants ...)
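For anyone reading later, here is a rough, Prefect-free sketch of why the constructor version failed, as far as I can tell from the traceback: the parameters dict reaches json.dumps while it still holds the task object itself, not the task's result. FakeTask and create_flow_run below are hypothetical stand-ins I made up for illustration. They are not Prefect's classes, and the payload shape is an assumption based only on the traceback.

```python
import json


class FakeTask:
    """Hypothetical stand-in for a task object; NOT a Prefect class."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        # Running the task is what turns it into a plain value.
        return self.fn()


def create_flow_run(parameters):
    """Hypothetical helper that mimics only the json.dumps step from the traceback."""
    return json.dumps({"input": {"parameters": parameters}})


get = FakeTask(lambda: 2)

# Constructor-style: the dict still contains the task object, so encoding fails.
try:
    create_flow_run({"x": get})
    build_time_error = None
except TypeError as exc:
    build_time_error = str(exc)

# Run-style: the task has been resolved to its result before encoding.
payload = create_flow_run({"x": get.run()})

print(build_time_error)
print(payload)
```

In the real flow, calling start_flow(parameters={"x": x}) inside the Flow block is what lets Prefect resolve x before the run method sees it, so the dict that gets serialized contains the value and not the task.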

Jim Crist-Harif

01/20/2021, 5:15 PM
Thanks for raising this, we can definitely provide a better error here. Will push a PR up today to fix this.