
Andy Dyer

09/29/2020, 9:40 PM
Hey all. I am trying to register a flow, but it's having difficulty pickling a dependency of paramiko, which I am using for SFTP. See the stack trace below. It makes sense, and other people have had trouble pickling the same object in different applications. Is my best bet to use a ShellTask for SFTP?
<https://github.com/paramiko/paramiko/issues/753>
<https://stackoverflow.com/questions/39321606/cant-pickle-an-rsa-key-to-send-over-a-socket>
Stack trace: https://pastebin.com/8hyAdpx1

Jim Crist-Harif

09/29/2020, 9:42 PM
Without the code it's hard to tell you more, but I suspect that you're creating a paramiko object outside of a task (in your top-level namespace) then using it inside a task, which is causing the issue.
You should be able to use paramiko with prefect without issue, but some care needs to be taken to ensure unpickleable objects are only created at flow run time, rather than flow registration time.
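To make the registration-time versus run-time distinction concrete, here is a stdlib-only sketch. It assumes that registering a flow boils down to pickling whatever state the flow holds (Prefect actually uses cloudpickle, which captures more than stdlib pickle does). `FakeTransport`, `build_eager`, `build_lazy`, and `run_lazy` are hypothetical names, not Prefect or paramiko APIs, and a lock stands in for the unpicklable part of a real connection object.

```python
import pickle
import threading
from typing import Any, Dict


class FakeTransport:
    """Hypothetical stand-in for an unpicklable connection object
    (e.g. a paramiko Transport). The lock cannot be pickled."""

    def __init__(self, host: str) -> None:
        self.host = host
        self._lock = threading.Lock()


def build_eager(host: str) -> Dict[str, Any]:
    # Anti-pattern: the connection is created when the flow is built,
    # so it becomes part of what registration has to serialize.
    return {"transport": FakeTransport(host)}


def build_lazy(host: str) -> Dict[str, Any]:
    # Only plain, picklable config is stored at build time.
    return {"host": host}


def run_lazy(state: Dict[str, Any]) -> str:
    # The connection is created inside the "task", i.e. at run time.
    transport = FakeTransport(state["host"])
    return transport.host


def serializes(obj: Any) -> bool:
    """Return True if stdlib pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


print(serializes(build_eager("sftp.example.com")))  # False
print(serializes(build_lazy("sftp.example.com")))   # True
```

The lazy version only ever serializes a hostname; the connection object exists solely while the task body is running.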

Andy Dyer

09/29/2020, 9:44 PM
That may be it. I have this in a different module:
import paramiko


def open_sftp():
    # SFTP_CREDENTIALS is defined elsewhere in the module (not shown)
    transport = paramiko.Transport((SFTP_CREDENTIALS["server"], 22))
    transport.connect(username=SFTP_CREDENTIALS["username"], password=SFTP_CREDENTIALS["password"])
    sftp = paramiko.SFTPClient.from_transport(transport)
    return sftp

from prefect import task


@task
def get_zip_from_sftp() -> str:
    # get_latest_sftp_file and local_path are defined elsewhere (not shown)
    with open_sftp() as sftp_client:
        sftp_client.chdir("active")
        latest_zip = get_latest_sftp_file(sftp_client)
        sftp_client.get(latest_zip, local_path)
    return local_path

Jim Crist-Harif

09/29/2020, 9:46 PM
Hmmm, that shouldn't cause an issue, since you're not calling paramiko stuff until runtime.
Do you call paramiko things anywhere else?

Andy Dyer

09/29/2020, 9:48 PM
Nope, just there.

Jim Crist-Harif

09/29/2020, 9:50 PM
That's very odd. Does the following run cleanly for you?
import paramiko, cloudpickle

cloudpickle.dumps(paramiko)
cloudpickle.dumps(paramiko.Transport)
cloudpickle.dumps(paramiko.SFTPClient.from_transport)
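The three `cloudpickle.dumps` calls above generalize to a loop over a namespace. `find_unpicklable` below is a hypothetical helper that reports which names in a dict (for example `vars(some_module)`) fail to serialize. It defaults to stdlib pickle and skips imported modules, which stdlib pickle rejects outright even though cloudpickle handles them; passing `cloudpickle.dumps` as `dumps` should track what registration does more closely, though that is an assumption about Prefect's internals.

```python
import pickle
import threading
import types
from typing import Any, Callable, Dict, List


def find_unpicklable(
    namespace: Dict[str, Any],
    dumps: Callable[[Any], bytes] = pickle.dumps,
) -> List[str]:
    """Return the names in `namespace` whose values fail to serialize."""
    bad = []
    for name, value in namespace.items():
        # Skip dunder entries and imported modules (stdlib pickle rejects
        # modules, which would drown out the real culprits).
        if name.startswith("__") or isinstance(value, types.ModuleType):
            continue
        try:
            dumps(value)
        except Exception:
            bad.append(name)
    return bad


# Example: a plain dict standing in for a module's globals.
example_globals = {
    "SERVER": "sftp.example.com",
    "PORT": 22,
    "pickle": pickle,            # a module: skipped
    "CLIENT": threading.Lock(),  # unpicklable, like a cffi-backed object
}
print(find_unpicklable(example_globals))  # ['CLIENT']
```

Run against a real flow module (the module name would be your own), a check like this would flag a module-level object such as a global `Credentials` instance rather than the libraries themselves.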

Andy Dyer

09/29/2020, 9:52 PM
Smart idea, let me see.
Yeah, no problems.

Jim Crist-Harif

09/29/2020, 9:53 PM
Are you using any other libraries that might be backed by cffi? Nothing in your traceback points to paramiko specifically; it's failing to pickle an FFI backend from cffi, which normally isn't something that gets serialized.
I'd need a reproducible example to help debug further, if that's possible.

Andy Dyer

09/29/2020, 9:56 PM
So the flow involves pushing a CSV to BigQuery (BQ) and a backup CSV to Google Cloud Storage. When I comment out those tasks, the flow registers fine and dandy. The only thing the two have in common is this import:
from google.oauth2.service_account import Credentials

Jim Crist-Harif

09/29/2020, 9:57 PM
Do you create a global Credentials object outside of a task?

Andy Dyer

09/29/2020, 9:57 PM
That's it.
Which is probably poor functional programming.
I'll pass it in to each task as a parameter.

Jim Crist-Harif

09/29/2020, 9:59 PM
If you ever use Dask as an executor, you'll run into the same serialization issue if you're passing credentials around as results. I recommend recreating the credentials inside each task that needs them (you might abstract this out into a helper function if needed).
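A minimal sketch of that helper-function approach, using stdlib stand-ins so it runs without google-auth or Prefect installed. `FakeCredentials`, `get_credentials`, `load_to_bigquery`, and `backup_to_gcs` are hypothetical names; in real code the helper might return `google.oauth2.service_account.Credentials.from_service_account_file(key_path)`, and the two functions would be decorated with Prefect's `@task`. The only thing handed to each task is the key path, a plain string.

```python
import pickle
import threading


class FakeCredentials:
    """Hypothetical stand-in for google.oauth2.service_account.Credentials.
    In this thread the real object dragged a cffi FFI backend into the
    pickle; a lock plays that unpicklable role here."""

    def __init__(self, key_path: str) -> None:
        self.key_path = key_path
        self._backend = threading.Lock()


def get_credentials(key_path: str) -> FakeCredentials:
    # Hypothetical helper. Real code might return
    # Credentials.from_service_account_file(key_path) instead.
    return FakeCredentials(key_path)


def load_to_bigquery(csv_path: str, key_path: str) -> str:
    # Would be a @task in Prefect; credentials are rebuilt here at run time.
    creds = get_credentials(key_path)
    return f"bq:{csv_path} via {creds.key_path}"


def backup_to_gcs(csv_path: str, key_path: str) -> str:
    # Rebuilt again rather than received as another task's result.
    creds = get_credentials(key_path)
    return f"gcs:{csv_path} via {creds.key_path}"


print(load_to_bigquery("report.csv", "service-account.json"))
# bq:report.csv via service-account.json
```

Because everything crossing a task boundary is plain data, it no longer matters whether the executor keeps results in one process or ships them between Dask workers.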

Andy Dyer

09/29/2020, 10:37 PM
Thanks Jim! Really appreciate your help.