# ask-community
s
Hey, not sure if this is the right place to ask. Can anyone provide an idiomatic example of using an Oracle DB connection across multiple tasks, please? I am helping a colleague with an ETL flow that requires multiple queries across multiple connections, which would preferably persist throughout the flow. The initial problem we are encountering is that you cannot pass the connection object between tasks, as it needs to be serialisable.
a
I have an example for Snowflake, but you can use the same approach for Oracle:
Copy code
from prefect import resource_manager
from sqlalchemy import create_engine


@resource_manager
class SnowflakeConnection:
    def __init__(self, database: str = "DEV"):
        self.database = database

    def setup(self):
        # get_snowflake_connection_string is a user-defined helper that
        # builds a SQLAlchemy connection string for the given database
        db_conn_string = get_snowflake_connection_string(self.database)
        db_engine = create_engine(db_conn_string)
        return db_engine.connect()

    @staticmethod
    def cleanup(conn):
        conn.close()
and then this is how you could use it in a flow:
Copy code
from prefect import Flow

# get_customer_data, get_order_data, and merge_data are @task-decorated functions
with Flow("db-flow") as flow:
    with SnowflakeConnection() as conn:
        customers = get_customer_data(conn)
        orders = get_order_data(conn)
    final_data = merge_data(orders, customers)  # this no longer needs the db connection
and here are docs providing more details about resource manager: https://docs.prefect.io/core/idioms/resource-manager.html
s
woo, thanks so much for a speedy reply! ❤️
s
Did not know about this feature, so I'm taking the liberty of asking questions about it: how does it work with parallel tasks? For example, if I'm running mapped tasks on a Dask cluster, is there a way to deal with potential concurrency?
a
good question - afaik a database connection cannot be cloud-pickled and shared between Dask workers. Let me ask the team to be sure
s
Thanks!
a
@Sylvain Hazard it looks like you’re right - this wouldn’t work with Dask, you would have to define the connection within the task.
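For example, something along these lines (just a sketch, not tested against a real database; the orders table, the hardcoded connection details, and the customer_ids list are made up placeholders):
Copy code
import cx_Oracle
from prefect import Flow, task, unmapped


@task
def fetch_orders(customer_id, host, port, dbname, username, password):
    # each mapped task run opens (and closes) its own connection on
    # whichever Dask worker it lands on, so nothing non-picklable
    # ever crosses a task boundary
    dsn = cx_Oracle.makedsn(host, port, dbname)
    conn = cx_Oracle.connect(user=username, password=password, dsn=dsn)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM orders WHERE customer_id = :cid", cid=customer_id)
        return cursor.fetchall()
    finally:
        conn.close()


with Flow("dask-db-flow") as flow:
    customer_ids = [1, 2, 3]
    orders = fetch_orders.map(
        customer_ids,
        host=unmapped("db-host"),
        port=unmapped(1521),
        dbname=unmapped("ORCL"),
        username=unmapped("user"),
        password=unmapped("pass"),
    )
In practice you would pull the credentials from a secret task rather than hardcoding them, but the key point is that the connection is created inside the task itself.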
s
Noted, thanks for looking out!
s
@Anna Geller we have tried using the above example as
Copy code
import cx_Oracle
from prefect import resource_manager


@resource_manager
class ConnectionFromSecret:
    def __init__(self, secret):
        self.host = secret['host']
        self.port = secret['port']
        self.dbname = secret['dbname']
        self.username = secret['username']
        self.password = secret['password']

    def setup(self):
        dsn = cx_Oracle.makedsn(self.host, self.port, self.dbname)
        return cx_Oracle.connect(user=self.username, password=self.password, dsn=dsn)

    @staticmethod
    def cleanup(conn):
        conn.close()
and
Copy code
with ConnectionFromSecret(secret) as conn:
  ...
which works fine locally with flow.run() but fails on the server with:
Copy code
Unexpected error: TypeError('Object of type ConnectionFromSecret is not JSON serializable')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 926, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/prefect_result.py", line 62, in write
    new.location = self.serializer.serialize(new.value).decode("utf-8")
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/serializers.py", line 110, in serialize
    return json.dumps(value).encode()
  File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type ConnectionFromSecret is not JSON serializable
a
can you show your Flow object definition?
@Sash Stasyk what storage do you use?
s
might need a few mins to tidy the flow to paste it here
we're using the BitBucket storage
Copy code
import os

from prefect import Flow
from prefect.engine.results import PrefectResult
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.tasks.aws.secrets_manager import AWSSecretsManager

with Flow("name", result=PrefectResult()) as flow:

    # observability:
    # boto_kwargs is defined elsewhere in the module
    secret = AWSSecretsManager(secret=f"...", boto_kwargs=boto_kwargs)

    with ConnectionFromSecret(secret) as conn:

        # ...tasks
        ...

# ...


if __name__ == "__main__":
    flow_state = flow.run()
    flow.visualize(flow_state, filename="current", format="png")

else:
    ################################################################################
    # Set up flow run configuration
    import sys
    import pkg_resources

    if "pytest" not in sys.modules:
        # Apply the schedule only when the flow runs on the server;
        # when run locally this code will not execute.
        # Run at 11:30 pm every day.
        flow.schedule = Schedule(clocks=[CronClock("30 23 * * *")])

    version = pkg_resources.get_distribution("...").version
    flow.run_config = KubernetesRun(
        env={
            "EXTRA_PIP_PACKAGES": f"...=={version}",
            "AWS_ACCOUNT_ID": os.environ.get("AWS_ACCOUNT_ID"),
        }
    )

    # flow.executor = LocalDaskExecutor()
    flow.executor = LocalExecutor()
a
I see - you use result=PrefectResult() - this is the problem here
s
which result should be used?
a
Can you try using this result class only for the tasks that absolutely require storing their results in the Prefect backend? Since you assigned it at the flow level, Prefect tries to serialize the results of all tasks using PrefectResult. However, this result class can only be used for JSON-serializable return values from tasks, and the connection from the resource manager is not JSON serializable.
So for the connection object, really no result class can be used. You would have to specify results at the task level instead, e.g.
Copy code
@task(result=PrefectResult())
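A fuller sketch of how that could look (extract_customer_count is a hypothetical task; ConnectionFromSecret and secret are from your snippets above):
Copy code
from prefect import Flow, task
from prefect.engine.results import PrefectResult


@task(result=PrefectResult())  # checkpointed; returns a JSON-serializable int
def extract_customer_count(conn):
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM customers")
    return cursor.fetchone()[0]


# no result=... on the Flow itself, so the resource manager's
# connection object is never pushed through a result serializer
with Flow("name") as flow:
    with ConnectionFromSecret(secret) as conn:
        count = extract_customer_count(conn)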