https://prefect.io logo
v

Vitaly Shulgin

01/21/2021, 3:07 PM
Hello Team, I sub-classed PostrgresTask, to provide db connection details, to provide values from configuration, and got problem with argument
fetch
when prefect calls to run method
👀 1
1
There is no problem when flow is running locally, but when it runs in k8s jobs, it fails
Copy code
Unexpected error: TypeError("run() got multiple values for argument 'fetch'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/root/.prefect/runner/flows/lazarus/extract_jobs.py", line 63, in run
    result = super(JobsExtract, self).run(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
    return run_method(self, *args, **kwargs)
TypeError: run() got multiple values for argument 'fetch'
Sub-classe is pretty straightforward
Copy code
class JobsExtract(PostgresFetch):

    def __init__(
        self,
        play_type: str,
        is_enabled: bool = True,
        db_name: str = None,
        user: str = None,
        host: str = None,
        port: int = None,
        query: str = None
    ):

        self.play_type  = play_type
        self.query      = query or f"SELECT play_id, target_page FROM {self.play_type} WHERE is_enabled = { 'TRUE' if is_enabled else 'FALSE' }"
        logger.debug("Query for plays: <%s>", self.query)

        self.db_name = db_name or settings.DATABASE.name
        self.user    = user or settings.DATABASE.user
        self.host    = host or settings.DATABASE.host
        self.port    = port or settings.DATABASE.port
        self.password= settings.DATABASE.pswd
        self.fetch   = "all"

        super(JobsExtract, self).__init__(
            self.db_name,
            self.user,
            self.host,
            port=self.port,
            fetch=self.fetch,
            query=self.query
        )

    def run(self):
        result = super(JobsExtract, self).run(
            password=self.password
        )
        logger.debug("Query returned <%s> results", len(result))
        return result
I guess, problem is realted to
@defaults_from_attrs
Copy code
@defaults_from_attrs("fetch", "fetch_count", "query", "data", "commit")
    def run(
        self,
        fetch: str = "one",
        fetch_count: int = 10,
        query: str = None,
        data: tuple = None,
        commit: bool = False,
        password: str = None,
    ):
any ideas? how to get it work?
z

Zanie

01/21/2021, 4:53 PM
Hi! I’m not sure I see how this could fail. Since it’s working locally it seems like it might be possible the newest code isn’t on K8s?
v

Vitaly Shulgin

01/21/2021, 5:47 PM
Well, I solved it by sub-classing Task class, and implemented functionality by myself