Edison A

10/29/2020, 9:13 PM
I hope I'm posting in the right place. I need help calling a class method as a task. Check the thread 👇 for the code and stack trace.
Zanie

10/29/2020, 9:19 PM
Hi @Edison A! Would you mind moving the code into this thread so the channel isn’t as full?
Edison A

10/29/2020, 9:21 PM
class DatabaseWriter:
    def __init__(self):
        Session = sessionmaker(bind=self.sqlalchemy_connection, autocommit=True)
        self.session = Session()

    @task
    def write_to_db(list_data):
        self.session.insert_into_db(list_data)
When calling the task in flow:
with Flow("scraping", schedule=schedule) as flow:
    list_data = [{...}, {...}]
    db_writer = DatabaseWriter()
    db_writer.write_to_db(list_data)
Error:
<Task: get_epexspot_cookies>
Traceback (most recent call last):
  File "prefect_run.py", line 65, in <module>
    main()
  File "prefect_run.py", line 54, in main
    db_writer.write_to_db(list_data)
  File "venv/lib/python3.8/site-packages/prefect/core/task.py", line 513, in __call__
    new.bind(
  File "venv/lib/python3.8/site-packages/prefect/core/task.py", line 554, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/usr/lib/python3.8/inspect.py", line 3025, in bind
    return self._bind(args, kwargs)
  File "/usr/lib/python3.8/inspect.py", line 2940, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'list_data'
Zanie

10/29/2020, 9:25 PM
Thanks! So the issue here is that instance methods are expected to take `self` as their first argument, so you’d actually need `write_to_db(self, list_data)`. But because we are not actually executing `write_to_db` when you call it within the `with Flow…` block, just preparing it for future calls, your `DatabaseWriter()` instance will actually not exist / not have a valid session when the task runs. (The way that methods are actually bound makes this a little more complicated than I’ve said and leads to a confusing error like the one above.)
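A minimal sketch of how an error like the one above can arise, using only the standard library rather than Prefect itself. `RecordedCall` is a hypothetical stand-in for a task object, on the assumption that the task behaves like a plain callable instance (no `__get__`), so Python never binds it to the `DatabaseWriter` instance:

```python
import inspect


class RecordedCall:
    """Hypothetical stand-in for a task object: a callable instance, not a function."""

    def __init__(self, fn):
        self.fn = fn
        self.signature = inspect.signature(fn)

    def __call__(self, *args, **kwargs):
        # Only validate the arguments against the wrapped function's signature;
        # the function itself is not executed here.
        return dict(self.signature.bind(*args, **kwargs).arguments)


class DatabaseWriter:
    @RecordedCall
    def write_to_db(self, list_data):
        raise RuntimeError("never reached in this sketch")


writer = DatabaseWriter()

# A plain function would be bound to `writer` on attribute access. A callable
# instance without __get__ is returned unchanged, so `self` is never supplied
# and the list ends up in the `self` slot.
try:
    writer.write_to_db([{"id": 1}])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

In this sketch the list is consumed as `self`, which leaves `list_data` unfilled and makes `signature.bind` raise a `TypeError`.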
Basically the model that you’re going for here is not the pattern we’d recommend, let me try to get a best practice for what you’re doing.
👍 1
Our current best practice is to create a new connection within each task that needs it, because passing the connection object around is complicated. Generally, our executors require that objects can be passed between threads and processes, since they need to handle distributed and multiprocessing cases. Using an SQLAlchemy connection with multiprocessing is covered in their docs and may be helpful background on the complexity of the issue. There is also an open PR in Prefect to add better support for multiprocess client creation.
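A rough sketch of the "new connection within each task" pattern, with the standard-library `sqlite3` module standing in for SQLAlchemy so it runs without extra dependencies. The table name, file path, and the `write_to_db` / `count_rows` helpers are assumptions for illustration; in a flow, these functions would be the ones decorated with `@task`:

```python
import os
import pickle
import sqlite3
import tempfile

# A live connection generally cannot be pickled, so it cannot be handed to
# another process as a task input.
connection = sqlite3.connect(":memory:")
try:
    pickle.dumps(connection)
    connection_is_picklable = True
except TypeError:
    connection_is_picklable = False
finally:
    connection.close()


def write_to_db(db_path, list_data):
    """Open a connection inside the unit of work, write, then close it."""
    connection = sqlite3.connect(db_path)
    try:
        with connection:  # commits on success, rolls back on error
            connection.execute("CREATE TABLE IF NOT EXISTS items (name TEXT)")
            connection.executemany(
                "INSERT INTO items (name) VALUES (?)",
                [(row["name"],) for row in list_data],
            )
    finally:
        connection.close()


def count_rows(db_path):
    """Open a separate connection to read back what was written."""
    connection = sqlite3.connect(db_path)
    try:
        return connection.execute("SELECT COUNT(*) FROM items").fetchone()[0]
    finally:
        connection.close()


with tempfile.TemporaryDirectory() as tmp_dir:
    # Only a picklable value (the path string) is passed between functions.
    db_path = os.path.join(tmp_dir, "example.db")
    write_to_db(db_path, [{"name": "a"}, {"name": "b"}])
    row_count = count_rows(db_path)
```

Passing only the path (or a connection URL) keeps every input serializable, and each function owns the full lifetime of the connection it opens.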
Edison A

10/29/2020, 9:46 PM
Thanks a bunch @Zanie 👏👏👏.
For the future, is there a way we can have Prefect treat class methods differently when doing its preparations, so that it doesn't expect them to have more args?
Zanie

10/29/2020, 9:49 PM
Perhaps something like
In [29]: class Test:
    ...:     @staticmethod
    ...:     @task
    ...:     def foo(variable):
    ...:         print(variable)
    ...: 
    ...: 

In [30]: with Flow("ex") as flow:
    ...:     Test.foo("hello")
    ...: 
    ...: 
    ...: 

In [31]: flow.run()
[2020-10-29 21:46:55] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2020-10-29 21:46:55] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
hello
[2020-10-29 21:46:55] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
[2020-10-29 21:46:55] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[31]: <Success: "All reference tasks succeeded.">
You can’t provide `self` to a method because it’s not a task or a resolvable constant. You could also use a `classmethod` if you want to attach class-level data to your task, but it’s important to distinguish that from a class instance, which will not work.
In [32]: class Test:
    ...:     cls_variable = "foo"
    ...: 
    ...:     @classmethod
    ...:     @task
    ...:     def foo(cls, method_variable):
    ...:         print(f"{cls.cls_variable} {method_variable}")
    ...: 
    ...: 
    ...: 

In [33]: with Flow("ex") as flow:
    ...:     Test.foo("hello")
    ...: 
    ...: 
    ...: 

In [34]: flow.run()
[2020-10-29 21:52:25] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2020-10-29 21:52:25] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
foo hello
[2020-10-29 21:52:25] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
[2020-10-29 21:52:25] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[34]: <Success: "All reference tasks succeeded.">
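A small standard-library sketch of why stacking `@staticmethod` or `@classmethod` on top of the task decorator gives the call the expected arguments. `RecordedCall` is again a hypothetical stand-in for a task object, assumed to be a callable instance with no `__get__`; it is not Prefect's implementation:

```python
class RecordedCall:
    """Hypothetical stand-in for a task object: a callable instance with no __get__."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


class Test:
    cls_variable = "foo"

    # staticmethod hands back the wrapped callable untouched, so the caller
    # supplies every argument explicitly.
    @staticmethod
    @RecordedCall
    def plain(variable):
        return variable

    # classmethod binds the class itself as the first argument, and a class
    # exists at definition time, unlike an instance built inside the flow block.
    @classmethod
    @RecordedCall
    def with_class(cls, method_variable):
        return f"{cls.cls_variable} {method_variable}"


static_result = Test.plain("hello")
class_result = Test.with_class("hello")
print(static_result)
print(class_result)
```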
@Edison A just want to check in, did these examples help?
Edison A

10/30/2020, 4:46 PM
Yes, they helped and solved the bug! I rewrote the database handler to reflect the changes in a straightforward way.
Zanie

10/30/2020, 4:50 PM
@Marvin archive “How do I use tasks within classes and share database sessions?”
👍 1