# prefect-community
d
Hi! This morning I was refactoring some code and also upgraded to Prefect 2.0b5, but when I tried to deploy and run my agent on a "Compute Engine" instance I ran into problems. (I don't think it's related to 2.0b5, since the problem can be reproduced in b4 as well.) When I run a flow from the Compute Engine instance I get an exception saying that the client cannot be pickled:
```
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
```
After reading up on this, it sounds like this is due to some other exception that results in an unpickleable object. Is that right? Is there any way I can reach the underlying exception? Traceback and code in thread...
Traceback:
```
12:21:52.421 | INFO    | prefect.engine - Created flow run 'rugged-capuchin' for flow 'test-flow'
12:21:52.421 | INFO    | Flow run 'rugged-capuchin' - Using task runner 'ConcurrentTaskRunner'
12:21:52.756 | ERROR   | Flow run 'rugged-capuchin' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 445, in orchestrate_flow_run
    with partial_flow_run_context.finalize(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 325, in finalize
    return self.model_cls(**self.fields, **kwargs)
  File "pydantic/main.py", line 339, in pydantic.main.BaseModel.__init__
  File "pydantic/main.py", line 1038, in pydantic.main.validate_model
  File "pydantic/fields.py", line 857, in pydantic.fields.ModelField.validate
  File "pydantic/fields.py", line 1074, in pydantic.fields.ModelField._validate_singleton
  File "pydantic/fields.py", line 1121, in pydantic.fields.ModelField._apply_validators
  File "pydantic/class_validators.py", line 313, in pydantic.class_validators._generic_validator_basic.lambda12
  File "pydantic/main.py", line 679, in pydantic.main.BaseModel.validate
  File "pydantic/main.py", line 605, in pydantic.main.BaseModel._copy_and_set_values
  File "/usr/local/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.10/copy.py", line 161, in deepcopy
    rv = reductor(4)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 194, in __getstate__
    raise PicklingError(
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
12:21:52.926 | ERROR   | Flow run 'rugged-capuchin' - Finished in state Failed('Flow run encountered an exception.')
```
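(Side note for anyone skimming the traceback: the `PicklingError` at the bottom can be reproduced without Prefect at all. A minimal sketch, assuming `google-cloud-storage` is installed; the anonymous credentials are only there to avoid needing real ones:)
```python
# Sketch: deepcopy goes through the pickle protocol, which google.cloud
# clients explicitly refuse (see __getstate__ in the traceback above).
import copy

from google.auth.credentials import AnonymousCredentials
from google.cloud import storage

client = storage.Client(project="demo", credentials=AnonymousCredentials())

try:
    copy.deepcopy(client)  # same code path as pydantic's _copy_and_set_values
except Exception as exc:
    print(f"{type(exc).__name__}: {exc}")
```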
This happens when I'm trying to run a simple test flow:
```
from prefect import task, flow, get_run_logger

@flow
def test_flow():
    print("flow starting")
    res1 = running_task1()
    return running_task2(res1)

@task
def running_task1():
    logger = get_run_logger()
    logger.info("Running task1")
    return "task 1"

@task
def running_task2(input):
    logger = get_run_logger()
    logger.info(f"Running task2 with {input}")
    return "task 2"

if __name__ == "__main__":
    test_flow()
```
a
1. Do you use GCS storage? 2. Does it work for you when you run this locally?
I couldn't reproduce the error when running it locally:
```
14:49:47.823 | INFO    | prefect.engine - Created flow run 'vivacious-parrot' for flow 'ex-flow'
14:49:47.823 | INFO    | Flow run 'vivacious-parrot' - Using task runner 'ConcurrentTaskRunner'
flow starting
14:49:48.344 | INFO    | Flow run 'vivacious-parrot' - Created task run 'running_task1-c5e0b2aa-0' for task 'running_task1'
14:49:48.564 | INFO    | Flow run 'vivacious-parrot' - Created task run 'running_task2-8707b00f-0' for task 'running_task2'
14:49:48.645 | INFO    | Task run 'running_task1-c5e0b2aa-0' - Running task1
14:49:48.810 | INFO    | Task run 'running_task1-c5e0b2aa-0' - Finished in state Completed()
14:49:48.976 | INFO    | Task run 'running_task2-8707b00f-0' - Running task2 with task 1
14:49:49.141 | INFO    | Task run 'running_task2-8707b00f-0' - Finished in state Completed()
14:49:49.311 | INFO    | Flow run 'vivacious-parrot' - Finished in state Completed('All states completed.')

Process finished with exit code 0
```
Sharing your `prefect version` output might be helpful. If nothing else helps, I'd suggest:
1. Creating a new virtual environment on your agent machine
2. Running locally on that machine to validate it's working
3. Then recreating the deployment and, while doing so, explicitly pointing at this new fresh virtual environment, e.g.:
```
from datetime import timedelta

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner
from prefect.orion.schemas.schedules import IntervalSchedule

DeploymentSpec(
    name="ex-flow-dev",
    flow=your_flow_name,
    flow_runner=SubprocessFlowRunner(condaenv="yourVenv"),
    schedule=IntervalSchedule(interval=timedelta(minutes=15)),
)
```
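(Assuming the beta-era CLI here: after saving that spec in a file, `prefect deployment create <your_file.py>` should register it; the exact invocation may differ between beta releases.)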
d
1. Yes, I'm using GCS storage, and to make sure it wasn't a permissions question I opened the Python REPL on the agent and tried to fetch blobs from the storage bucket; it succeeded. 2. Yes, it works when running locally.
```
root@agent:/app# prefect version
Version:             2.0b4
API version:         0.3.1
Python version:      3.10.4
Git commit:          344acb00
Built:               Thu, May 12, 2022 5:28 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted
```
k
This is specific to Google Cloud Storage; we were serializing the Client as part of the result. The issue has been fixed and will be out in the next release.
d
Hm, I think I am using the default task runner. Thanks anyhow! I will give @Anna Geller's suggestions a try and see if I can get around it by changing the task runner.
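(For reference, switching the task runner on a flow in the 2.0 beta looks roughly like this; `SequentialTaskRunner` is just an illustration and may not avoid the serialization bug:)
```python
from prefect import flow
from prefect.task_runners import SequentialTaskRunner

# Sketch: run tasks sequentially instead of with the default ConcurrentTaskRunner.
@flow(task_runner=SequentialTaskRunner())
def test_flow():
    ...
```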
Hi again. I've now had some time to get my head around the issue and succeeded in reproducing the error on my local machine, which makes it easier to troubleshoot. I've now updated to version 2.0b5, so my version info is:
```
❯ prefect version  
Version:             2.0b5
API version:         0.3.1
Python version:      3.10.4
Git commit:          7b27c7cf
Built:               Tue, May 17, 2022 4:54 PM
OS/Arch:             darwin/x86_64
Profile:             prod
Server type:         hosted
```
First of all, the issue @Kevin Kho mentioned seems to be happening also for the concurrent task runner; in my example I've only used the default task runner, and I get the same error as mentioned in the issue. So basically, I have the same flow as defined above. If I set the Prefect storage to point to local storage, everything runs fine. If I change my default storage to Google Cloud Storage, it fails with the error message printed before.

To verify that I have the right permissions on the default bucket, I added the following code to test_flow.py ("sc_prefect-3" is the bucket I've set as default storage):
```
from google.cloud import storage

if __name__ == "__main__":
    c = storage.Client()
    b = c.get_bucket("sc_prefect-3")
    print(b)
    blob_list = list(c.list_blobs("sc_prefect-3"))
    blob = blob_list[0]
    with open("file-to-download-to", "wb") as file_obj:
        c.download_blob_to_file(blob, file_obj)
        print("download completed")
    blob = b.blob("myfile")
    blob.upload_from_filename("file-to-download-to")
    print("upload completed")
    test_flow()
```
The list, read, and write all work against the bucket, so it shouldn't be a permission issue, but rather something that goes wrong when running the flow against Google Cloud Storage. Is there anything I am missing when it comes to permissions/setup etc.? It would be great to be able to use Google Cloud Storage, so let me know if there's anything else I can do to troubleshoot this further.
Is there any way to check that Prefect has the permissions it needs for Google Cloud Storage?
I am using a service account, and set the credentials with `export GOOGLE_APPLICATION_CREDENTIALS="file.json"`.
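(One way to sanity-check this, sketched with the standard google-cloud-storage API; the permission names below are what GCS object access typically requires, so treat them as an assumption:)
```python
# Sketch: check which credentials get picked up and which bucket permissions
# they actually grant. Assumes GOOGLE_APPLICATION_CREDENTIALS points at the
# service account JSON file.
import google.auth
from google.cloud import storage

credentials, project = google.auth.default()
print("credentials resolved for project:", project)

client = storage.Client()
bucket = client.bucket("sc_prefect-3")
needed = ["storage.objects.get", "storage.objects.create", "storage.objects.list"]
granted = bucket.test_iam_permissions(needed)
print("granted:", granted)  # anything missing from this list is denied
```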
k
I don't think this is a permissions thing based on your test. Is your task returning anything? I think the error is not identical to the DaskTaskRunner issue, but very similar: Prefect is including the connection as part of the task result, so it gets serialized. Again, that will be patched next release. I could dig more tomorrow, since today is a holiday.
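(Until that patch is out, one way to sidestep this class of error is to keep anything client-like out of task results; a minimal sketch, where `list_bucket` is a hypothetical task and the client is only needed inside it:)
```python
from google.cloud import storage
from prefect import task

@task
def list_bucket(bucket_name: str) -> list[str]:
    # Create the client inside the task and return only plain data,
    # so nothing unpickleable ends up in the serialized task result.
    client = storage.Client()
    return [blob.name for blob in client.list_blobs(bucket_name)]
```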
d
Thanks for your reply @Kevin Kho! The tasks are returning strings for the moment, but I also tried returning None without any improvement. I found a workaround: I just switched to the file system option and pointed the path at the bucket. Since it works for now, I can wait until the next release and try it out then, to see if it's fixed by the patch. Ah, it's always a holiday somewhere in the world! Enjoy! 😃 (Let me know if you want me to dig into it in any way; glad if I can help out somehow.)
k
Yes of course, and the next release should be out either this week or next. Thanks!
👍 1