dammy arinde
11/23/2021, 2:14 PMdammy arinde
11/23/2021, 2:23 PM
# retrieve file name from s3 and check if it matches snowflake column value to determine flow to run
@task
def get_flow_to_run(inbound_file):
    """Look up which child flow should process ``inbound_file``.

    Queries Snowflake for the job name whose ``column`` value contains
    ``inbound_file`` and maps that job name to the name of the flow to start.

    Args:
        inbound_file: File name to match as a substring of the Snowflake
            column value.

    Returns:
        The name of the flow to run.

    Raises:
        ValueError: If no row matches ``inbound_file`` or the job name found
            has no flow mapped to it.
    """
    # Both names were published as module globals by the original task; that
    # side effect is kept so any other reader of them keeps working.
    global sf_conn
    global flow_to_run

    # Job name stored in Snowflake -> flow to start. The original if/elif
    # chain tested 'jobtype2' twice; the unreachable duplicate is gone.
    flows_by_job = {
        'jobtype1': "jobtype1flow",
        'jobtype2': "jobtype2flow",
    }

    # NOTE(review): the connection is left open because it is exposed through
    # the ``sf_conn`` global - confirm whether anything else reuses it.
    sf_conn = sf.connect(
        user='',
        password='',
        account='',
        warehouse='',
        database='',
        schema=''
    )
    cs = sf_conn.cursor()
    try:
        # Bind the file name instead of formatting it into the SQL text: the
        # original string was not an f-string, so the literal text
        # "{inbound_file}" was what got sent to Snowflake.
        cs.execute(
            "select col from table where column like %s",
            (f"%{inbound_file}%",),
        )
        row = cs.fetchone()
    finally:
        cs.close()

    # fetchone() yields a row tuple (or None when nothing matched), never a
    # bare string, so unwrap the first column before comparing.
    job_name = row[0] if row else None
    print(job_name)

    if job_name not in flows_by_job:
        raise ValueError(
            f"no flow mapped for job {job_name!r} (inbound file {inbound_file!r})"
        )
    flow_to_run = flows_by_job[job_name]
    return flow_to_run
#pass flow to run to startflowrun
@task
def debug(data):
    """Print ``data`` and then its type, as a debugging aid."""
    for shown in (data, type(data)):
        print(shown)
# The child flow's name is only known once get_flow_to_run has executed, so it
# is passed when the task is called inside the flow rather than when the task
# is constructed (the module-level name ``flow_to_run`` does not exist yet at
# that point).
flow_run = StartFlowRun(project_name="projectname")

with Flow("parent-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    input_data = Parameter('data_var')
    s3_key = input_data['s3_key']
    debug(input_data)
    filename = get_inbound_file(s3_key)
    flow2run = get_flow_to_run(filename)
    # Hand the task result to StartFlowRun so the selected flow is the one
    # that actually gets started.
    flow_run(flow_name=flow2run)
dammy arinde
11/23/2021, 2:24 PMAnna Geller
Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.snowflake import SnowflakeQuery
from prefect.tasks.secrets import PrefectSecret
# Shared query task; the account, credentials, warehouse and SQL text are
# passed when the task is called inside the flow.
snowflake_query = SnowflakeQuery(
    database="DEV",
    schema="jaffle_shop",
    autocommit=True,
)
@task(log_stdout=True)
def get_query_from_param(table_name: str = None):
    """Build the ``SELECT *`` statement for ``table_name``.

    Args:
        table_name: Unquoted table identifier, optionally qualified as
            ``schema.table`` or ``database.schema.table``.

    Returns:
        The SQL text ``SELECT * FROM <table_name>``.

    Raises:
        ValueError: If ``table_name`` is missing or is not a plain
            (optionally dotted) identifier.
    """
    import re

    # The default of None used to produce the query "SELECT * FROM None".
    if not table_name:
        raise ValueError("table_name is required to build the query")

    # An identifier cannot be sent as a bound parameter, so it is validated
    # before being interpolated; the value arrives from a flow Parameter.
    # NOTE(review): quoted identifiers are rejected by this check - confirm
    # none are in use.
    part = r"[A-Za-z_][A-Za-z0-9_$]*"
    if not re.fullmatch(rf"{part}(\.{part}){{0,2}}", table_name):
        raise ValueError(f"invalid table name: {table_name!r}")

    return f"SELECT * FROM {table_name}"
@task(log_stdout=True)
def print_data(x):
    """Print ``x`` and return it unchanged.

    Returning the value (rather than the implicit ``None``) lets the task's
    result be passed on to downstream tasks, for example as the condition of
    a ``case`` block.

    Args:
        x: Value to print; for the Snowflake query result this is a list of
            tuples.

    Returns:
        ``x``, unmodified.
    """
    print(x)  # prints a list of tuples
    return x
with Flow("snowflake_tasks_example") as flow:
    # Values that can be overridden per flow run.
    warehouse = Parameter("SNOWFLAKE_WAREHOUSE", default="COMPUTE_WH")
    table_name = Parameter("TABLE_NAME", default="customers")

    # Snowflake credentials come from Prefect secrets.
    account = PrefectSecret("SNOWFLAKE_ACCOUNT_ID")
    user = PrefectSecret("SNOWFLAKE_USER")
    pwd = PrefectSecret("SNOWFLAKE_PASS")
    role = PrefectSecret("SNOWFLAKE_ROLE")

    # Build the SQL text, run it, then print whatever comes back.
    query_string = get_query_from_param(table_name=table_name)
    connection = dict(
        account=account,
        user=user,
        password=pwd,
        role=role,
        warehouse=warehouse,
    )
    data = snowflake_query(query=query_string, **connection)
    print_data(data)


if __name__ == "__main__":
    flow.run()
dammy arinde
11/23/2021, 2:28 PMAnna Geller
When I tried to just test out the StartFlowRun with an existing flow, it returned an error that the flow is not found.
I think it could be that the child flow that you try to call from a parent flow hasn't been registered? The child flow must be registered before you can call it from a parent flow.
Anna Geller
dammy arinde
11/23/2021, 2:30 PMAnna Geller
dammy arinde
11/23/2021, 2:53 PMdammy arinde
11/23/2021, 2:53 PMdammy arinde
11/23/2021, 4:14 PMdammy arinde
11/23/2021, 4:15 PMdammy arinde
11/23/2021, 4:16 PMquery_string = get_query_from_param(table_name=table_name)
# Fragment of a flow body; the enclosing `with Flow(...)` header is not shown.
data = snowflake_query(
account=account,
user=user,
password=pwd,
role=role,
warehouse=warehouse,
query=query_string,
)
print_data(data)
# NOTE(review): presumably `data` is a Prefect task result here rather than the
# fetched rows, so these plain if/elif tests would be evaluated while the flow
# is being built - confirm, and consider Prefect's `case` for run-time branching.
# NOTE(review): the first branch compares data[0][0] to a string while the
# other two compare `data` to a one-tuple list; `flow_to_run` is not assigned
# here when no branch matches.
if data[0][0] == 'PTFileProcessingJob':
flow_to_run="racspassthrough2"
elif data == [('FFileProcessingJob',)]:
flow_to_run="flowjobname"
elif data == [("XmlFileProcessingJob",)]:
flow_to_run="flowjobname"
print(flow_to_run)
create_flow_run(flow_name= flow_to_run)
dammy arinde
11/23/2021, 4:17 PMdammy arinde
11/23/2021, 4:18 PMAnna Geller
if data[0][0] == 'PTFileProcessingJob':
flow_to_run="racspassthrough2"
will work in Orion, but not in Prefect Core. You need to use `case` instead. Here is more information about it:
• case: https://docs.prefect.io/api/latest/tasks/control_flow.html#case
• conditional logic: https://docs.prefect.io/core/idioms/conditional.html#using-conditional-logic-in-a-flow
Anna Geller
dammy arinde
11/23/2021, 4:21 PMdammy arinde
11/23/2021, 5:58 PMdammy arinde
11/23/2021, 6:00 PMprint_data(data) # this returned [('PTFileProcessingJob',)]
# NOTE(review): `cond` is whatever print_data returns; if that task only prints,
# `cond` is None and the case below never matches 'PTFileProcessingJob' - verify.
cond = print_data(data[0][0])
with case(cond, 'PTFileProcessingJob'):
create_flow_run(flow_name= "racspassthrough2")
Kevin Kho
The `print_data` task returns nothing because it just prints, so `case` will compare `None` to `PTFileProcessingJob`. I am not sure you can do `data[0][0]` also when you do `print_data(data[0][0])`, but it seems like that part is working. Just try making your `print_data` return something.
dammy arinde
11/24/2021, 3:04 PM
`print_data(data[0][0])` works locally, but when I try in Dev I get an error, so I will take that off also. Thank you.