Marwan Sarieddine
01/20/2021, 1:48 PMStartFlowRun
- I just want to make sure I am doing this correctly - please see the example in the thread.starter-flow
@task
def get():
return 2
with Flow("starter-flow") as flow:
x = get()
StartFlowRun(
flow_name="simple-flow",
project_name="demo-flows",
parameters={"x": x},
)
Here is the simple-flow
that should get started with a parameter x
@task
def add(x):
return x
x = Parameter(name="x")
with Flow("simple-flow") as flow:
add(x)
starter-flow
Unexpected error: TypeError('Object of type FunctionTask is not JSON serializable')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 450, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 165, in run
flow_run_id = client.create_flow_run(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1094, in create_flow_run
res = self.graphql(create_mutation, variables=dict(input=inputs))
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 302, in graphql
params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type FunctionTask is not JSON serializable
Spencer
01/20/2021, 1:58 PMrun
of the Task, not the constructor. Something like this:
@task
def get():
return 2
start_flow = StartFlowRun(project_name="demo-flows")
with Flow("starter-flow") as flow:
x = get()
start_flow(
flow_name="simple-flow",
parameters={"x": x},
)
Marwan Sarieddine
01/20/2021, 2:03 PMtask.run
-
Note I just passed parameters
to start_flow.__call__
(I kept flow_name
and project_name
at initialization given they are hardcoded constants ...)Jim Crist-Harif
01/20/2021, 5:15 PM