Tom Tom
11/17/2021, 7:15 PM
Unexpected error: TypeError("write() got multiple values for argument 'self'")
Traceback (most recent call last):
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\task_runner.py", line 926, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
TypeError: write() got multiple values for argument 'self'
It fails when executing this function:
def load_data_task(self):
    logger = prefect.context.get("logger")
    logger.info(os.getcwd())
    ... do something ...
    return self.load_data()
According to this issue (https://github.com/PrefectHQ/prefect/issues/3034), it fails because the write() function also receives a "self" argument. But I can't remove the "self" parameter from load_data_task, because it has to call load_data.
If I run the flow on my local machine without a server and agent, it works.
Kevin Kho
Is load_data_task attached to a class?
Tom Tom
11/17/2021, 7:26 PM
load_data is a function of BaseClass in script A. load_data will be inherited and optimized by another script B. load_data_task is also a function of BaseClass in script A. The inherited function load_data will be called by load_data_task.
That's all because I cannot import task into script B due to restrictions.
Kevin Kho
load_data_task? Or did you call this method inside some other task? This BaseClass is not a Task subclass, right?
Tom Tom
11/17/2021, 7:32 PM
load_data_task like below:
@task(name="load_data", nout=2)
def load_data_task(self):
    logger = prefect.context.get("logger")
    ...
    return self.load_data()
nout=2 because load_data returns train_ds, val_ds.
Yes, BaseClass has no task decorator.
Kevin Kho
from prefect import task, Flow
import prefect

class Base:
    def load_data(self):
        return 1, 2

    @task(name="load_data", nout=2)
    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

with Flow("...") as flow:
    x = Base().load_data_task()

flow.run()
Kevin Kho
Tom Tom
11/17/2021, 7:36 PM
Kevin Kho
from prefect import task, Flow
import prefect

class Base:
    def load_data(self):
        return 1, 2

    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

@task(name="load_data", nout=2)
def new_task():
    return Base().load_data_task()

with Flow("...") as flow:
    x = new_task()

flow.run()
Kevin Kho
The @task decorator returns a class, and you can't wrap the method as a class.
Tom Tom
11/17/2021, 7:41 PM
Test = Base()
with Flow("...") as flow:
    x = Test.load_data_task(Test)
This also works with VSC, but not with agent/server.
Kevin Kho
load_data_task itself?
Tom Tom
11/17/2021, 7:51 PM
from prefect import task, Flow
import prefect

class Base:
    def load_data(self):
        return 1, 2

    @task(name="load_data", nout=2)
    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

Test = Base()
with Flow("...") as flow:
    x = Test.load_data_task(Test)

flow.run()
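For reference, the collision reported in the first traceback can be reproduced in plain Python, without Prefect. This is only a minimal sketch: Result, write_result and task_inputs are hypothetical stand-ins, and it assumes that the keyword arguments forwarded to write() contain a key named after the task parameter "self", as the linked issue suggests.

```python
class Result:
    """Hypothetical stand-in for an object with a bound write() method."""

    def write(self, value, **kwargs):
        # kwargs stands in for the formatting kwargs seen in the traceback.
        return value


def write_result(value, task_inputs):
    """Forward task inputs as keyword arguments, like the traceback line does."""
    try:
        return Result().write(value, **task_inputs)
    except TypeError as exc:
        return f"TypeError: {exc}"


# A task parameter named "self" collides with the bound instance of write().
print(write_result((1, 2), {"self": object()}))
# Any other parameter name is forwarded without a collision.
print(write_result((1, 2), {"data_dir": "some/path"}))
```

Under that assumption, the error depends only on the parameter name, which would be consistent with the failure appearing only when results are actually written.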
Kevin Kho
# no decorator here
def load_data_task(self):
    prefect.context.get("test")
    return self.load_data()

@task(name="load_data", nout=2)
def new_task():
    return Base().load_data_task()
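The difference between decorating the method and decorating a wrapper function can be illustrated without Prefect. The sketch below is an assumption-laden analogy, not Prefect's implementation: FakeTask and fake_task are hypothetical names, and the assumption is that the decorator replaces the function with an object that does not bind to the instance the way a plain function does.

```python
class FakeTask:
    """Hypothetical stand-in for the object a task-style decorator returns."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        # No __get__ is defined, so instance access never injects `self`.
        return self.fn(*args, **kwargs)


def fake_task(fn):
    return FakeTask(fn)


class Base:
    def load_data(self):
        return 1, 2

    @fake_task
    def decorated_method(self):
        return self.load_data()

    def plain_method(self):
        return self.load_data()


@fake_task
def new_task():
    # The wrapper creates the instance itself, so no `self` parameter is needed.
    return Base().plain_method()


def call_decorated_method():
    try:
        return Base().decorated_method()
    except TypeError as exc:
        return f"TypeError: {exc}"


print(call_decorated_method())            # fails: `self` is never passed
print(Base().decorated_method(Base()))    # works only with an explicit instance
print(new_task())                         # works without any `self` parameter
```

In this analogy the decorated method only runs when the instance is passed explicitly, which leaves a task parameter named "self", while the wrapper function has no such parameter at all.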
Tom Tom
11/17/2021, 8:01 PM
Tom Tom
11/17/2021, 11:51 PM
Unexpected error: InvalidArgumentError()
Traceback (most recent call last):
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\task_runner.py", line 926, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\results\local_result.py", line 115, in write
    value = self.serializer.serialize(new.value)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\tensorflow\python\framework\ops.py", line 1074, in __reduce__
    return convert_to_tensor, (self._numpy(),)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\tensorflow\python\framework\ops.py", line 1117, in _numpy
    raise core._status_to_exception(e) from None # pylint: disable=protected-access
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot convert a Tensor of dtype variant to a NumPy array.
Again, with VSC it works, but not with agent/server.
I used the following code, declaring the task outside of the class:
import tensorflow as tf
from prefect import task, Flow

class Base():
    def __init__(self):
        pass

    def load_data_task(self):
        # Training
        train_ds = tf.keras.preprocessing.image_dataset_from_directory(
            "../../data/raw/pictures/petimages",
            validation_split=0.2,
            subset="training",
            seed=1,
            image_size=(120, 120),
            batch_size=16,
        )
        # Validation
        val_ds = tf.keras.preprocessing.image_dataset_from_directory(
            "../../data/raw/pictures/petimages",
            validation_split=0.2,
            subset="validation",
            seed=1,
            image_size=(120, 120),
            batch_size=16,
        )
        return train_ds, val_ds

@task(name="loaddata", nout=2)
def new_task():
    return Base().load_data_task()

with Flow("Image Flow") as flow:
    x, y = new_task()

flow.register(project_name="test")
flow.run()
Kevin Kho
@task(name="loaddata", nout=2, checkpoint=False)
def new_task():
    return Base().load_data_task()
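The second traceback fails inside cloudpickle.dumps, which is called from result.write. A rough sketch of why checkpoint=False would sidestep that, using only the standard library: Unpicklable and run_task are hypothetical stand-ins (not Prefect or TensorFlow code), and the assumption is that with checkpointing disabled the runner skips the serialization step for the task's return value.

```python
import pickle


class Unpicklable:
    """Hypothetical stand-in for a return value that cannot be serialized."""

    def __reduce__(self):
        raise TypeError("cannot serialize this object")


def run_task(fn, checkpoint=True):
    """Hypothetical mini-runner: serialize the result only when checkpointing."""
    value = fn()
    if checkpoint:
        # Stands in for result.write(), which pickles the return value.
        pickle.dumps(value)
    return value


def try_run(checkpoint):
    try:
        run_task(Unpicklable, checkpoint=checkpoint)
        return "ok"
    except TypeError as exc:
        return f"TypeError: {exc}"


print(try_run(checkpoint=True))   # serialization is attempted and fails
print(try_run(checkpoint=False))  # serialization is skipped
```

The trade-off in this sketch is that the value is never persisted, so it only exists in memory for the duration of the run.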
Kevin Kho
Tom Tom
11/17/2021, 11:58 PM