Sash Stasyk
11/18/2021, 3:34 PM
Anna Geller
from prefect import resource_manager
@resource_manager
class SnowflakeConnection:
    """Prefect resource manager that hands tasks an open Snowflake connection.

    ``setup`` builds an engine from ``get_snowflake_connection_string`` for the
    configured database and returns a connection from it. ``cleanup`` closes
    that connection and disposes of the engine that was created for it.

    NOTE(review): assumes ``create_engine`` is ``sqlalchemy.create_engine``
    (its import is not shown in this snippet) -- confirm.
    """

    def __init__(self, database: str = "DEV"):
        # Database name forwarded to get_snowflake_connection_string in setup().
        self.database = database

    def setup(self):
        """Create an engine for ``self.database`` and return a connection from it."""
        db_conn_string = get_snowflake_connection_string(self.database)
        db_engine = create_engine(db_conn_string)
        return db_engine.connect()

    @staticmethod
    def cleanup(conn):
        """Close ``conn`` and dispose of the engine ``setup`` created for it.

        Closing a pooled SQLAlchemy connection only returns it to its engine's
        pool. Every ``setup`` call creates its own engine, so nothing will
        reuse that pool; disposing it actually closes the pooled connection
        instead of leaving it open until garbage collection.
        """
        try:
            conn.close()
        finally:
            # Connection.engine is the Engine this connection came from.
            conn.engine.dispose()
Anna Geller
with Flow("db-flow") as flow:
    # Only the tasks that read from the database sit inside the resource
    # manager block, where they receive the open connection `conn`.
    with SnowflakeConnection() as conn:
        customers = get_customer_data(conn)
        orders = get_order_data(conn)
    # Dedented out of the resource manager block on purpose:
    final_data = merge_data(orders, customers)  # this no longer needs db connection
Anna Geller
Sash Stasyk
11/18/2021, 3:38 PM
Sylvain Hazard
11/18/2021, 3:38 PM
Anna Geller
Sylvain Hazard
11/18/2021, 3:41 PM
Anna Geller
Sylvain Hazard
11/18/2021, 4:09 PM
Sash Stasyk
11/19/2021, 5:43 PM
import prefect
import cx_Oracle
from prefect import task
from prefect import resource_manager
@resource_manager
class ConnectionFromSecret:
    """Prefect resource manager for an Oracle connection described by a secret.

    ``secret`` is indexed with the keys ``host``, ``port``, ``dbname``,
    ``username`` and ``password``; ``setup`` opens a ``cx_Oracle`` connection
    from them and ``cleanup`` closes it.

    NOTE(review): the instance keeps the password as an attribute, so it must
    not be persisted by a task result (see the serialization error below).
    """

    def __init__(self, secret):
        # Pull every connection parameter out of the secret mapping up front.
        (
            self.host,
            self.port,
            self.dbname,
            self.username,
            self.password,
        ) = (
            secret["host"],
            secret["port"],
            secret["dbname"],
            secret["username"],
            secret["password"],
        )

    def setup(self):
        """Open and return a ``cx_Oracle`` connection to the configured database."""
        # makedsn's third positional parameter is ``sid``; it is passed by keyword here.
        # NOTE(review): if the secret's ``dbname`` is a service name rather than
        # a SID, this should be ``service_name=self.dbname`` -- confirm.
        return cx_Oracle.connect(
            user=self.username,
            password=self.password,
            dsn=cx_Oracle.makedsn(self.host, self.port, sid=self.dbname),
        )

    @staticmethod
    def cleanup(conn):
        """Close the connection that ``setup`` returned."""
        conn.close()
and
with ConnectionFromSecret(secret) as conn:
    ...  # tasks that receive `conn` go here
which works fine locally with flow.run()
but fails on the server with:
Unexpected error: TypeError('Object of type ConnectionFromSecret is not JSON serializable')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 926, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/prefect_result.py", line 62, in write
new.location = self.serializer.serialize(new.value).decode("utf-8")
File "/usr/local/lib/python3.7/site-packages/prefect/engine/serializers.py", line 110, in serialize
return json.dumps(value).encode()
File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type ConnectionFromSecret is not JSON serializable
Anna Geller
Anna Geller
Sash Stasyk
11/19/2021, 5:49 PM
Sash Stasyk
11/19/2021, 5:49 PM
Sash Stasyk
11/19/2021, 5:52 PMwith Flow("name", result=PrefectResult()) as flow:
# observability:
secret = AWSSecretsManager(secret=f"...",boto_kwargs=boto_kwargs)
with ConnectionFromSecret(secret) as conn:
# ...tasks
# ...
if __name__ == "__main__":
flow_state = flow.run()
flow.visualize(flow_state, filename="current", format="png")
else:
################################################################################
# Set up flow run configuration
import sys
import pkg_resources
if "pytest" not in sys.modules:
# Apply the schedule to the flow only if run on the server, if run locally this code will not execute
# Run at 11:30 pm every day
flow.schedule = Schedule(clocks=[CronClock("30 23 * * *")])
version = pkg_resources.get_distribution("...").version
flow.run_config = KubernetesRun(
env={
"EXTRA_PIP_PACKAGES": f"...=={version}",
"AWS_ACCOUNT_ID": os.environ.get("AWS_ACCOUNT_ID"),
}
)
#flow.executor = LocalDaskExecutor()
flow.executor = LocalExecutor()
Anna Geller
Sash Stasyk
11/19/2021, 6:03 PM
Anna Geller
Anna Geller
@task(result=PrefectResult())