    Marwan Sarieddine

    1 year ago
    Hi folks, it seems I am facing a bug when trying to pass parameters returned from a task to StartFlowRun. I just want to make sure I am doing this correctly; please see the example in the thread.
    Here is the starter-flow:
    from prefect import Flow, task
    from prefect.tasks.prefect import StartFlowRun
    
    
    @task
    def get():
        return 2
    
    
    with Flow("starter-flow") as flow:
        x = get()
        StartFlowRun(
            flow_name="simple-flow",
            project_name="demo-flows",
            parameters={"x": x},
        )
    Here is the simple-flow that should get started with a parameter x:
    from prefect import Flow, Parameter, task
    
    
    @task
    def add(x):
        return x
    
    
    x = Parameter(name="x")
    with Flow("simple-flow") as flow:
        add(x)
    Here is the error I get when I run starter-flow:
    Unexpected error: TypeError('Object of type FunctionTask is not JSON serializable')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 450, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 165, in run
        flow_run_id = client.create_flow_run(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1094, in create_flow_run
        res = self.graphql(create_mutation, variables=dict(input=inputs))
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 302, in graphql
        params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
      File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type FunctionTask is not JSON serializable
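For context, the final frames of the traceback are in the standard library's json module: the parameters dict still holds a task object rather than its result when json.dumps is called. The sketch below reproduces the same kind of failure without Prefect. The FunctionTask class here is a hypothetical stand-in, not Prefect's real class, and try_serialize is an illustrative helper.

```python
import json


class FunctionTask:
    """Hypothetical stand-in for an unresolved task object (not Prefect's class)."""


def try_serialize(parameters):
    """Return the JSON string, or the TypeError message if encoding fails."""
    try:
        return json.dumps(parameters)
    except TypeError as exc:
        return str(exc)


# A plain value serializes; an arbitrary object does not.
ok = try_serialize({"x": 2})
failed = try_serialize({"x": FunctionTask()})
print(ok)
print(failed)
```

The first call succeeds because 2 is a JSON-compatible value; the second fails because the encoder has no rule for the stand-in object.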

    Spencer

    1 year ago
    You have to pass the parameters into the run of the Task, not the constructor. Something like this:
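    from prefect import Flow, task
    from prefect.tasks.prefect import StartFlowRun
    
    
    @task
    def get():
        return 2
    
    # Static settings can go to the constructor
    start_flow = StartFlowRun(project_name="demo-flows")
    
    with Flow("starter-flow") as flow:
        x = get()
        # Task results are passed at call time, so they reach run() as values
        start_flow(
            flow_name="simple-flow",
            parameters={"x": x},
        )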

    Marwan Sarieddine

    1 year ago
    @Spencer thank you - will give that a try
    Thanks, it works now, and it makes sense to me given that dynamically evaluated values should be passed to task.run. Note that I just passed parameters to start_flow.__call__ (I kept flow_name and project_name at initialization, given they are hardcoded constants ...)
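The split described above (constants at initialization, dynamic values at call time) can be illustrated with a toy model. This is only a sketch under stated assumptions, not Prefect's implementation: Deferred and MiniStartFlowRun are hypothetical names, and resolving values inside __call__ stands in for Prefect resolving upstream task results before run is invoked.

```python
import json


class Deferred:
    """Hypothetical placeholder for a value that only exists at run time."""

    def __init__(self, fn):
        self.fn = fn

    def resolve(self):
        return self.fn()


class MiniStartFlowRun:
    """Toy model of the pattern (not Prefect's implementation)."""

    def __init__(self, **constants):
        # Constructor arguments are stored as-is and never resolved.
        self.constants = constants

    def __call__(self, **run_kwargs):
        # Call-time arguments are resolved just before the payload is built.
        resolved = {k: self._resolve(v) for k, v in run_kwargs.items()}
        return json.dumps({**self.constants, **resolved}, sort_keys=True)

    @staticmethod
    def _resolve(value):
        if isinstance(value, Deferred):
            return value.resolve()
        if isinstance(value, dict):
            return {k: MiniStartFlowRun._resolve(v) for k, v in value.items()}
        return value


x = Deferred(lambda: 2)
start_flow = MiniStartFlowRun(flow_name="simple-flow", project_name="demo-flows")
payload = start_flow(parameters={"x": x})
print(payload)
```

In this toy model, passing the Deferred value to the constructor instead would leave it unresolved in the payload, and json.dumps would raise a TypeError, which mirrors the original error.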

    Jim Crist-Harif

    1 year ago
    Thanks for raising this, we can definitely provide a better error here. Will push a PR up today to fix this.