# prefect-community
s
I have a flow that works fine when I run it locally. Here’s my flow:
Copy code
with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    creds = get_credentials()
    conn = get_connection(creds)
    tables = get_tables()
    save_data = load_to_s3.map(tables, conn=unmapped(conn))
    conn.set_upstream(creds)
    save_data.set_upstream(tables)
    save_data.set_upstream(conn)
It’s failing on get_connection, and the relevant code is:
Copy code
@task
def get_credentials():
    # the secret value is a JSON dict of Redshift connection details
    return PrefectSecret(PREFECT_FLOW_NAME).run()

@task
def get_connection(creds):
    # returns a live redshift_connector connection object
    return connectors.get_redshift_connector(creds)

# from another file (the connectors module)...
import redshift_connector


def get_redshift_connector(creds: dict) -> object:
    conn = redshift_connector.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['user'],
        password=creds['password']
    )
    return conn
When I move to running it in ECS, it fails with the following trace:
Copy code
Unexpected error: TypeError("cannot pickle 'Struct' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
    binary_data = new.serializer.serialize(new.value)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'Struct' object
Any ideas? I’m wondering if I’m retrieving the secrets successfully locally, but not on an ECS run.
a
you're trying to pass credentials and connection objects between mapped tasks - these are not thread-safe and not serializable with cloudpickle, which is why you get the pickle error. To solve it, move the logic that retrieves the credentials (and opens the connection) into the tasks that need it - here, that seems to be the load_to_s3 task.
doing this is not recommended:
Copy code
PrefectSecret("xxx").run()
instead, use:
Copy code
with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret("xxx")
    tables = get_tables()
    save_data = load_to_s3.map(tables, secret=unmapped(your_secret))
this logic assumes you moved the Redshift connector logic to the load_to_s3 task. Also: could you please move all the code blocks and the traceback to the thread to keep the main channel cleaner?
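For reference, a rough sketch of that refactor (assuming Prefect 1.x and reusing the connectors module shown earlier; the connection is opened and closed inside the mapped task, so nothing unpicklable ever crosses a task boundary):
Copy code
from prefect import task

import connectors  # same module that defines get_redshift_connector above


@task
def load_to_s3(table: str, secret: dict) -> None:
    # secret arrives here as the already-resolved credential dict;
    # the connection lives and dies inside this single task run
    conn = connectors.get_redshift_connector(secret)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f'select * from {table}')
            result = cursor.fetch_dataframe()
        # ... write result to S3 from here ...
    finally:
        conn.close()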
s
ok, thanks. Alternatively, if I moved the credentials task into the connection task, I should be able to pass the connection safely, correct? I was trying to avoid opening up n connections (one per table) to a vendor’s redshift cluster. :)
a
Correct. I'm not sure whether you need a Redshift connection to load data to S3, though. You could easily use the load_to_s3 task only to load the data to S3, and then, in a separate downstream task, connect to Redshift and run the COPY command to efficiently load the data at once from a given S3 path. You can try out awswrangler for that.
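A rough sketch of that downstream COPY step, using redshift_connector directly; the table name, S3 path, and IAM role below are placeholders, and the credentials clause would need to match whatever access the cluster actually has:
Copy code
import redshift_connector
from prefect import task


@task
def copy_into_redshift(s3_path: str, table: str, creds: dict) -> None:
    # bulk-load a file that already sits in S3 into a Redshift table
    # with a single COPY statement
    conn = redshift_connector.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['user'],
        password=creds['password'],
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                f"COPY {table} "
                f"FROM '{s3_path}' "
                "IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-role' "  # placeholder role
                "FORMAT AS CSV IGNOREHEADER 1"
            )
        conn.commit()
    finally:
        conn.close()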
s
doing this is not recommended:
Copy code
PrefectSecret("xxx").run()
The PrefectSecret is JSON with all the Redshift connection details and I (embarrassingly) couldn’t figure out how to turn the object it returns into a dict that I could use. But .run() did it for me. How should I change that?
a
you don't need to - when you pass it as a data dependency to your downstream tasks, this will work fine
👍 1
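In other words, a minimal illustration (the task and parameter names are arbitrary): by the time a downstream task runs, the PrefectSecret passed to it has already been resolved to the stored JSON dict.
Copy code
from prefect import task


@task
def use_secret(secret: dict) -> None:
    # secret is already a plain dict here - no PrefectSecret(...).run()
    # needed inside the task
    print(sorted(secret.keys()))  # e.g. ['database', 'host', 'password', 'user']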
s
Re: Redshift, it’s not my cluster and not my AWS account, so I cannot ask to be given a role. awswrangler does not seem to facilitate connections to Redshift via username and password, so I had to resort to using redshift_connector:
Copy code
import awswrangler
import pandas

@task
def load_to_s3(table: str, conn) -> None:
    # pull the whole table from Redshift into a DataFrame...
    with conn.cursor() as cursor:
        query = f'select * from {table}'
        print(query)
        cursor.execute(query)
        result: pandas.DataFrame = cursor.fetch_dataframe()

    # ...then write it out to S3 as CSV
    awswrangler.s3.to_csv(
        df=result,
        path=f's3://<my_s3_bucket>/prefect/{PREFECT_FLOW_NAME}/{table}.csv'
    )
👍 1
Hi, sorry. You suggested this would work:
Copy code
with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret("xxx")
    make_connection = get_connection(your_secret)
but I seem to be getting the same struct/pickle error.
a
no, I did not - I recommended moving the get_connection logic to your load_to_s3 task
Quoting my earlier message - instead, use:
Copy code
with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret("xxx")
    tables = get_tables()
    save_data = load_to_s3.map(tables, secret=unmapped(your_secret))
this logic assumes you moved the Redshift connector logic to the load_to_s3 task
I want to make sure you understand the problem here: HTTP clients and DB connection objects are not serializable with cloudpickle. That's why you cannot pass make_connection between tasks as a data dependency.
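Putting the thread together, the corrected flow might look roughly like this - a sketch that merges the load_to_s3 body shown earlier with the connection logic, reusing PREFECT_FLOW_NAME, STORAGE, RUN_CONFIG and get_tables from the original flow and assuming the secret holds the same credential dict:
Copy code
import awswrangler
import pandas
import redshift_connector
from prefect import Flow, task, unmapped
from prefect.tasks.secrets import PrefectSecret


@task
def load_to_s3(table: str, secret: dict) -> None:
    # open the connection inside the task, so nothing unpicklable ever
    # crosses a task boundary (this is what makes it work on ECS too)
    conn = redshift_connector.connect(
        host=secret['host'],
        database=secret['database'],
        user=secret['user'],
        password=secret['password'],
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(f'select * from {table}')
            result: pandas.DataFrame = cursor.fetch_dataframe()
    finally:
        conn.close()

    awswrangler.s3.to_csv(
        df=result,
        path=f's3://<my_s3_bucket>/prefect/{PREFECT_FLOW_NAME}/{table}.csv'
    )


with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret(PREFECT_FLOW_NAME)
    tables = get_tables()
    save_data = load_to_s3.map(tables, secret=unmapped(your_secret))
The tradeoff discussed above still applies: each mapped task run opens its own Redshift connection, one per table.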
s
Thank you, I had not made that connection. 🙂
👍 1