Albert Wong
03/03/2023, 10:23 PMSean Williams
03/03/2023, 10:33 PMNate
03/03/2023, 10:44 PM
`SqlAlchemyConnector.execute(query_string)` with your query_string?
Albert Wong
03/03/2023, 10:44 PMAlbert Wong
03/03/2023, 10:44 PM
-- Load de-duplicated activity events from log.activity_log into etl.activity_f.
-- Every element of the event_data JSON array becomes one candidate row. Only the
-- most recently loaded copy of each traceUuid is kept, and only events whose
-- event date (in the 'america/los_angeles' zone) lies between {start_date} and
-- {end_date} inclusive are inserted.
-- {start_date} / {end_date} are template placeholders filled in by the caller.
with latest_event as (
  select
    l.load_time at time zone 'america/los_angeles' as load_time_pst,
    l.tableau_servername,
    -- rnk = 1 marks the most recently loaded copy of each traceUuid
    row_number() over (
      partition by (event.obj ->> 'traceUuid')
      order by l.load_time desc
    ) as rnk,
    event.obj as event_obj
  from log.activity_log as l
  left join lateral jsonb_array_elements(l.event_data) as event(obj)
    on true
)
insert into etl.activity_f
-- NOTE(review): no target column list is given, so the select order below must
-- match the column order of etl.activity_f -- confirm against the table DDL.
select
  -- loading info
  e.load_time_pst,
  e.tableau_servername,
  -- event
  (e.event_obj ->> 'traceUuid') as trace_uuid,
  cast((e.event_obj -> 'event' ->> 'eventTime') as timestamp with time zone) at time zone 'america/los_angeles' as event_time_pst,
  cast((e.event_obj -> 'event' ->> 'isError') as boolean) as is_error,
  (e.event_obj -> 'event' -> 'metadata' ->> 'eventCategory') as event_category,
  (e.event_obj -> 'event' -> 'metadata' ->> 'eventType') as event_type,
  -- permissions
  (e.event_obj -> 'event' ->> 'granteeType') as grantee_type,
  cast((e.event_obj -> 'event' ->> 'granteeId') as integer) as grantee_id,
  (e.event_obj -> 'event' ->> 'granteeValue') as grantee_value,
  (e.event_obj -> 'event' ->> 'capabilityValue') as capability_value,
  -- user
  cast((e.event_obj -> 'event' ->> 'actorUserId') as integer) as actor_user_id,
  cast((e.event_obj -> 'event' ->> 'initiatingUserId') as integer) as initiating_user_id,
  -- content
  (e.event_obj -> 'event' ->> 'authorizableType') as content_type,
  cast((e.event_obj -> 'event' ->> 'contentId') as integer) as content_id,
  -- site
  (e.event_obj -> 'event' ->> 'siteLuid') as site_luid
from latest_event as e
where
  e.rnk = 1
  -- A null traceUuid cannot be de-duplicated, so such rows are never loaded.
  and (e.event_obj ->> 'traceUuid') is not null
  -- not exists rather than not in: with not in, a single NULL trace_uuid in
  -- etl.activity_f makes the predicate unknown for every row, and the load
  -- would silently insert nothing.
  and not exists (
    select 1
    from etl.activity_f as f
    where f.trace_uuid = (e.event_obj ->> 'traceUuid')
  )
  -- Both bounds are inclusive because the comparison is on whole dates.
  and date(cast((e.event_obj -> 'event' ->> 'eventTime') as timestamp with time zone) at time zone 'america/los_angeles') between '{start_date}' and '{end_date}' -- e.g. between '2023-03-01' and '2023-03-02'
Albert Wong
03/03/2023, 10:45 PMAlbert Wong
03/03/2023, 10:45 PMwith open('sql/etl.activity_f.sql') as f:
        sql_query = f.read().format(**{'start_date': start_date, 'end_date': end_date})
    
    # doesn't seem to support insert into select statements
    #with SqlAlchemyConnector.load(destination_block) as connector:
    #    connector.execute(sql_query)Albert Wong
03/03/2023, 10:45 PMAlbert Wong
03/03/2023, 10:47 PMAlbert Wong
03/03/2023, 11:06 PMcursor = connection.cursor()
    cursor.execute(sql_query)
    connection.commit()    
    connection.close()
appreciate you hearing me out
Nate
03/04/2023, 1:15 AMDatabaseCredentials ?
something like
from prefect_sqlalchemy import DatabaseCredentials, SyncDriver
db_creds = DatabaseCredentials(
   driver=SyncDriver.POSTGRESQL_PSYCOPG2,
   ...
)Albert Wong
03/04/2023, 1:46 AMNate
03/04/2023, 1:46 AMBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by