https://prefect.io logo
a

An Ninh Vũ

11/29/2022, 2:07 AM
Hello everyone, I have a problem with running tasks with parameters. Can someone help me :>> Lately, I tried to run a flow consisting of parameters. The parsing parameters are of type Parameter (
from prefect import Parameter
). Let's say I have the parameter "`account_id`", with an int value type. This parameter is parsed when I run the flow with
account_id = Parameter("account_id", default=None)
. After that I have a task that is running with a timeout. And in that task's state handler, I reuse the parameter
account_id
and parse that parameter into a dictionary (code is below). Here is where the error comes up, as `account_id`'s type is Parameter, but it doesn't accept a parameter in a dictionary if you want to
json.dumps
that dict. My question is (also TL;DR): How can I turn this parameter's type into an int type (originally prefect.Parameter type) to use in
json.dumps
? P/s: Is there another way to do what I'm trying to achieve? Thank you so much! P/ss: My Prefect version: 0.14.22
Copy code
def timeout_state_handler(task, old_state, new_state):
    if isinstance(new_state, state.TimedOut):
        logger.error("rerun this flow")
        flow = StartFlowRun(flow_name='rerun_flow', project_name='test')
        run_config = DockerRun(
            image="mydocker/test-project:latest",
            env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
        )
        flow.run(
            parameters=dict(account_id=account_id, dates=dates),
            run_config=run_config, run_name="RERUN_FLOW",
            idempotency_key=gen_idempotency_key(account_id)
        )
        raise signals.SUCCESS("This task timed out with status SUCCESS. Re-run this flow.")
the error:
Copy code
File "/mnt/c/Users/DELL/Downloads/test/venv/venv/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/mnt/c/Users/DELL/Downloads/test/venv/venv/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 187, in run
    flow_run_id = client.create_flow_run(
  File "/mnt/c/Users/DELL/Downloads/test/venv/venv/lib/python3.9/site-packages/prefect/client/client.py", line 1108, in create_flow_run
    res = self.graphql(create_mutation, variables=dict(input=inputs))
  File "/mnt/c/Users/DELL/Downloads/test/venv/venv/lib/python3.9/site-packages/prefect/client/client.py", line 302, in graphql
    params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
  File "/usr/lib/python3.9/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/lib/python3.9/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Parameter is not JSON serializable
4 Views