Albert Wong
03/03/2023, 10:23 PMSean Williams
03/03/2023, 10:33 PMNate
03/03/2023, 10:44 PMSqlAlchemyConnector.execute(query_string)
with your query_string
?Albert Wong
03/03/2023, 10:44 PMAlbert Wong
03/03/2023, 10:44 PMwith cte
as (
select
l.load_time at time zone 'america/los_angeles' load_time_pst,
l.tableau_servername,
row_number() over(partition by (event.obj ->> 'traceUuid') order by l.load_time desc) rnk,
(event.obj) event_obj
from log.activity_log l
left outer join lateral jsonb_array_elements(l.event_data) as event(obj)
on true
)
insert into etl.activity_f
select
-- loading info
load_time_pst,
tableau_servername,
-- event
(event_obj ->> 'traceUuid') trace_uuid,
cast((event_obj -> 'event' ->> 'eventTime') as timestamp with time zone) at time zone 'america/los_angeles' event_time_pst,
cast((event_obj -> 'event' ->> 'isError') as boolean) is_error,
(event_obj -> 'event' -> 'metadata' ->> 'eventCategory') event_category,
(event_obj -> 'event' -> 'metadata' ->> 'eventType') event_type,
-- permissions
(event_obj -> 'event' ->> 'granteeType') grantee_type,
cast((event_obj -> 'event' ->> 'granteeId') as integer) grantee_id,
(event_obj -> 'event' ->> 'granteeValue') grantee_value,
(event_obj -> 'event' ->> 'capabilityValue') capability_value,
-- user
cast((event_obj -> 'event' ->> 'actorUserId') as integer) actor_user_id,
cast((event_obj -> 'event' ->> 'initiatingUserId') as integer) initiating_user_id,
-- content
(event_obj -> 'event' ->> 'authorizableType') content_type,
cast((event_obj -> 'event' ->> 'contentId') as integer) content_id,
-- site
(event_obj -> 'event' ->> 'siteLuid') site_luid
from cte
where
rnk=1
and (event_obj ->> 'traceUuid') not in (select trace_uuid from etl.activity_f)
and date(cast((event_obj -> 'event' ->> 'eventTime') as timestamp with time zone) at time zone 'america/los_angeles') between '{start_date}' and '{end_date}' -- between '2023-03-01' and '2023-03-02'
Albert Wong
03/03/2023, 10:45 PMAlbert Wong
03/03/2023, 10:45 PMwith open('sql/etl.activity_f.sql') as f:
sql_query = f.read().format(**{'start_date': start_date, 'end_date': end_date})
# doesn't seem to support insert into select statements
#with SqlAlchemyConnector.load(destination_block) as connector:
# connector.execute(sql_query)
Albert Wong
03/03/2023, 10:45 PMAlbert Wong
03/03/2023, 10:47 PMAlbert Wong
03/03/2023, 11:06 PMcursor = connection.cursor()
cursor.execute(sql_query)
connection.commit()
connection.close()
appreciate you hearing me outNate
03/04/2023, 1:15 AMDatabaseCredentials
?
something like
from prefect_sqlalchemy import DatabaseCredentials, SyncDriver
db_creds = DatabaseCredentials(
driver=SyncDriver.POSTGRESQL_PSYCOPG2,
...
)
Albert Wong
03/04/2023, 1:46 AMNate
03/04/2023, 1:46 AM