Daniel
07/28/2020, 11:46 PM
# ETL Messages
# Messages pipeline: extract -> process -> truncate -> load.
# ``upstream_tasks`` adds an ordering-only dependency (no data is passed), so
# truncate runs after process, and load runs after truncate.
extract_messages = etl_msg.extract_messages(company_ids)
process_messages = etl_msg.process_messages(extract_messages)
truncate_messages = etl_msg.truncate_messages(upstream_tasks=[process_messages])
load_messages = etl_msg.load_messages(
    process_messages, upstream_tasks=[truncate_messages]
)
# ETL Message Entity Mapping
# The entity-mapping extract takes the extracted messages as its input.
extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(
    extract_messages
)
process_message_entity_mapping = etl_msg.process_message_entity_mapping(
    extract_message_entity_mapping
)
truncate_message_entity_mapping = etl_msg.truncate_message_entity_mapping(
    upstream_tasks=[process_message_entity_mapping]
)
# Must use the entity-mapping loader here, not etl_msg.load_messages (the
# loader of the messages pipeline above).
load_message_entity_mapping = etl_msg.load_message_entity_mapping(
    process_message_entity_mapping,
    upstream_tasks=[truncate_message_entity_mapping],
)
@task()
def extract_messages(company_ids: List[int]) -> List[Dict]:
    """Query message rows from the source database and return them as dicts.

    Args:
        company_ids: Company identifiers for the extract.
            NOTE(review): not referenced by the query as written here. Part of
            the query is elided ("A bunch more columns"), so a filter (and any
            joins to MessageType / MessageCategory) may have been dropped with
            it -- confirm against the real module.

    Returns:
        One dict per result row, produced by the row's ``_asdict()``.
    """
    source_sessionmaker = etl_utils.get_source_db_sessionmaker()
    with db.session_manager(source_sessionmaker) as session:
        data = (
            session.query(
                qw_oltp.Message.message_id,
                qw_oltp.Message.messageThreadId,
                qw_oltp.Message.parentMessageId,
                qw_oltp.MessageType.messageType,
                qw_oltp.MessageCategory.name.label("message_category"),
                # A bunch more columns
            )
            .group_by(qw_oltp.Message.message_id)
            .all()
        )
    # ``.all()`` has already materialized the rows, so converting them after
    # the session scope has ended is safe.
    return [x._asdict() for x in data]
@task()
def extract_message_entity_mapping(messages: List[Dict]) -> List[Dict]:
    """Fetch the entity-mapping rows that belong to the given messages.

    Args:
        messages: Message dicts; each one is read through its ``"message_id"``
            key (a missing key contributes ``None`` to the id list).

    Returns:
        One dict per result row, produced by the row's ``_asdict()``. An empty
        ``messages`` list yields an empty list without touching the database.
    """
    message_ids = [x.get("message_id") for x in messages]
    if not message_ids:
        # Nothing to look up: skip the session and the empty IN () query.
        return []

    source_sessionmaker = etl_utils.get_source_db_sessionmaker()
    with db.session_manager(source_sessionmaker) as session:
        # NOTE(review): columns come from both MessageEntity and
        # MessageEntityMapping, but no join condition between them is visible
        # here -- confirm one was not elided from the snippet.
        data = (
            session.query(
                qw_oltp.MessageEntity.message_entity_id,
                qw_oltp.MessageEntity.messageId,
                qw_oltp.MessageEntity.entityCategory,
                qw_oltp.MessageEntityMapping.userId,
                qw_oltp.MessageEntityMapping.jobId,
                qw_oltp.MessageEntityMapping.profileId,
            )
            .filter(qw_oltp.MessageEntity.messageId.in_(message_ids))
            .all()
        )
    # Rows are already materialized by ``.all()``; convert them to plain dicts.
    return [x._asdict() for x in data]
@contextmanager
def session_manager(db_sessionmaker: sessionmaker) -> session:
    """Yield a new session, rolling back on error and always closing it.

    Args:
        db_sessionmaker: Zero-argument callable that returns a new session.

    Yields:
        The session created by ``db_sessionmaker()``.

    Raises:
        Whatever the ``with`` body raises; the exception is re-raised after
        the session has been rolled back.

    Note:
        No commit is issued here. Callers that write through the session must
        commit themselves before leaving the ``with`` block.
    """
    db_session = db_sessionmaker()
    try:
        yield db_session
    except BaseException:
        # Same reach as a bare ``except:`` (includes KeyboardInterrupt and
        # SystemExit): undo pending work, then let the error propagate.
        db_session.rollback()
        raise
    finally:
        # Runs on success and on failure so the connection is always released.
        db_session.close()
from dotenv import load_dotenv
from prefect import Flow, Parameter
from prefect.engine.executors import DaskExecutor
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier
from quantum_transfer.etl import messages as etl_msg
# State handler built with Prefect's slack_notifier and restricted to the
# Failed state via only_states; main() attaches it to the flow.
slack_handler = slack_notifier(only_states=[Failed])
def main():
    """Build the "RPO Data Feed" flow, run it once on Dask and visualize the run.

    The flow has two pipelines (messages and message-entity mapping), each
    wired as extract -> process -> truncate -> load. ``upstream_tasks`` adds an
    ordering-only dependency, so every truncate waits for its process step and
    every load waits for its truncate step.
    """
    with Flow("RPO Data Feed", state_handlers=[slack_handler]) as flow:
        company_ids = Parameter(name="company_ids", required=True)

        # ETL Messages
        extract_messages = etl_msg.extract_messages(company_ids)
        process_messages = etl_msg.process_messages(extract_messages)
        truncate_messages = etl_msg.truncate_messages(
            upstream_tasks=[process_messages]
        )
        load_messages = etl_msg.load_messages(
            process_messages, upstream_tasks=[truncate_messages]
        )

        # ETL Message Entity Mapping
        # The entity-mapping extract takes the extracted messages as its input.
        extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(
            extract_messages
        )
        process_message_entity_mapping = etl_msg.process_message_entity_mapping(
            extract_message_entity_mapping
        )
        truncate_message_entity_mapping = etl_msg.truncate_message_entity_mapping(
            upstream_tasks=[process_message_entity_mapping]
        )
        # Must use the entity-mapping loader here, not etl_msg.load_messages
        # (the loader of the messages pipeline above).
        load_message_entity_mapping = etl_msg.load_message_entity_mapping(
            process_message_entity_mapping,
            upstream_tasks=[truncate_message_entity_mapping],
        )

    # Run outside the Flow context; the keyword argument supplies the value of
    # the "company_ids" Parameter for this run.
    flow_run = flow.run(executor=DaskExecutor(), company_ids=[10])
    flow.visualize(flow_state=flow_run)
if __name__ == "__main__":
    # Load environment variables (python-dotenv) before the flow is built.
    load_dotenv()
    main()
Chris White
07/29/2020, 12:08 AM
Daniel
07/29/2020, 12:25 AM
load_message_entity_mapping = etl_msg.load_messages(
process_message_entity_mapping,
upstream_tasks=[truncate_message_entity_mapping],
)
load_message_entity_mapping = etl_msg.load_message_entity_mapping(
process_message_entity_mapping,
upstream_tasks=[truncate_message_entity_mapping],
)
Chris White
07/29/2020, 12:29 AM
Result, so that’s another side effect to be aware of
Daniel
07/29/2020, 1:52 AM