I have a flow that works fine when I run it locally. Here’s my flow:
```python
from prefect import Flow, unmapped

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    creds = get_credentials()
    conn = get_connection(creds)
    tables = get_tables()
    save_data = load_to_s3.map(tables, conn=unmapped(conn))

    conn.set_upstream(creds)
    save_data.set_upstream(tables)
    save_data.set_upstream(conn)
```
It’s failing on `get_connection`, and the relevant code is:
```python
from prefect import task
from prefect.tasks.secrets import PrefectSecret

import connectors

@task
def get_credentials():
    return PrefectSecret(PREFECT_FLOW_NAME).run()

@task
def get_connection(creds):
    return connectors.get_redshift_connector(creds)

# from another file...
import redshift_connector

def get_redshift_connector(creds: dict) -> object:
    conn = redshift_connector.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['user'],
        password=creds['password']
    )
    return conn
```
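For context, `load_to_s3` receives the shared connection plus one table name per mapped run. A simplified stand-in for the real task (the query logic here is hypothetical, and the actual S3 write is elided):

```python
@task
def load_to_s3(table: str, conn) -> None:
    # Hypothetical body: pull the table over the shared Redshift
    # connection; the real task also writes the rows out to S3.
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM {table}")
    rows = cursor.fetchall()
```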
When I move to running it in ECS, it fails with the following traceback:
```
Unexpected error: TypeError("cannot pickle 'Struct' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
    binary_data = new.serializer.serialize(new.value)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'Struct' object
```
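If I’m reading the trace right, the failure isn’t inside `get_connection` itself: it’s Prefect writing that task’s return value to S3. With checkpointing enabled (the default when running against the Prefect backend, unlike a bare local `flow.run()`, which I assume is why it only breaks in ECS), the `S3Result` cloudpickles each task’s result, and the live connection object contains a `Struct` that can’t be pickled. One workaround I’m considering is disabling checkpointing for just that task; a minimal sketch, assuming the Prefect 1.x `checkpoint` kwarg:

```python
@task(checkpoint=False)  # don't persist this task's result to S3
def get_connection(creds):
    return connectors.get_redshift_connector(creds)
```

Even then, I suspect the connection would still need to be pickled if the mapped tasks run in separate processes (e.g. under a `DaskExecutor`), so the safer rework might be to drop `get_connection` entirely, call `load_to_s3.map(tables, creds=unmapped(creds))`, and open the connection inside the task itself, since the credentials dict pickles fine. Is that the right way to think about this?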