# ask-community
a
hey all. I am trying to register a flow, but it's having difficulty pickling a dependency of
paramiko
which I am using for SFTP. See the stack trace below. It makes sense, and other people have hit difficulties pickling the same object in other applications. Is the best bet to use ShellTasks to SFTP?
<https://github.com/paramiko/paramiko/issues/753>
<https://stackoverflow.com/questions/39321606/cant-pickle-an-rsa-key-to-send-over-a-socket>
stack trace : https://pastebin.com/8hyAdpx1
👍 1
j
Without the code it's hard to tell you more, but I suspect that you're creating a paramiko object outside of a task (in your top-level namespace) then using it inside a task, which is causing the issue.
You should be able to use paramiko with Prefect without issue, but some care needs to be taken to ensure unpickleable objects are only created at flow run time, rather than flow registration time.
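The distinction can be sketched with stdlib pickle. A `threading.Lock` stands in for an unpickleable paramiko `Transport` here; the names and values below are illustrative, not from the flow in question:

```python
import pickle
import threading

# A live connection object (like a paramiko Transport) can't be pickled;
# a threading.Lock stands in for it, since it is also unpickleable.
conn = threading.Lock()

def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False

# Plain config data, by contrast, pickles fine -- so keep the *recipe*
# for the connection at registration time and build the live object
# inside the task at run time.
config = {"server": "sftp.example.com", "port": 22}  # hypothetical values

print(is_picklable(conn))    # False
print(is_picklable(config))  # True
```

Flow registration only needs to serialize the picklable parts; anything like `conn` should come into existence inside a task body.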
a
that may be it. I have this in a different module:
```python
def open_sftp():
    transport = paramiko.Transport((SFTP_CREDENTIALS["server"], 22))
    transport.connect(None, SFTP_CREDENTIALS["username"], SFTP_CREDENTIALS["password"])
    sftp = paramiko.SFTPClient.from_transport(transport)
    return sftp
```
```python
@task
def get_zip_from_sftp() -> str:
    with open_sftp() as sftp_client:
        sftp_client.chdir("active")
        latest_zip = get_latest_sftp_file(sftp_client)
        sftp_client.get(latest_zip, local_path)
    return local_path
```
j
Hmmm, that shouldn't cause an issue, since you're not calling paramiko stuff until runtime.
Do you call paramiko things anywhere else?
a
nope. Just there
j
That's very odd. Does the following run clean for you?
```python
import cloudpickle
import paramiko

cloudpickle.dumps(paramiko)
cloudpickle.dumps(paramiko.Transport)
cloudpickle.dumps(paramiko.SFTPClient.from_transport)
```
a
smart idea, let me see
yeah, no problems.
j
Are you using any other libraries that might be backed by cffi? Nothing in your traceback points to paramiko specifically; it's failing to pickle an
FFI
backend from cffi, which normally shouldn't be serialized between processes.
I'd need a reproducible example to help debug further, if that's possible.
a
so the flow involves pushing a csv to BQ and a backup csv to Google Cloud storage. When I comment out those tasks in the flow it registers fine and dandy. The only common thing between the two is
from google.oauth2.service_account import Credentials
j
Do you create a global
Credentials
object outside of a task?
👍 1
a
thats it
which is probably poor functional programming
I'll pass it in to each task as a parameter
j
If you ever use dask as an executor, you'll run into the same serialization issue if you're passing around credentials as results. I recommend recreating the credentials inside each task that needs them (you might abstract this out to a helper function if needed).
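A minimal sketch of that pattern, using a plain JSON file as a stand-in for `google.oauth2.service_account.Credentials.from_service_account_file` (the helper and task names below are hypothetical):

```python
import json

def make_credentials(keyfile_path: str) -> dict:
    # Stand-in for Credentials.from_service_account_file(keyfile_path);
    # the point is that each task calls this helper itself at run time.
    with open(keyfile_path) as f:
        return json.load(f)

def push_to_bq(keyfile_path: str) -> str:
    # Hypothetical task body: build the credentials here, inside the task,
    # so nothing unpickleable is stored globally or passed between tasks.
    creds = make_credentials(keyfile_path)
    return creds["project_id"]

def backup_to_gcs(keyfile_path: str) -> str:
    creds = make_credentials(keyfile_path)  # each task builds its own copy
    return creds["project_id"]
```

Only the keyfile path (a plain string) crosses task boundaries, so nothing in the flow graph is unpickleable, and the same pattern survives a switch to a Dask executor.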
a
Thanks Jim! really appreciate your help