Thread
#prefect-community
    Edison A

    1 year ago
    I hope I'm posting in the right place. I need help on how to call a class method as a task. Check 👇 the thread for the Code and StackTrace.
    Michael Adkins

    1 year ago
    Hi @Edison A! Would you mind moving the code into this thread so the channel isn’t as full?
    Edison A

    1 year ago
    class DatabaseWriter:
        def __init__(self):
            Session = sessionmaker(bind=self.sqlalchemy_connection, autocommit=True)
            self.session = Session()
    
        @task
        def write_to_db(list_data):
            self.session.insert_into_db(list_data)
    When calling the task in flow:
    with Flow("scraping", schedule=schedule) as flow:
        list_data = [{...}, {...}]
        db_writer = DatabaseWriter()
        db_writer.write_to_db(list_data)
    Error:
    <Task: get_epexspot_cookies>
    Traceback (most recent call last):
      File "prefect_run.py", line 65, in <module>
        main()
      File "prefect_run.py", line 54, in main
        db_writer.write_to_db(list_data)
      File "venv/lib/python3.8/site-packages/prefect/core/task.py", line 513, in __call__
        new.bind(
      File "venv/lib/python3.8/site-packages/prefect/core/task.py", line 554, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/usr/lib/python3.8/inspect.py", line 3025, in bind
        return self._bind(args, kwargs)
      File "/usr/lib/python3.8/inspect.py", line 2940, in _bind
        raise TypeError(msg) from None
    TypeError: missing a required argument: 'list_data'
    Michael Adkins

    1 year ago
    Thanks! So the issue here is that methods called on an instance are expected to take `self` as their first argument, so you’d actually need `write_to_db(self, list_data)`. But because we are not actually executing `write_to_db` when you call it within the `with Flow…` block, just preparing it for future calls, your `DatabaseWriter()` instance will not exist / not have a valid session when the task runs. (The way that methods are actually bound makes this a little more complicated than I’ve said and leads to a confusing error like the one above.)
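The binding problem described above can be reproduced without Prefect. The following is only a rough sketch: `FakeTask` is a hypothetical stand-in for a task wrapper, not Prefect's actual `Task` class, and it assumes the real method was declared as `write_to_db(self, list_data)`. Because the wrapper object is not a plain function, Python does not insert the instance as `self`, so the single argument passed in lands on `self` and `list_data` is reported missing.

```python
import inspect


class FakeTask:
    """Hypothetical stand-in for a task wrapper; NOT Prefect's Task class."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        # Check the arguments against the wrapped function's signature,
        # similar in spirit to the signature.bind call in the traceback.
        inspect.signature(self.fn).bind(*args, **kwargs)
        return self.fn(*args, **kwargs)


class DatabaseWriter:
    @FakeTask
    def write_to_db(self, list_data):
        return list_data


writer = DatabaseWriter()

# FakeTask defines no __get__, so attribute access returns the wrapper
# itself and Python never passes `writer` as `self`.
assert isinstance(writer.write_to_db, FakeTask)

error_message = None
try:
    writer.write_to_db([{"price": 1}])  # the list is bound to `self`
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

The printed message names `list_data` as the missing argument, which matches the shape of the error in the traceback.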
    Basically the model that you’re going for here is not the pattern we’d recommend, let me try to get a best practice for what you’re doing.
    Our current best practice is to create a new connection within each task that needs it because passing the connection object around is complicated. Generally, our executors require that objects can be passed between threads and processes since they need to handle distributed and multiprocessing cases. Using an SQLAlchemy connection with multiprocessing is covered in their docs and may be helpful background on the complexity of the issue. There is an open PR in Prefect to add better support for multiprocess client creation as well.
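A minimal sketch of the connection-per-task pattern might look like the following. It uses the standard library's `sqlite3` as a stand-in for SQLAlchemy so that it runs on its own. The `prices` table, its columns, and the `count_rows` helper are hypothetical and chosen only for illustration. In a Prefect flow, `write_to_db` would additionally carry the `@task` decorator. Only plain values (a path string and a list of dicts) cross the function boundary, and the connection never leaves the function that opened it.

```python
import os
import sqlite3
import tempfile


def write_to_db(db_path, list_data):
    """Open a connection, write the rows, and close it, all in one call."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS prices (hour INTEGER, price REAL)"
            )
            conn.executemany(
                "INSERT INTO prices (hour, price) VALUES (:hour, :price)",
                list_data,
            )
    finally:
        conn.close()
    return len(list_data)


def count_rows(db_path):
    """Hypothetical helper: open a separate connection and count the rows."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT COUNT(*) FROM prices").fetchone()[0]
    finally:
        conn.close()


# A file-backed database, so separate connections see the same data.
db_path = os.path.join(tempfile.mkdtemp(), "scraping.db")
written = write_to_db(
    db_path, [{"hour": 0, "price": 41.5}, {"hour": 1, "price": 39.9}]
)
total = count_rows(db_path)
```

Opening a connection on every call costs a little more than reusing one, but it avoids having to pickle or share a live session between threads or processes.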
    Edison A

    1 year ago
    Thanks a bunch @Michael Adkins 👏👏👏.
    For the future, is there a way we can have Prefect treat class methods differently when doing its preparations, so that it doesn't expect them to have more args?
    Michael Adkins

    1 year ago
    Perhaps something like
    In [29]: class Test:
        ...:     @staticmethod
        ...:     @task
        ...:     def foo(variable):
        ...:         print(variable)
        ...: 
        ...: 
    
    In [30]: with Flow("ex") as flow:
        ...:     Test.foo("hello")
        ...: 
        ...: 
        ...: 
    
    In [31]: flow.run()
    [2020-10-29 21:46:55] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
    [2020-10-29 21:46:55] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
    hello
    [2020-10-29 21:46:55] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
    [2020-10-29 21:46:55] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    Out[31]: <Success: "All reference tasks succeeded.">
    You can’t provide `self` to a method because it’s not a task or a resolvable constant.
    You could also use a `classmethod` if you want to attach class-level data to your task, but it’s important to distinguish that from a class instance, which will not work:
    In [32]: class Test:
        ...:     cls_variable = "foo"
        ...: 
        ...:     @classmethod
        ...:     @task
        ...:     def foo(cls, method_variable):
        ...:         print(f"{cls.cls_variable} {method_variable}")
        ...: 
        ...: 
        ...: 
    
    In [33]: with Flow("ex") as flow:
        ...:     Test.foo("hello")
        ...: 
        ...: 
        ...: 
    
    In [34]: flow.run()
    [2020-10-29 21:52:25] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
    [2020-10-29 21:52:25] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
    foo hello
    [2020-10-29 21:52:25] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
    [2020-10-29 21:52:25] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    Out[34]: <Success: "All reference tasks succeeded.">
    @Edison A just want to check in, did these examples help?
    Edison A

    1 year ago
    Yes, they helped and solved the bug! I rewrote the database handler to reflect the changes in a straightforward way.
    Michael Adkins

    1 year ago
    @Marvin archive “How do I use tasks within classes and share database sessions?”