Stephen Lloyd
04/13/2022, 11:04 AM

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    creds = get_credentials()
    conn = get_connection(creds)
    tables = get_tables()
    save_data = load_to_s3.map(tables, conn=unmapped(conn))

    conn.set_upstream(creds)
    save_data.set_upstream(tables)
    save_data.set_upstream(conn)
It’s failing on get_connection, and the relevant code is:
@task
def get_credentials():
    return PrefectSecret(PREFECT_FLOW_NAME).run()

@task
def get_connection(creds):
    return connectors.get_redshift_connector(creds)
# from another file...
import redshift_connector

def get_redshift_connector(creds: dict) -> object:
    conn = redshift_connector.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['user'],
        password=creds['password']
    )
    return conn
When I move to running in ECS, it fails with the following trace…
Unexpected error: TypeError("cannot pickle 'Struct' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
    binary_data = new.serializer.serialize(new.value)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'Struct' object
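(For context: the trace shows the S3Result trying to cloudpickle the task's return value, i.e. the live Redshift connection, which apparently holds struct.Struct instances internally. A minimal reproduction of the same error class on Python 3.8, independent of Prefect, with the Struct format chosen arbitrarily:)

import struct

import cloudpickle

# struct.Struct is a C type without pickle support, so any object
# graph that contains one fails to serialize the same way:
packer = struct.Struct('>i')
try:
    cloudpickle.dumps(packer)
except TypeError as exc:
    print(exc)  # cannot pickle 'Struct' object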
Anna Geller
Your task returns a live database connection, and connection objects can't be serialized with cloudpickle - that's why you get the pickle error.
To solve it, move the logic to retrieve credentials into the tasks that need it (here, it seems to be the load_to_s3 task). Doing this is not recommended:

PrefectSecret("xxx").run()

instead, use:

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret("xxx")
    tables = get_tables()
    save_data = load_to_s3.map(tables, secret=unmapped(your_secret))

this logic assumes you moved the Redshift connector logic to the load_to_s3 task.
Also: could you please move all the code blocks and the traceback to the thread to keep the main channel cleaner?
Stephen Lloyd
04/13/2022, 11:21 AM
> doing this is not recommended:
> PrefectSecret("xxx").run()
The PrefectSecret is JSON with all the Redshift connection details, and I (embarrassingly) couldn't figure out how to turn the object that PrefectSecret returns into a dict that I could use. But .run() did it for me. How should I change that?
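(In Prefect 1.x the conversion happens on its own: inside a flow context, PrefectSecret("xxx") is a Task, and passing that task as an argument to another task means the downstream task receives the resolved value at run time - for the JSON secret described above, a plain dict - so .run() is never needed. A hedged sketch with placeholder names:)

from prefect import Flow, task, unmapped
from prefect.tasks.secrets import PrefectSecret

@task
def get_tables():
    return ['users', 'orders']  # placeholder table list for illustration

@task
def load_to_s3(table: str, secret: dict) -> None:
    # At run time `secret` is already the resolved dict,
    # not a PrefectSecret object.
    print(secret['host'], table)

with Flow('example') as flow:
    creds = PrefectSecret('xxx')  # a Task at build time, a dict at run time
    tables = get_tables()
    load_to_s3.map(tables, secret=unmapped(creds))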
Stephen Lloyd
04/13/2022, 11:25 AM

@task
def load_to_s3(table: str, conn) -> None:
    with conn.cursor() as cursor:
        query = f'select * from {table}'
        print(query)
        cursor.execute(query)
        result: pandas.DataFrame = cursor.fetch_dataframe()
        awswrangler.s3.to_csv(
            df=result,
            path=f's3://<my_s3_bucket>/prefect/{PREFECT_FLOW_NAME}/{table}.csv'
        )

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret("xxx")
    make_connection = get_connection(your_secret)
but I seem to be getting the same struct/pickle error.

Anna Geller
You would need to move the logic from get_connection into your load_to_s3 task:

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret("xxx")
    tables = get_tables()
    save_data = load_to_s3.map(tables, secret=unmapped(your_secret))

this logic assumes you moved the Redshift connector logic to the load_to_s3 task - you can't pass the connection (make_connection) between tasks as a data dependency.
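(Putting the two replies together, the working shape might look like the sketch below - hypothetical, with connectors, get_tables, and the module-level constants assumed from earlier in the thread:)

import awswrangler
import pandas
from prefect import Flow, task, unmapped
from prefect.tasks.secrets import PrefectSecret

@task
def load_to_s3(table: str, secret: dict) -> None:
    # The connection is created, used, and closed entirely inside the
    # task, so no unpicklable connection ever becomes a task result.
    conn = connectors.get_redshift_connector(secret)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f'select * from {table}')
            result: pandas.DataFrame = cursor.fetch_dataframe()
        awswrangler.s3.to_csv(
            df=result,
            path=f's3://<my_s3_bucket>/prefect/{PREFECT_FLOW_NAME}/{table}.csv'
        )
    finally:
        conn.close()

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    your_secret = PrefectSecret(PREFECT_FLOW_NAME)
    tables = get_tables()
    save_data = load_to_s3.map(tables, secret=unmapped(your_secret))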