    Stephen Lloyd

    5 months ago
    I have a flow that works fine when I run it locally. Here’s my flow:
    with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
        creds = get_credentials()
        conn = get_connection(creds)
        tables = get_tables()
        save_data = load_to_s3.map(tables, conn=unmapped(conn))
        conn.set_upstream(creds)
        save_data.set_upstream(tables)
        save_data.set_upstream(conn)
    It’s failing on get_connection, and the relevant code is:
    @task
    def get_credentials():
        return PrefectSecret(PREFECT_FLOW_NAME).run()
    
    @task
    def get_connection(creds):
        return connectors.get_redshift_connector(creds)
    
    # from another file...
    import redshift_connector
    
    
    def get_redshift_connector(creds: dict) -> object:
        conn = redshift_connector.connect(
            host=creds['host'],
            database=creds['database'],
            user=creds['user'],
            password=creds['password']
        )
        return conn
    When I move to running in ECS, it fails with the following trace…
    Unexpected error: TypeError("cannot pickle 'Struct' object")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
        binary_data = new.serializer.serialize(new.value)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle 'Struct' object
    Any ideas? I’m wondering if I’m retrieving the secrets successfully locally, but not on an ECS run.
    Anna Geller

    5 months ago
    you're trying to pass credentials and connections between mapped tasks - this is not thread-safe, and the connection object is not serializable with cloudpickle - that's why you get the pickle error. To solve it, move the logic that retrieves credentials into the tasks that need it (here, that seems to be the load_to_s3 task)
    doing this is not recommended: PrefectSecret("xxx").run(). Instead, use:
    with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
        your_secret = PrefectSecret("xxx")
        tables = get_tables()
        save_data = load_to_s3.map(tables, secret=unmapped(your_secret))
    this logic assumes you moved the Redshift connector logic to the load_to_s3 task. Also: could you please move all the code blocks and the traceback to the thread to keep the main channel cleaner?
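    For illustration, a minimal sketch of the shape Anna describes, reusing the connectors helper and module constants from above and assuming the secret resolves to the JSON dict of connection details; the connection is opened and closed inside the mapped task, so it is never pickled (at the cost of one connection per table):
    import awswrangler
    import connectors
    from prefect import task

    @task
    def load_to_s3(table: str, secret: dict) -> None:
        # open the connection inside the task so it never crosses a task boundary
        conn = connectors.get_redshift_connector(secret)
        try:
            with conn.cursor() as cursor:
                cursor.execute(f'select * from {table}')
                result = cursor.fetch_dataframe()
            awswrangler.s3.to_csv(
                df=result,
                path=f's3://<my_s3_bucket>/prefect/{PREFECT_FLOW_NAME}/{table}.csv'
            )
        finally:
            conn.close()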
    Stephen Lloyd

    5 months ago
    ok, thanks. Alternatively, if I moved the credentials task into the connection task, I should be able to pass the connection safely, correct? I was trying to avoid opening up n connections (one per table) to a vendor’s redshift cluster. 😃
    Anna Geller

    5 months ago
    Correct. I'm not sure whether you need a Redshift connection to load data to S3, though. You could easily use the load_to_s3 task only to load the data to S3, and then in a separate downstream task connect to Redshift and run the COPY command to efficiently load the data at once from a given S3 path. You can try out awswrangler for that.
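    A sketch of that split, for illustration only - it assumes an S3-to-Redshift copy using awswrangler's copy_from_files, plus a hypothetical target table, schema, and an IAM role the cluster can assume (which, per the later messages, may not be available here):
    import awswrangler as wr
    import connectors
    from prefect import task

    @task
    def copy_to_redshift(s3_path: str, secret: dict) -> None:
        # connect inside the task; a single COPY loads every file under the prefix
        conn = connectors.get_redshift_connector(secret)
        try:
            wr.redshift.copy_from_files(
                path=s3_path,
                con=conn,
                table='my_table',   # hypothetical target table
                schema='public',    # hypothetical schema
                iam_role='arn:aws:iam::123456789012:role/redshift-copy',  # hypothetical role
            )
        finally:
            conn.close()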
    Stephen Lloyd

    5 months ago
    doing this is not recommended:
    PrefectSecret("xxx").run()
    The PrefectSecret is JSON with all the redshift connection details, and I (embarrassingly) couldn’t figure out how to turn the object it returns into a dict that I could use. But .run() did it for me. How should I change that?
    Anna Geller

    5 months ago
    you don't need to - when you pass it as a data dependency to your downstream tasks, this will work fine
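    A minimal sketch of what that resolution looks like, with "xxx" standing in for the real secret name and show_keys as a hypothetical task; at runtime the downstream task receives the parsed dict, not the PrefectSecret object:
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def show_keys(creds: dict) -> None:
        # `creds` arrives as a plain dict here; no .run() call needed
        print(sorted(creds.keys()))  # e.g. ['database', 'host', 'password', 'user']

    with Flow('secret-demo') as flow:
        show_keys(PrefectSecret('xxx'))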
    Stephen Lloyd

    5 months ago
    Re: redshift, it’s not my cluster and not my AWS account, so I can’t insist on being given a role. awswrangler does not seem to facilitate connections to redshift via username and password, so I had to resort to using redshift_connector:
    @task
    def load_to_s3(table: str, conn) -> None:
        with conn.cursor() as cursor:
            query = f'select * from {table}'
            print(query)
            cursor.execute(query)
            result: pandas.DataFrame = cursor.fetch_dataframe()
    
        awswrangler.s3.to_csv(
            df=result,
            path=f's3://<my_s3_bucket>/prefect/{PREFECT_FLOW_NAME}/{table}.csv'
        )
    Hi, sorry. You suggested this would work:
    with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
        your_secret = PrefectSecret("xxx")
        make_connection = get_connection(your_secret)
    but I seem to be getting the same struct/pickle error.
    Anna Geller

    5 months ago
    no, I did not - I recommended moving get_connection into your load_to_s3 task
    To quote my earlier message: instead, use:
    with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
        your_secret = PrefectSecret("xxx")
        tables = get_tables()
        save_data = load_to_s3.map(tables, secret=unmapped(your_secret))
    this logic assumes you moved the Redshift connector logic to the load_to_s3 task.
    I want to make sure you understand the problem here: HTTP clients and DB connection objects are not serializable with cloudpickle. That's why you cannot pass make_connection between tasks as a data dependency.
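    The underlying issue can be reproduced in isolation - a sketch using a plain socket, which fails to pickle for the same reason a live DB connection does:
    import socket
    import cloudpickle

    sock = socket.socket()  # stands in for any live network connection
    try:
        cloudpickle.dumps(sock)
    except TypeError as exc:
        print(exc)  # cannot pickle 'socket' object
    sock.close()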
    Stephen Lloyd

    5 months ago
    Thank you, I had not made that connection. 🙂