d
Hi everyone, is there a way to pass a parameter from one flow to another? I'm using `client.create_flow_run()` to run the dependent flows, but when I add a parameter to it, it isn't passed to the dependent flow.
k
Hey @dammy arinde, why are you using `client.create_flow_run()` as opposed to the `create_flow_run` task? Are you using `Client.create_flow_run()` inside the flow?
d
when I tried `create_flow_run`, it wasn't finding the flow
Yes, I am
k
`Client.create_flow_run()` is not a task, so it executes immediately, before the Parameter value exists. You should use the `create_flow_run()` task instead, which defers execution until the Parameter value is available. How did you call it when it wasn't finding the flow? Could you show me an example?
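The build-time vs run-time distinction Kevin describes can be simulated without Prefect at all. This is a minimal sketch, not Prefect's actual implementation: `Parameter` and `eager_create_flow_run` below are illustrative stand-ins for `prefect.Parameter` and `Client.create_flow_run()`.

```python
class Parameter:
    """Stand-in for prefect.Parameter: at flow *build* time it is just a
    placeholder; its value only exists once the flow actually runs."""
    def __init__(self, name):
        self.name = name

def eager_create_flow_run(parameters):
    """Stand-in for Client.create_flow_run(): a plain method call that runs
    immediately, while the flow is still being built."""
    return parameters

# Inside a `with Flow(...):` block, a plain method call sees the placeholder:
param = Parameter("data_var")
sent = eager_create_flow_run({"data_var": param})

# The unresolved placeholder itself gets sent, not a runtime value:
assert isinstance(sent["data_var"], Parameter)
```

A task, by contrast, only has its `run` method invoked by the flow runner after placeholders are resolved, which is why the `create_flow_run` task receives the real value.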
d
```python
with case(cond, 'PTFileProcessingJob'):
    create_flow_run(flow_id="8d8dcd45-5965-48c0-9acc-4af4da024f66", parameters={"data_var": s3_key})
```
when I used it this way, the `case` condition would come back as an empty string and it wouldn't get to run the flow
k
Could you show me how `cond` is defined?
d
```python
@task(log_stdout=True)
def print_data(x):
    return x

with Flow("parent-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    input_data = Parameter('data_var')
    s3_key = input_data['s3_key']
    query_string = get_query_from_param(table_name=table_name)
    data = snowflake_query(
        account=account,
        user=user,
        password=pwd,
        role=role,
        warehouse=warehouse,
        query=query_string,
    )
    # print_data(data)  # this returned [('PTFileProcessingJob',)]

    cond = print_data(data)

    with case(cond, 'PTFileProcessingJob'):
        create_flow_run(flow_id="8d8dcd45-5965-48c0-9acc-4af4da024f66", parameters={"data_var": s3_key})
```
k
Did you add the return to `print_data`?
d
```python
@task(log_stdout=True)
def print_data(x):
    return x
```
yes, this function
k
I guess this is returning false because Snowflake is returning a `List[Tuple]` and you are comparing it to a string. You likely need to convert it in the `print_data` task to make sure this is a string.
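The conversion Kevin suggests can be sketched in plain Python. The helper name `first_value` is illustrative, not from the thread; in the real flow this logic would live inside the `print_data` task before `case` compares the result.

```python
def first_value(rows):
    """Pull the single string out of a Snowflake-style List[Tuple] result,
    e.g. [('PTFileProcessingJob',)] -> 'PTFileProcessingJob'.
    Returns None when the query came back empty."""
    if not rows:
        return None
    return rows[0][0]

print(first_value([('PTFileProcessingJob',)]))  # PTFileProcessingJob
```

Guarding the empty-list case matters here, since an empty query result is exactly what shows up later in this thread.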
d
This is the error I get:
`State Message: Provided value "[]" did not match "[('PTFileProcessingJob',)]"`
I have tried using it as a `List[Tuple]` but I get this error too
k
What storage are you using? Did you re-register after changing `print_data`? What happens when you log `x` in the `print_data` task? Do you get `[('PTFileProcessingJob',)]`?
d
for storage, I'm using an S3 bucket
yes, I re-registered
Do you get `[('PTFileProcessingJob',)]`? Yes, I get this as the return of the `print_data` function
k
I'm pretty confused, because the state message really seems like you are getting an empty list. Could you try adding a log statement inside the print task so I can see the logs?
d
ok, thank you. is there a command to check the log?
k
Are you using Prefect Cloud? There is a tab you can check in the flow run for the flow logs
d
yes, here's the log
k
Sorry, I meant inside `print_data`, so that we can explicitly see what was returned by the `snowflake_query` task
d
inside the `print_data` task it's just showing Success, but I don't see what was returned
k
No, I know it ran, but we need to see the value.
```python
@task(log_stdout=True)
def print_data(x):
    prefect.context.logger.info(x)
    return x
```
But even `print(x)` should work, I think, because you have `log_stdout=True`. Then re-register and run.
d
ok, thank you. let me try and show the result
when I do `print(x)` I get:
`[2021-11-29 12:30:36-0600] INFO - prefect.TaskRunner | [('PTFileProcessingJob',)]`
k
This is very confusing. Let me try running some stuff
d
Ok, thank you
k
So if I do this:
```python
from prefect import Flow, task, case
import prefect

@task
def abc():
    return [('PTFileProcessingJob',)]

@task
def other_task():
    prefect.context.logger.info("this ran")
    return

with Flow("test_case") as flow:
    result = abc()
    with case(result, "PTFileProcessingJob"):
        other_task()

flow.run()
```
I do get:
`[2021-11-29 13:35:05-0500] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "[(\'PTFileProcessingJob\',)]" did not match "PTFileProcessingJob"')`
I think we can just make a comparison task (will give an example)
This should be more explicit and show why the equality is not holding:
```python
from prefect import Flow, task, case
import prefect

@task
def abc():
    return [('PTFileProcessingJob',)]

@task
def comparison_task(x, y):
    prefect.context.logger.info(x)
    prefect.context.logger.info(y)
    if x == y:
        prefect.context.logger.info("It was a match")
        return True
    else:
        prefect.context.logger.info("It did not match")
        return False

@task
def other_task():
    prefect.context.logger.info("this ran")
    return

with Flow("test_case") as flow:
    result = abc()
    compare = comparison_task(result, "PTFileProcessingJob")
    with case(compare, True):
        other_task()

flow.run()
```
d
ok, so this is not working because a string is being compared to a list, right?
k
Yes, but the log is weird; it shows an empty list. But yes, that is right. And inside your list you also have a tuple. A one-element tuple won't compare equal to a string either, so it would be better to just pull the string out to compare.
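The equality behavior behind this exchange can be checked directly in plain Python, using the literal value from the thread:

```python
rows = [('PTFileProcessingJob',)]

# Neither the list nor the one-element tuple equals the bare string:
assert rows != 'PTFileProcessingJob'
assert rows[0] != 'PTFileProcessingJob'

# Indexing into the first row's first column yields a string that does match:
assert rows[0][0] == 'PTFileProcessingJob'
```

This is why `case(cond, 'PTFileProcessingJob')` raises SKIP when `cond` is the raw query result: the comparison is list-vs-string, which is never equal.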
d
I think I see where the problem is: when I run locally, the log shows the same value in `print_data` as the one I have in the condition, but when I run it in dev, the log shows an empty result. So it seems the Snowflake query returns a result when I try locally but returns nothing in dev.
k
Is your dev connecting to the same database, or are you changing the connection?
d
it's the same, but I'm accessing S3 in dev and not locally, so the problem may be from there
thank you @Kevin Kho, the problem was from the S3 bucket. I fixed it and now the case condition matches. Thank you so much!
k
No problem! Glad you figured it out!