dammy arinde
11/29/2021, 5:33 PMclient.create_flow_run()Kevin Kho
client.create_flow_run()create_flow_runKevin Kho
Client.create_flow_run()dammy arinde
11/29/2021, 5:35 PMdammy arinde
11/29/2021, 5:35 PMKevin Kho
Client.create_flow_run()create_flow_run()dammy arinde
11/29/2021, 5:43 PMwith case(cond, 'PTFileProcessingJob'):  create_flow_run(flow_id= "8d8dcd45-5965-48c0-9acc-4af4da024f66",parameters= {"data_var":s3_key})dammy arinde
11/29/2021, 5:44 PMKevin Kho
conddammy arinde
11/29/2021, 5:47 PM@task(log_stdout=True)
def print_data(x):
    """Hand the received value straight back to the caller, unchanged."""
    passthrough = x
    return passthrough
with Flow("parent-flow", run_config=RUN_CONFIG,  storage=STORAGE) as flow:
    input_data = Parameter('data_var')
    s3_key = input_data['s3_key']
    query_string = get_query_from_param(table_name=table_name)
    data = snowflake_query(
        account=account,
        user=user,
        password=pwd,
        role=role,
        warehouse=warehouse,
        query=query_string,
    )
    #print_data(data) # this returned [('PTFileProcessingJob',)] 
   
    cond = print_data(data)
    
    with case(cond, 'PTFileProcessingJob'):
        create_flow_run(flow_id= "8d8dcd45-5965-48c0-9acc-4af4da024f66",parameters= {"data_var":s3_key})Kevin Kho
dammy arinde
11/29/2021, 5:51 PM@task(log_stdout=True)
def print_data(x):
    return xdammy arinde
11/29/2021, 5:51 PMKevin Kho
List[Tuple]print_datadammy arinde
11/29/2021, 5:57 PMdammy arinde
11/29/2021, 5:57 PMdammy arinde
11/29/2021, 5:58 PMKevin Kho
print_datax[('PTFileProcessingJob',)]dammy arinde
11/29/2021, 6:02 PMdammy arinde
11/29/2021, 6:02 PMdammy arinde
11/29/2021, 6:11 PM[('PTFileProcessingJob',)]Kevin Kho
dammy arinde
11/29/2021, 6:21 PMKevin Kho
dammy arinde
11/29/2021, 6:24 PMdammy arinde
11/29/2021, 6:24 PMKevin Kho
print_datasnowflake_querydammy arinde
11/29/2021, 6:27 PMdammy arinde
11/29/2021, 6:27 PMKevin Kho
@task(log_stdout=True)
def print_data(x):
    prefect.context.logger.info(x)
    return xKevin Kho
print(x)log_stdout=TrueKevin Kho
dammy arinde
11/29/2021, 6:29 PMdammy arinde
11/29/2021, 6:30 PMdammy arinde
11/29/2021, 6:30 PMKevin Kho
dammy arinde
11/29/2021, 6:34 PMKevin Kho
from prefect import Flow, task, case
import prefect
@task
def abc():
    """Return a one-element list whose only item is the 1-tuple ('PTFileProcessingJob',)."""
    rows = [("PTFileProcessingJob",)]
    return rows
@task
def other_task():
    """Log the message "this ran" at INFO level via the Prefect context logger.

    Returns:
        None.
    """
    prefect.context.logger.info("this ran")
    return
# Build a flow in which other_task sits under a case on abc()'s result.
with Flow("test_case") as flow:
    result = abc()
    # abc() returns [('PTFileProcessingJob',)] (a list holding a tuple), so the
    # comparison against the bare string does not match and other_task is
    # skipped.
    with case(result, "PTFileProcessingJob"):
        other_task()
flow.run()[2021-11-29 13:35:05-0500] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "[(\'PTFileProcessingJob\',)]" did not match "PTFileProcessingJob"')Kevin Kho
Kevin Kho
from prefect import Flow, task, case
import prefect
@task
def abc():
    """Return a one-element list whose only item is the 1-tuple ('PTFileProcessingJob',)."""
    rows = [("PTFileProcessingJob",)]
    return rows
@task
def comparison_task(x, y):
    """Log both values, then report whether they compare equal.

    Args:
        x: First value to compare.
        y: Second value to compare.

    Returns:
        True when ``x == y`` is truthy, otherwise False.
    """
    # Look the context logger up once instead of on every call.
    logger = prefect.context.logger
    logger.info(x)
    logger.info(y)
    if x == y:
        logger.info("It was a match")
        return True
    logger.info("It did not match")
    return False
@task
def other_task():
    """Log the message "this ran" at INFO level via the Prefect context logger.

    Returns:
        None.
    """
    prefect.context.logger.info("this ran")
    return
# Build a flow that places other_task under a case on comparison_task's result.
with Flow("test_case") as flow:
    result = abc()
    # NOTE(review): abc() returns [('PTFileProcessingJob',)], so comparing it
    # with the bare string yields False; if a match is intended, the inner
    # string (e.g. result[0][0]) probably needs to be passed instead -- confirm.
    compare = comparison_task(result, "PTFileProcessingJob")
    # other_task is declared under the case where compare is True.
    with case(compare, True):
        other_task()
flow.run()dammy arinde
11/29/2021, 6:41 PMKevin Kho
dammy arinde
11/29/2021, 6:46 PMdammy arinde
11/29/2021, 6:46 PMdammy arinde
11/29/2021, 6:47 PMdammy arinde
11/29/2021, 6:48 PMKevin Kho
dammy arinde
11/29/2021, 6:50 PMdammy arinde
11/29/2021, 10:40 PMKevin Kho
