Albert Wong

03/03/2023, 10:23 PM
Hey all, I just got started with Prefect and like the workflow. My question: does prefect-sqlalchemy support insert into select queries? I am trying to execute an 'insert into select columns from table' query, which works when I interact with the db directly but not through the SqlAlchemyConnector.execute(query_string) method. I wasn't sure whether it isn't supported or I was doing something wrong.

Sean Williams

03/03/2023, 10:33 PM
Does it work if you use SQLAlchemy without Prefect? It sounds like this might be a SQLAlchemy problem, but it's tough to say without more info.

Nate

03/03/2023, 10:44 PM
can you share a minimal example of how you're trying to use SqlAlchemyConnector.execute(query_string) with your query_string?

Albert Wong

03/03/2023, 10:44 PM
sure one sec
```sql
with cte
as (
select
  l.load_time at time zone 'america/los_angeles' load_time_pst,
  l.tableau_servername,
  row_number() over(partition by (event.obj ->> 'traceUuid') order by l.load_time desc) rnk,
  (event.obj) event_obj
from log.activity_log l
left outer join lateral jsonb_array_elements(l.event_data) as event(obj)
  on true
)
insert into etl.activity_f
select
  -- loading info
  load_time_pst,
  tableau_servername,
  -- event
  (event_obj ->> 'traceUuid') trace_uuid,
  cast((event_obj -> 'event' ->> 'eventTime') as timestamp with time zone) at time zone 'america/los_angeles' event_time_pst,
  cast((event_obj -> 'event' ->> 'isError') as boolean) is_error,
  (event_obj -> 'event' -> 'metadata' ->> 'eventCategory') event_category,
  (event_obj -> 'event' -> 'metadata' ->> 'eventType') event_type,
  -- permissions
  (event_obj -> 'event' ->> 'granteeType') grantee_type,
  cast((event_obj -> 'event' ->> 'granteeId') as integer) grantee_id,
  (event_obj -> 'event' ->> 'granteeValue') grantee_value,
  (event_obj -> 'event' ->> 'capabilityValue') capability_value,
  -- user
  cast((event_obj -> 'event' ->> 'actorUserId') as integer) actor_user_id,
  cast((event_obj -> 'event' ->> 'initiatingUserId') as integer) initiating_user_id,
  -- content
  (event_obj -> 'event' ->> 'authorizableType') content_type,
  cast((event_obj -> 'event' ->> 'contentId') as integer) content_id,
  -- site
  (event_obj -> 'event' ->> 'siteLuid') site_luid
from cte
where
  rnk=1
  and (event_obj ->> 'traceUuid') not in (select trace_uuid from etl.activity_f)
  and date(cast((event_obj -> 'event' ->> 'eventTime') as timestamp with time zone) at time zone 'america/los_angeles') between '{start_date}' and '{end_date}' -- between '2023-03-01' and '2023-03-02'
```
^^^ query string in sql/etl.activity_f.sql
```python
# start_date, end_date and destination_block are defined elsewhere in the flow.
# Read the SQL template and fill in the date range placeholders.
with open('sql/etl.activity_f.sql') as f:
    sql_query = f.read().format(start_date=start_date, end_date=end_date)

# doesn't seem to support insert into select statements
# with SqlAlchemyConnector.load(destination_block) as connector:
#     connector.execute(sql_query)
```
^^^ code snippet
I haven't used SQLAlchemy much, so I was going to try the regular library next.
That didn't work, so it looks like it may be a SQLAlchemy thing. When I use a regular Postgres library, psycopg2, I'm able to execute the raw SQL fine with the calls below:
```python
# connection is an open psycopg2 connection created elsewhere.
cursor = connection.cursor()
cursor.execute(sql_query)
connection.commit()
connection.close()
```
appreciate you hearing me out

Nate

03/04/2023, 1:15 AM
I wonder if it would work if you used the psycopg2 driver when you make your DatabaseCredentials? something like
```python
from prefect_sqlalchemy import DatabaseCredentials, SyncDriver

db_creds = DatabaseCredentials(
    driver=SyncDriver.POSTGRESQL_PSYCOPG2,
    ...
)
```

Albert Wong

03/04/2023, 1:46 AM
I was using the psycopg2 driver in the DatabaseCredentials, and it supported singleton inserts, just not the insert into select statement that gets me all my data in one query.

Nate

03/04/2023, 1:46 AM
ahh i see
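
One visible difference between the two attempts in this thread is that the psycopg2 snippet calls connection.commit() explicitly. Whether a missing commit is what went wrong with SqlAlchemyConnector.execute is not established here, so treat the following only as a way to check the statement shape in isolation. It is a minimal sketch that uses Python's standard-library sqlite3 module as a stand-in for Postgres. The tables src and dst, the constants SETUP_SQL and INSERT_SELECT_SQL, and the helpers load_once, fetch_rows and demo are all hypothetical names made up for illustration.

```python
import os
import sqlite3
import tempfile

# Hypothetical stand-in schema; the real tables in the thread live in Postgres.
SETUP_SQL = """
create table src (trace_uuid text, load_time integer);
create table dst (trace_uuid text, load_time integer);
insert into src values ('a', 1), ('a', 2), ('b', 1);
"""

# Same overall shape as the query in the thread: a CTE first, then
# insert into ... select, skipping rows that are already loaded.
INSERT_SELECT_SQL = """
with cte as (
    select trace_uuid, max(load_time) as load_time
    from src
    group by trace_uuid
)
insert into dst
select trace_uuid, load_time
from cte
where trace_uuid not in (select trace_uuid from dst)
"""


def load_once(db_path):
    """Run the CTE-prefixed insert and commit explicitly, like the psycopg2 snippet."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(INSERT_SELECT_SQL)
        conn.commit()
    finally:
        conn.close()


def fetch_rows(db_path):
    """Read dst back through a fresh connection, so only committed rows are seen."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            "select trace_uuid, load_time from dst order by trace_uuid"
        ).fetchall()
    finally:
        conn.close()


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        conn = sqlite3.connect(db_path)
        conn.executescript(SETUP_SQL)
        conn.commit()
        conn.close()
        load_once(db_path)
        load_once(db_path)  # second run adds nothing because of the not-in filter
        return fetch_rows(db_path)


if __name__ == "__main__":
    print(demo())  # [('a', 2), ('b', 1)]
```

If the rows show up here but not after the SqlAlchemyConnector call, comparing how each path commits its transaction is a reasonable next thing to check.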