# prefect-community
d
I have a kinda general ETL question; I think I'm doing something wrong with my Flow. If I comment out the second part, everything runs in ~3 minutes. Once I add the second part back in, everything takes 20+ minutes, the ORM starts rolling back transactions, and then the Flow kills the worker at some point. Is it maybe related to how I'm passing the results from extract_messages to the "extract_message_entity_mapping" function?
Copy code
# ETL Messages
        extract_messages = etl_msg.extract_messages(company_ids)
        process_messages = etl_msg.process_messages(extract_messages)
        truncate_messages = etl_msg.truncate_messages(upstream_tasks=[process_messages])
        load_messages = etl_msg.load_messages(
            process_messages, upstream_tasks=[truncate_messages]
        )

        # ETL Message Entity Mapping
        extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(
            extract_messages
        )
        process_message_entity_mapping = etl_msg.process_message_entity_mapping(
            extract_message_entity_mapping
        )
        truncate_message_entity_mapping = etl_msg.truncate_message_entity_mapping(
            upstream_tasks=[process_message_entity_mapping]
        )
        load_message_entity_mapping = etl_msg.load_messages(
            process_message_entity_mapping,
            upstream_tasks=[truncate_message_entity_mapping],
        )
Here are the (abbreviated) functions
Copy code
@task()
def extract_messages(company_ids: List[int]) -> List[Dict]:
    database_session = etl_utils.get_source_db_sessionmaker()
    with db.session_manager(database_session) as session:
        data = (
            session.query(
                qw_oltp.Message.message_id,
                qw_oltp.Message.messageThreadId,
                qw_oltp.Message.parentMessageId,
                qw_oltp.MessageType.messageType,
                qw_oltp.MessageCategory.name.label("message_category"),
                # A bunch more columns
            )
            .group_by(qw_oltp.Message.message_id)
            .all()
        )
    return [x._asdict() for x in data]
Copy code
@task()
def extract_message_entity_mapping(messages: List[Dict]) -> List[Dict]:
    message_ids = [x.get("message_id") for x in messages]
    database_session = etl_utils.get_source_db_sessionmaker()
    with db.session_manager(database_session) as session:
        data = (
            session.query(
                qw_oltp.MessageEntity.message_entity_id,
                qw_oltp.MessageEntity.messageId,
                qw_oltp.MessageEntity.entityCategory,
                qw_oltp.MessageEntityMapping.userId,
                qw_oltp.MessageEntityMapping.jobId,
                qw_oltp.MessageEntityMapping.profileId,
            )
            .filter(qw_oltp.MessageEntity.messageId.in_(message_ids))
            .all()
        )
    return [x._asdict() for x in data]
And then here's the db.session_manager function:
Copy code
from contextlib import contextmanager
from typing import Iterator

from sqlalchemy.orm import Session, sessionmaker


@contextmanager
def session_manager(db_sessionmaker: sessionmaker) -> Iterator[Session]:
    """Provide a transactional scope around a series of operations."""
    db_session = db_sessionmaker()
    try:
        yield db_session
    except:
        db_session.rollback()
        raise
    finally:
        db_session.close()
This is the exact error I got in the shell where I was running the script
Here is the full script for the Flow:
Copy code
from dotenv import load_dotenv
from prefect import Flow, Parameter
from prefect.engine.executors import DaskExecutor
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier

from quantum_transfer.etl import messages as etl_msg

slack_handler = slack_notifier(only_states=[Failed])

def main():
    with Flow("RPO Data Feed", state_handlers=[slack_handler]) as flow:

        company_ids = Parameter(name="company_ids", required=True)

        # ETL Messages
        extract_messages = etl_msg.extract_messages(company_ids)
        process_messages = etl_msg.process_messages(extract_messages)
        truncate_messages = etl_msg.truncate_messages(upstream_tasks=[process_messages])
        load_messages = etl_msg.load_messages(
            process_messages, upstream_tasks=[truncate_messages]
        )

        # ETL Message Entity Mapping
        extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(
            extract_messages
        )
        process_message_entity_mapping = etl_msg.process_message_entity_mapping(
            extract_message_entity_mapping
        )
        truncate_message_entity_mapping = etl_msg.truncate_message_entity_mapping(
            upstream_tasks=[process_message_entity_mapping]
        )
        load_message_entity_mapping = etl_msg.load_messages(
            process_message_entity_mapping,
            upstream_tasks=[truncate_message_entity_mapping],
        )

    flow_run = flow.run(executor=DaskExecutor(), company_ids=[10])
    flow.visualize(flow_state=flow_run)

if __name__ == "__main__":
    load_dotenv()
    main()
Oh and I'm trying to run it again now without Dask
But I was just using the local dask executor, no cluster or anything
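Roughly what I mean, swapping the executor out in the script above (sketch, not what I've actually run yet):
Copy code
# DaskExecutor() with no address spins up a temporary local cluster;
# LocalDaskExecutor uses a local thread/process pool; LocalExecutor runs tasks serially.
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor

flow_run = flow.run(executor=LocalExecutor(), company_ids=[10])  # "without Dask"
# flow_run = flow.run(executor=LocalDaskExecutor(scheduler="threads"), company_ids=[10])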
c
Hi Daniel - this issue describes your situation and what’s happening: https://github.com/PrefectHQ/prefect/issues/2772#issuecomment-643826665
d
I just realized at least one of my issues is that I was calling the wrong function in the very last step
Instead of:
Copy code
load_message_entity_mapping = etl_msg.load_messages(
            process_message_entity_mapping,
            upstream_tasks=[truncate_message_entity_mapping],
        )
I needed:
Copy code
load_message_entity_mapping = etl_msg.load_message_entity_mapping(
            process_message_entity_mapping,
            upstream_tasks=[truncate_message_entity_mapping],
        )
🤦‍♂️
But while I'm here, is this a good design pattern or should I reconsider it?
I wasn't sure if it was okay to be passing such large objects between tasks
Maybe it's better to just pass the message ids first, but then I have some redundant querying on the database side
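Something like this rough sketch is what I mean by passing just the ids (extract_message_ids is hypothetical, not in my code):
Copy code
# Hypothetical helper task: hand only the ids downstream instead of the full row dicts.
@task()
def extract_message_ids(messages: List[Dict]) -> List[int]:
    return [x["message_id"] for x in messages]

# then in the Flow:
# message_ids = etl_msg.extract_message_ids(extract_messages)
# extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(message_ids)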
Any thoughts / advice?
c
It’s perfectly fine to pass large objects as long as you’re aware of the memory consequences; note that if you either turn checkpointing on or run against a Prefect backend, then those large objects will additionally be persisted using your configured Result, so that’s another side effect to be aware of
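For example (rough sketch; the result directory is just a placeholder), checkpointing a task with a configured result would look something like:
Copy code
# Sketch: with checkpointing on, each task's return value is written via its Result.
from prefect import task
from prefect.engine.results import LocalResult

@task(checkpoint=True, result=LocalResult(dir="/tmp/prefect-results"))
def extract_messages(company_ids):
    ...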
d
I dug a little deeper into this, and apparently Amazon Redshift is notorious for slow row-by-row inserts. I’ll have to change my strategy to load files to S3 and then COPY them into Redshift from there.
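Roughly the shape I'm thinking of (sketch only; the bucket, table, IAM role, and helper name are placeholders, not my actual code):
Copy code
# Sketch of the S3 + COPY approach: write rows to a CSV, upload to S3,
# then have Redshift bulk-load with COPY instead of row-by-row INSERTs.
import csv

import boto3
from sqlalchemy import text

def load_via_s3_copy(rows, session):
    # 1. Dump the processed rows to a local CSV
    with open("/tmp/messages.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)

    # 2. Upload the file to S3
    boto3.client("s3").upload_file("/tmp/messages.csv", "my-etl-bucket", "staging/messages.csv")

    # 3. Bulk-load into Redshift with COPY
    session.execute(
        text(
            """
            COPY messages
            FROM 's3://my-etl-bucket/staging/messages.csv'
            IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
            FORMAT AS CSV
            IGNOREHEADER 1
            """
        )
    )
    session.commit()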