Hey all, i´ve got a problem with Prefect Local Age...
# ask-community
t
Hey all, i´ve got a problem with Prefect Local Agent, if I run my flow it calls:
Copy code
Unexpected error: TypeError("write() got multiple values for argument 'self'")
Traceback (most recent call last):
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\task_runner.py", line 926, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
TypeError: write() got multiple values for argument 'self'
It fails by executing this function:
Copy code
def load_data_task(self):
        logger = prefect.context.get("logger")
        <http://logger.info|logger.info>(os.getcwd())
        ... do something ...
        return self.load_data()
Regarding to this issue (https://github.com/PrefectHQ/prefect/issues/3034), it fails because
write()
function also got "self" input. But i cant delete "self" input in function
load_data_task
because it has to load function
load_data
. If i run the flow on local machine without server and agent, it works.
k
Hey @Tom Tom, that issue is was fixed though so you shouldn’t encounter this. Will test this out. Is
load_data_task
attached to a class?
t
Yes, it´s a bit tricky.
load_data
is function of BaseClass in script A.
load_data
will be inherited and optimized by another script B.
load_data_task
is also function of BaseClass in script A. inherited function
load_data
will be called by
load_data_task
Thats all because i cannot
import task
to script B due to restrictions.
k
did you use the task decorator above
load_data_task
? or you called this method inside some other task? This BaseClass is not a Task subclass right?
t
yeah i use task decorator above
load_data_task
like below:
Copy code
@task(name="load_data",nout=2)
  def load_data_task(self):
        logger =    prefect.context.get("logger")
        ...
        return self.load_data()
nout=2 because
load_data
returns
return train_ds, val_ds
Yes, BaseClass has no Task Decorator.
k
Copy code
from prefect import task, Flow
import prefect

class Base:
    
    def load_data(self):
        return 1, 2

    @task(name="load_data",nout=2)
    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

with Flow("...") as flow:
    x = Base().load_data_task()

flow.run()
does this replicate your issue?
t
yeah exactly 👍
k
I think you need to reformat it to this:
Copy code
from prefect import task, Flow
import prefect

class Base:
    
    def load_data(self):
        return 1, 2

    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

@task(name="load_data",nout=2)
def new_task():
    return Base().load_data_task()

with Flow("...") as flow:
    x = new_task()

flow.run()
upvote 1
Because the
@task
decorator returns a class and you can’t wrap the method as a class.
t
Copy code
Test = Base()
with Flow("...") as flow:
    x = Test.load_data_task(Test)
This also works with VSC, but not with agent/sever
k
I think this error is coming from the Class in a class when you decorate the method. This one you just posted still doesn’t avoid it because I think you still decorated the
load_data_task
itself?
t
yes i tested it like this:
Copy code
from prefect import task, Flow
import prefect

class Base:
    
    def load_data(self):
        return 1, 2

    @task(name="load_data",nout=2)
    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

Test = Base()
with Flow("...") as flow:
    x = Test.load_data_task(Test)

flow.run()
k
Yes I am saying I think the task decorator on the method will cause the issues. So you can just bring it out:
Copy code
# no decorator here
    def load_data_task(self):
        prefect.context.get("test")
        return self.load_data()

@task(name="load_data",nout=2)
def new_task():
    return Base().load_data_task()
upvote 2
t
Ok thank you very much :)@Kevin Kho
👍 1
Hey @Kevin Kho sorry for annoying you, but if i try it with my code it fails with following issue:
Copy code
Unexpected error: InvalidArgumentError()
Traceback (most recent call last):
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\task_runner.py", line 926, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\results\local_result.py", line 115, in write
    value = self.serializer.serialize(new.value)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\tensorflow\python\framework\ops.py", line 1074, in __reduce__
    return convert_to_tensor, (self._numpy(),)
  File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\tensorflow\python\framework\ops.py", line 1117, in _numpy
    raise core._status_to_exception(e) from None  # pylint: disable=protected-access
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot convert a Tensor of dtype variant to a NumPy array.
Again with VSC it works, but not with agent/server. I used following code by declaration the task outside of classes:
Copy code
class Base():
    def __init__(self):
        pass
    def load_data_task(self):
        #Training
        train_ds = tf.keras.preprocessing.image_dataset_from_directory("../../data/raw/pictures
        /petimages",
        validation_split=0.2,
        subset="training",
        seed=1,
        image_size=(120,120),
        batch_size=16,)

        #Validation
        val_ds =  
tf.keras.preprocessing.image_dataset_from_directory("../../data/raw/pictures           
        /petimages",
        validation_split=0.2,
        subset="validation",
        seed=1,
        image_size=(120,120),
        batch_size=16,)

        return train_ds, val_ds

@task(name="loaddata",nout=2)
def new_task():
    return Base().load_data_task()

with Flow("Image Flow") as flow:
    x,y = new_task()

flow.register(project_name="test")
flow.run()
k
No worries. This is because Prefect checkpoints results and Tensorflow Objects are not serializeable (with normal Python stuff). You might be able to avoid this by turning off checkpointing.
Copy code
@task(name="loaddata",nout=2, checkpoint=False)
def new_task():
    return Base().load_data_task()
The checkpoints are there to restart the Flow from the point of failure. It would just get loaded from the checkpoint
🎉 1
t
It works ! Ur awesome, thank you very much ! and I tried and tried and tried... 😄
👍 1