Edison A
10/29/2020, 9:13 PM
Zanie
Edison A
10/29/2020, 9:21 PM
class DatabaseWriter:
    def __init__(self):
        Session = sessionmaker(bind=self.sqlalchemy_connection, autocommit=True)
        self.session = Session()

    @task
    def write_to_db(list_data):
        self.session.insert_into_db(list_data)
When calling the task in flow:
with Flow("scraping", schedule=schedule) as flow:
    list_data = [{...}, {...}]
    db_writer = DatabaseWriter()
    db_writer.write_to_db(list_data)
Error:
<Task: get_epexspot_cookies>
Traceback (most recent call last):
  File "prefect_run.py", line 65, in <module>
    main()
  File "prefect_run.py", line 54, in main
    db_writer.write_to_db(list_data)
  File "venv/lib/python3.8/site-packages/prefect/core/task.py", line 513, in __call__
    new.bind(
  File "venv/lib/python3.8/site-packages/prefect/core/task.py", line 554, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/usr/lib/python3.8/inspect.py", line 3025, in bind
    return self._bind(args, kwargs)
  File "/usr/lib/python3.8/inspect.py", line 2940, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'list_data'
Zanie
Methods on a class take self as their first variable, so you'd actually need write_to_db(self, list_data). But because we are not actually executing write_to_db when you call it within the with Flow… block, just preparing it for future calls, your DatabaseWriter() instance will actually not exist / not have a valid session. (The way that class methods are actually bound makes this a little more complicated than I've said and leads to a confusing error like the one above.)
Edison A
10/29/2020, 9:46 PM
preparations? So that it doesn't expect them to have more args
Zanie
In [29]: class Test:
    ...:     @staticmethod
    ...:     @task
    ...:     def foo(variable):
    ...:         print(variable)
    ...:

In [30]: with Flow("ex") as flow:
    ...:     Test.foo("hello")
    ...:

In [31]: flow.run()
[2020-10-29 21:46:55] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2020-10-29 21:46:55] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
hello
[2020-10-29 21:46:55] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
[2020-10-29 21:46:55] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[31]: <Success: "All reference tasks succeeded.">
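To see where the confusing "missing a required argument: 'list_data'" message comes from, here is a stdlib-only sketch. FakeTask is a hypothetical stand-in for the object a task decorator returns, not Prefect's real class; it only mimics the signature.bind(*args, **kwargs) step visible in the traceback. It also assumes the real method was declared as write_to_db(self, list_data), and write_static is an illustrative name.

```python
import inspect


class FakeTask:
    """Hypothetical stand-in for a task object; NOT Prefect's implementation."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        # Mimic the signature.bind(...) step shown in the traceback.
        bound = inspect.signature(self.fn).bind(*args, **kwargs)
        return dict(bound.arguments)


class DatabaseWriter:
    # FakeTask defines no __get__, so attribute access on an instance
    # returns the FakeTask itself and never fills in `self`.
    @FakeTask
    def write_to_db(self, list_data):
        pass

    # staticmethod hands back the wrapped object unchanged, and the
    # function has no `self` slot to fill in the first place.
    @staticmethod
    @FakeTask
    def write_static(list_data):
        pass


writer = DatabaseWriter()
rows = [{"price": 1}]

try:
    writer.write_to_db(rows)
    error_message = None
except TypeError as exc:
    # `rows` landed in the `self` slot, leaving `list_data` unfilled.
    error_message = str(exc)

print(error_message)

static_args = DatabaseWriter.write_static(rows)
print(static_args)
```

The point of the sketch is that a decorator returning a plain callable object does not get the automatic self binding that ordinary functions get, so the single argument passed in is consumed by self.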
You can't pass self to a method because it's not a task or a resolvable constant. You can use classmethod if you want to attach class type level data to your task, but it's important to distinguish that from a class instance, which will not work.
In [32]: class Test:
    ...:     cls_variable = "foo"
    ...:
    ...:     @classmethod
    ...:     @task
    ...:     def foo(cls, method_variable):
    ...:         print(f"{cls.cls_variable} {method_variable}")
    ...:

In [33]: with Flow("ex") as flow:
    ...:     Test.foo("hello")
    ...:

In [34]: flow.run()
[2020-10-29 21:52:25] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2020-10-29 21:52:25] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
foo hello
[2020-10-29 21:52:25] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
[2020-10-29 21:52:25] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[34]: <Success: "All reference tasks succeeded.">
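A rough, stdlib-only illustration of why the classmethod variant gets its first argument filled in: classmethod itself supplies the class when the attribute is looked up, so the wrapped callable receives cls without needing any instance. As before, FakeTask is a hypothetical stand-in and not Prefect's task class, and Child is an illustrative subclass that is not part of the session above.

```python
import inspect


class FakeTask:
    """Hypothetical stand-in for a task object; NOT Prefect's implementation."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        # Bind first (raises TypeError on a signature mismatch), then run.
        bound = inspect.signature(self.fn).bind(*args, **kwargs)
        return self.fn(*bound.args, **bound.kwargs)


class Test:
    cls_variable = "foo"

    # classmethod prepends the class it was looked up on, so the wrapped
    # callable is invoked as FakeTask(cls, method_variable).
    @classmethod
    @FakeTask
    def foo(cls, method_variable):
        return f"{cls.cls_variable} {method_variable}"


class Child(Test):
    # Class-level data can be overridden per subclass.
    cls_variable = "bar"


result = Test.foo("hello")
child_result = Child.foo("hello")
print(result, "|", child_result)
```

Only class-level data is reachable this way; anything created in __init__, such as a session, still belongs to an instance.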
Edison A
10/30/2020, 4:46 PM
Zanie