Mohit Singhal
12/28/2022, 11:45 AMfor table in config['tables']:
t1.fn(config['schema'],
config['ad_account'],
credentials,
block_config,
company_name,
backfill_start_date,
table)
Kelvin DeCosta
12/28/2022, 11:48 AM.fn
is for using the task
in places other than inside a prefect flow
• .submit
explicitly passes the task to the task runner
You can use .submit(param=value)
Mohit Singhal
12/28/2022, 11:51 AMKelvin DeCosta
12/28/2022, 11:55 AMtask
that calls the flow
via a flow run, and then run that task concurrently via .map
Mohit Singhal
12/28/2022, 12:16 PM@flow(task_runner=DaskTaskRunner)
def t1(params):
for table in tables:
extract_values = get_data_from.with_options(name=f"Extracting data for table {table}").submit(
params
)
reporting_date = unloading_data.with_options(name=f"Transforming data for table {table}").submit(table,company_name,extract_values)
report_date = refresh_external_table.with_options(name=f"Refreshing external table {table}").submit(table,reporting_date,credentials,block_config)
loading_data_to_replicon.with_options(name=f"Inserting data into Replicon for table {table}").submit(schemas,table,company_name,credentials,block_config,report_date)
@task()
def fac(company_name,backfill_start_date,backfill_end_date,table):
run_date = datetime.utcnow()
logger = get_run_logger()
if backfill_start_date == None and backfill_end_date == None and table == '':
t1(params)
@flow(task_runner=DaskTaskRunner)
def de_ingest(company_name,backfill_start_date : date = None,backfill_end_date : date = None,table =''):
for company_name in company_name:
fac.submit(company_name,backfill_start_date,backfill_end_date,table)
if __name__ == "__main__":
de_ingest()
Mathijs Carlu
12/28/2022, 1:25 PMPeyton Runyan
12/28/2022, 2:53 PMclient.create_flow_run_from_deployment
inside of a task to trigger flow runs from a task
https://docs.prefect.io/api-ref/prefect/client/#prefect.client.orion.OrionClient.create_flow_run_from_deployment
Note, these won't be treated as subflows