# prefect-community
m
Hi, is there any way to use .fn() and .submit() simultaneously when calling a task function, considering we have to pass parameters as well?
for table in config['tables']:
    t1.fn(config['schema'],
          config['ad_account'],
          credentials,
          block_config,
          company_name,
          backfill_start_date,
          table)
How can I use .submit() on the same t1 function?
k
Hey! Not sure what you mean by using both of them together.
• .fn is for using the task in places other than inside a Prefect flow
• .submit explicitly passes the task to the task runner
You can use .submit(param=value)
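To make the difference concrete, here is a minimal sketch that uses only the Python standard library as a stand-in. It is not Prefect code: `load_table`, the schema name, and the table names are hypothetical, and `ThreadPoolExecutor.submit` is only an analogy for handing a task to a task runner. A direct call (comparable to `.fn(...)`) runs right away in the caller, while a submitted call returns a future and accepts parameters as keyword arguments.

```python
from concurrent.futures import ThreadPoolExecutor


def load_table(schema: str, table: str) -> str:
    """Hypothetical stand-in for the body of a task such as t1."""
    return f"{schema}.{table}"


tables = ["campaigns", "ads", "insights"]  # hypothetical table names

# Direct call: runs immediately in the caller, one table at a time.
# This is roughly the behaviour of calling the plain function behind a task.
direct_results = [load_table("marketing", table) for table in tables]

# Submitted call: the executor schedules the work and returns a future.
# Parameters are passed as keyword arguments, similar to .submit(param=value).
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(load_table, schema="marketing", table=table)
        for table in tables
    ]
    # Collect results in submission order.
    submitted_results = [future.result() for future in futures]
```

Both lists hold the same values; only the way the work is scheduled differs, which is why the two styles are alternatives rather than something to combine in one call.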
m
What I want to do is execute the same flow in parallel three times, but I am unable to do so using subflows, so I thought of tweaking my code to run the work inside a task.
k
Hmm, I'm not so sure, but I think you can create a task that calls the flow via a flow run, and then run that task concurrently via .map
m
@flow(task_runner=DaskTaskRunner)
def t1(params):
    for table in tables:
        extract_values = get_data_from.with_options(
            name=f"Extracting data for table {table}"
        ).submit(params)
        reporting_date = unloading_data.with_options(
            name=f"Transforming data for table {table}"
        ).submit(table, company_name, extract_values)
        report_date = refresh_external_table.with_options(
            name=f"Refreshing external table {table}"
        ).submit(table, reporting_date, credentials, block_config)
        loading_data_to_replicon.with_options(
            name=f"Inserting data into Replicon for table {table}"
        ).submit(schemas, table, company_name, credentials, block_config, report_date)


@task()
def fac(company_name, backfill_start_date, backfill_end_date, table):
    run_date = datetime.utcnow()
    logger = get_run_logger()

    if backfill_start_date is None and backfill_end_date is None and table == '':
        t1(params)


@flow(task_runner=DaskTaskRunner)
def de_ingest(company_name, backfill_start_date: date = None, backfill_end_date: date = None, table=''):
    for company_name in company_name:
        fac.submit(company_name, backfill_start_date, backfill_end_date, table)


if __name__ == "__main__":
    de_ingest()
It is giving me an error saying: RuntimeError: Flows cannot be run from within tasks. Did you mean to call this flow in a flow?
How can I resolve this?
I have three companies that I want to run in parallel, and each company has six tables, so I also need to run those tables in parallel for each company. For each table there are some tasks that can be executed sequentially.
m
You can't call a flow from inside a task. Here it says that async subflows can be run in parallel using asyncio.gather() or AnyIO task groups (which is the only way to have parallel subflows, I think).
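A minimal sketch of the asyncio.gather() fan-out for the three-companies, six-tables case described above. Plain coroutines stand in for async subflows here, so this shows only the concurrency shape, not Prefect itself. The names `COMPANIES`, `TABLES`, `process_table`, and `process_company` are hypothetical, and the step names are placeholders for the real tasks. The assumption is that in Prefect the two outer coroutines would be async functions decorated as flows, which is not shown or tested here.

```python
import asyncio

COMPANIES = ["company_a", "company_b", "company_c"]  # hypothetical names
TABLES = [f"table_{i}" for i in range(1, 7)]  # six hypothetical tables


async def process_table(company: str, table: str) -> str:
    """Run the per-table steps one after another (sequentially)."""
    steps = []
    for step in ("extract", "transform", "refresh", "load"):
        await asyncio.sleep(0)  # placeholder for real awaited work
        steps.append(step)
    return f"{company}/{table}: " + " > ".join(steps)


async def process_company(company: str) -> list:
    """Fan out over all tables of one company concurrently."""
    return await asyncio.gather(*(process_table(company, t) for t in TABLES))


async def main() -> list:
    """Fan out over all companies concurrently."""
    return await asyncio.gather(*(process_company(c) for c in COMPANIES))


# One result list per company, each holding one entry per table.
results = asyncio.run(main())
```

Note that asyncio.gather() interleaves coroutines on a single event loop, so the gain comes from overlapping waits (I/O, API calls) rather than from using multiple CPU cores.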
p
You can also use client.create_flow_run_from_deployment inside of a task to trigger flow runs from a task: https://docs.prefect.io/api-ref/prefect/client/#prefect.client.orion.OrionClient.create_flow_run_from_deployment
Note, these won't be treated as subflows.