Hello, please I am having some issues getting my p...
# ask-community
d
Hello, I am having some issues getting my Prefect flow to run. I am trying to create a parent flow that runs other flows based on a conditional value, derived from the file dropped in S3 and the column it matches in a Snowflake table. I created a task that extracts the column value we are looking for from the Snowflake table, then used if statements to pick the flow to run based on that value. I then return the flow to run so I can pass it as the flow name to StartFlowRun. I seem unable to pass the flow to run, as it always comes back as not defined. Also, when I tried to test StartFlowRun with an existing flow, it returned an error that the flow is not found. Is there a way to get this job name and pass it to StartFlowRun as the name of the flow to run?
# retrieve file name from s3 and check if it matches snowflake column value to determine flow to run
@task
def get_flow_to_run(inbound_file):
    global sf_conn
    global inbound_key
    global flow_to_run
    sf_conn = sf.connect(
        user='',
        password='',
        account='',
        warehouse='',
        database='',
        schema=''
    )
    cs = sf_conn.cursor()
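    try:
        # bind the file name as a query parameter instead of formatting it into the SQL string
        cs.execute("select col from table where column like %s", (f"%{inbound_file}%",))
        # fetchone() returns a row tuple such as ('jobtype1',) or None, so take the first column
        row = cs.fetchone()
        job_name = row[0] if row else None
    finally:
        cs.close()
    print(job_name)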
    if job_name == 'jobtype1':
        flow_to_run="jobtype1flow"

    elif job_name == 'jobtype2':
        flow_to_run="jobtype2flow"

    elif job_name == 'jobtype2':
        flow_to_run="jobtype2flow"
   
    return flow_to_run

#pass flow to run to startflowrun



@task
def debug(data):
    print(data)
    print(type(data))

flow_run = StartFlowRun(flow_name=flow_to_run, project_name="projectname")
with Flow("parent-flow", run_config=RUN_CONFIG,  storage=STORAGE) as flow:
    input_data = Parameter('data_var')
    s3_key = input_data['s3_key']
    debug(input_data)
    
    filename = get_inbound_file(s3_key)
    flow2run = get_flow_to_run(filename)
    flow_run()
@Anna Geller thank you, just did.
a
thank you for moving the code block to the thread 🙏 I have a couple of ideas of what you could try:
1. Prefect tasks are considered to be stateless. I see you try defining some global variables inside of a task. I think it will have some unintended consequences.
2. Did you know that there is a Prefect task for Snowflake queries? I can share how it could be used
The docs for the task: https://docs.prefect.io/api/latest/tasks/snowflake.html#snowflakequery And here is one example of how you could use it in a flow:
from prefect import Flow, Parameter, task
from prefect.tasks.snowflake import SnowflakeQuery
from prefect.tasks.secrets import PrefectSecret


snowflake_query = SnowflakeQuery(database="DEV", schema="jaffle_shop", autocommit=True)


@task(log_stdout=True)
def get_query_from_param(table_name: str = None):
    return f"SELECT * FROM {table_name}"


@task(log_stdout=True)
def print_data(x):
    print(x)  # prints a list of tuples


with Flow("snowflake_tasks_example") as flow:
    account = PrefectSecret("SNOWFLAKE_ACCOUNT_ID")
    user = PrefectSecret("SNOWFLAKE_USER")
    pwd = PrefectSecret("SNOWFLAKE_PASS")
    role = PrefectSecret("SNOWFLAKE_ROLE")
    warehouse = Parameter("SNOWFLAKE_WAREHOUSE", default="COMPUTE_WH")
    table_name = Parameter("TABLE_NAME", default="customers")
    query_string = get_query_from_param(table_name=table_name)
    data = snowflake_query(
        account=account,
        user=user,
        password=pwd,
        role=role,
        warehouse=warehouse,
        query=query_string,
    )
    print_data(data)

if __name__ == "__main__":
    flow.run()
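On point 1 above (tasks being stateless), here is a minimal sketch of picking the child flow name from a query result without any global variables. It is plain Python so it can be tried outside Prefect. `FLOW_BY_JOB` and `pick_flow_name` are hypothetical names, the job and flow names are the placeholders from the question, and the row shape assumes a DB-API `fetchone()` result (a tuple, or `None` when nothing matches).

```python
from typing import Optional, Tuple

# Hypothetical lookup table: job name stored in Snowflake -> registered flow name.
# The entries reuse the placeholder names from the question.
FLOW_BY_JOB = {
    "jobtype1": "jobtype1flow",
    "jobtype2": "jobtype2flow",
}


def pick_flow_name(row: Optional[Tuple]) -> Optional[str]:
    """Return the flow name for a fetched row, or None if there is no match."""
    if not row:
        # fetchone() found nothing, so there is no flow to pick
        return None
    job_name = row[0]  # first (and only) selected column
    return FLOW_BY_JOB.get(job_name)
```

Wrapped in `@task`, a function like this hands its result back to the flow instead of writing to a global.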
d
Thank you, I will try this and remove the global variables also. Thank you
a
Regarding:
when I tried to just test out the startflowrun with an existing flow, it returns an error that the flow is not found
I think it could be that the child flow that you try to call from a parent flow hasn’t been registered? The child flow must be registered before you can call it from a parent flow
We’ve recently published a tutorial about orchestrating a flow of flows, perhaps it can help you set up this pattern: https://www.prefect.io/blog/flow-of-flows-orchestrating-elt-with-prefect-and-dbt/ There is also this post that introduced those tasks: https://www.prefect.io/blog/prefect-0-15-0-a-new-flow-run-experience/
d
yes, it has been registered. It ran on its own, but I get that error when I try it with the parent flow
a
you can definitely have a look at the “create_flow_run” task. But StartFlowRun should work as well. The problem that I think you ran into is indentation 😄 your flow.run() should be defined outside of the with Flow() block. So unindent this last line and you should be good to go 👍
d
oh
thank you! Let me try to unindent and see if that works!
I get the error that I'm trying to execute a task outside of flow after unindenting
so I tried to use the snowflake example, and try using the create_flow_run
    query_string = get_query_from_param(table_name=table_name)
    data = snowflake_query(
        account=account,
        user=user,
        password=pwd,
        role=role,
        warehouse=warehouse,
        query=query_string,
    )
    print_data(data)
    

    if data[0][0] == 'PTFileProcessingJob':
        flow_to_run="racspassthrough2"

    elif data == [('FFileProcessingJob',)]:
        flow_to_run="flowjobname"

    elif data == [("XmlFileProcessingJob",)]:
        flow_to_run="flowjobname"
    print(flow_to_run)
    create_flow_run(flow_name= flow_to_run)
I'm able to get the data now, but it won't let me use it to determine the flow to run
I'm using the if statements to get the flow to run, and putting the flow_to_run variable as the flow name but it's giving me flow_to_run is not defined
a
This conditional logic:
    if data[0][0] == 'PTFileProcessingJob':
        flow_to_run="racspassthrough2"
will work in Orion, but not in Prefect Core. You need to use case instead. Here is more information about it:
• case: https://docs.prefect.io/api/latest/tasks/control_flow.html#case
• conditional logic: https://docs.prefect.io/core/idioms/conditional.html#using-conditional-logic-in-a-flow
@dammy arinde great work so far, I think you’re really close to getting it working. Just make sure to replace the if statements with the case logic.
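A rough way to picture why the plain if statements left `flow_to_run` undefined: inside the `with Flow(...)` block, `data` stands in for a result that does not exist yet, so the comparisons run while the flow is being built, against that stand-in rather than the query result. The toy below is not Prefect code. `PendingResult` and `choose_at_build_time` are hypothetical names, and the sketch assumes the stand-in simply compares unequal to any real value.

```python
class PendingResult:
    """Hypothetical stand-in for a value that only exists once the flow runs."""

    def __getitem__(self, key):
        # Indexing a placeholder gives another placeholder, not real data.
        return PendingResult()


def choose_at_build_time(data):
    """Mimic the if/elif chain from the thread, evaluated on whatever `data` is."""
    chosen = None
    if data[0][0] == "PTFileProcessingJob":
        chosen = "racspassthrough2"
    elif data == [("FFileProcessingJob",)]:
        chosen = "flowjobname"
    return chosen


# With a placeholder, no branch matches, so no flow name is ever chosen.
build_time_choice = choose_at_build_time(PendingResult())

# With the real query result, the same chain picks a flow.
run_time_choice = choose_at_build_time([("PTFileProcessingJob",)])
```

Under that assumption, the comparison has to become part of the flow itself (`case`) or move inside a task, where the real value is available.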
d
ok, thank you so much, I will try with the case logic
Hi @Anna Geller thank you so much for all your help! I am now using case, and I am receiving a new error: "task 'case(PTFileProcessingJob)': Starting task run... [2021-11-23 11:55:11-0600] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "None" did not match "PTFileProcessingJob"')"
    print_data(data)  # this returned [('PTFileProcessingJob',)]

    cond = print_data(data[0][0])

    with case(cond, 'PTFileProcessingJob'):
        create_flow_run(flow_name="racspassthrough2")
k
I don’t think that’s an error. The `print_data` task returns nothing because it just prints, so `case` will compare `None` to `PTFileProcessingJob`. I am not sure you can do `data[0][0]` also when you do `print_data(data[0][0])`, but it seems like that part is working. Just try making your `print_data` return something
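The point about `print_data` can be checked in plain Python: a function with no `return` statement evaluates to `None`, which is the value `case` then received. A small sketch, with `print_and_return` as a hypothetical name for the adjusted function:

```python
def print_data(x):
    # Prints only: there is no return statement, so callers get None.
    print(x)


def print_and_return(x):
    # Hypothetical fix: print for debugging, then hand the value back.
    print(x)
    return x


printed_only = print_data("PTFileProcessingJob")
passed_through = print_and_return("PTFileProcessingJob")
```

Here `printed_only` is `None`, while `passed_through` carries the string that the condition needs.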
d
Yes, you are right, I changed the print function to return something. Thank you.
`print_data(data[0][0])` works locally, but when I try in Dev I get an error, so I will take that off also. Thank you