dammy arinde
11/29/2021, 5:33 PMclient.create_flow_run()
to run the dependent flows but when I add parameter to it, it's not passing to the dependent flow.Kevin Kho
client.create_flow_run()
as opposed to the create_flow_run
task?Kevin Kho
Client.create_flow_run()
inside the flow?dammy arinde
11/29/2021, 5:35 PMdammy arinde
11/29/2021, 5:35 PMKevin Kho
Client.create_flow_run()
is not a task so it will execute immediately so the parameter doesn’t exist yet. You should use the create_flow_run()
task instead so that it will defer the execution and the Parameter value will exist then. How did you call it when it wasn’t finding the flow, could you show me an example?dammy arinde
11/29/2021, 5:43 PMwith case(cond, 'PTFileProcessingJob'): create_flow_run(flow_id= "8d8dcd45-5965-48c0-9acc-4af4da024f66",parameters= {"data_var":s3_key})
dammy arinde
11/29/2021, 5:44 PMKevin Kho
cond
is defined?dammy arinde
11/29/2021, 5:47 PM@task(log_stdout=True)
def print_data(x):
return x
with Flow("parent-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
input_data = Parameter('data_var')
s3_key = input_data['s3_key']
query_string = get_query_from_param(table_name=table_name)
data = snowflake_query(
account=account,
user=user,
password=pwd,
role=role,
warehouse=warehouse,
query=query_string,
)
#print_data(data) # this returned [('PTFileProcessingJob',)]
cond = print_data(data)
with case(cond, 'PTFileProcessingJob'):
create_flow_run(flow_id= "8d8dcd45-5965-48c0-9acc-4af4da024f66",parameters= {"data_var":s3_key})
Kevin Kho
dammy arinde
11/29/2021, 5:51 PM@task(log_stdout=True)
def print_data(x):
return x
dammy arinde
11/29/2021, 5:51 PMKevin Kho
List[Tuple]
and you are comparing it to a string. You likely need to convert it in the print_data
task to make sure this is a stringdammy arinde
11/29/2021, 5:57 PMdammy arinde
11/29/2021, 5:57 PMdammy arinde
11/29/2021, 5:58 PMKevin Kho
print_data
?What happens when you log x
in the `print_data task? Do you get [('PTFileProcessingJob',)]
?dammy arinde
11/29/2021, 6:02 PMdammy arinde
11/29/2021, 6:02 PMdammy arinde
11/29/2021, 6:11 PM[('PTFileProcessingJob',)]
? Yes, I get this as the return of the print_data functionKevin Kho
dammy arinde
11/29/2021, 6:21 PMKevin Kho
dammy arinde
11/29/2021, 6:24 PMdammy arinde
11/29/2021, 6:24 PMKevin Kho
print_data
so that we can explicitly see what was returned by the snowflake_query
taskdammy arinde
11/29/2021, 6:27 PMdammy arinde
11/29/2021, 6:27 PMKevin Kho
@task(log_stdout=True)
def print_data(x):
<http://prefect.context.logger.info|prefect.context.logger.info>(x)
return x
Kevin Kho
print(x)
should work I think because you have log_stdout=True
Kevin Kho
dammy arinde
11/29/2021, 6:29 PMdammy arinde
11/29/2021, 6:30 PMdammy arinde
11/29/2021, 6:30 PMKevin Kho
dammy arinde
11/29/2021, 6:34 PMKevin Kho
from prefect import Flow, task, case
import prefect
@task
def abc():
return [('PTFileProcessingJob',)]
@task
def other_task():
<http://prefect.context.logger.info|prefect.context.logger.info>("this ran")
return
with Flow("test_case") as flow:
result = abc()
with case(result, "PTFileProcessingJob"):
other_task()
flow.run()
I do get [2021-11-29 13:35:05-0500] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "[(\'PTFileProcessingJob\',)]" did not match "PTFileProcessingJob"')
Kevin Kho
Kevin Kho
from prefect import Flow, task, case
import prefect
@task
def abc():
return [('PTFileProcessingJob',)]
@task
def comparison_task(x,y):
<http://prefect.context.logger.info|prefect.context.logger.info>(x)
<http://prefect.context.logger.info|prefect.context.logger.info>(y)
if x == y:
<http://prefect.context.logger.info|prefect.context.logger.info>("It was a match")
return True
else:
<http://prefect.context.logger.info|prefect.context.logger.info>("It did not match")
return False
@task
def other_task():
<http://prefect.context.logger.info|prefect.context.logger.info>("this ran")
return
with Flow("test_case") as flow:
result = abc()
compare = comparison_task(result, "PTFileProcessingJob")
with case(compare, True):
other_task()
flow.run()
dammy arinde
11/29/2021, 6:41 PMKevin Kho
dammy arinde
11/29/2021, 6:46 PMdammy arinde
11/29/2021, 6:46 PMdammy arinde
11/29/2021, 6:47 PMdammy arinde
11/29/2021, 6:48 PMKevin Kho
dammy arinde
11/29/2021, 6:50 PMdammy arinde
11/29/2021, 10:40 PMKevin Kho