Vipul
09/18/2020, 11:35 AM

Matt Wong-Kemp
09/18/2020, 12:59 PM
cobdate here contains the Parameter object, not the value:

@task
def prepare_parameters(cobdate):
    return dict(cobdate=cobdate)


with Flow("MasterFlow") as master_flow:
    cobdate = Parameter("cobdate")
    staging_parameters = prepare_parameters(cobdate)
    staging_area = FlowRunTask(flow_name='staging_area',
                               parameters=staging_parameters,
                               project_name="SubFlow")
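The distinction that matters here is where the parameters kwarg is supplied. Given at init time it stays a raw Task object on the FlowRunTask; given at call time inside the flow context it becomes an upstream dependency that Prefect resolves to the dict before the child run is created. A minimal sketch of the two patterns, assuming Prefect 0.13-era imports (the FlowRunTask import path is an assumption):

from prefect import Flow, Parameter, task
from prefect.tasks.prefect import FlowRunTask

@task
def prepare_parameters(cobdate):
    return dict(cobdate=cobdate)

with Flow("MasterFlow") as master_flow:
    cobdate = Parameter("cobdate")
    staging_parameters = prepare_parameters(cobdate)

    # Init-time kwarg: the Task object itself is stored on the FlowRunTask,
    # so it later ends up unresolved in the create-flow-run payload.
    staging_area_broken = FlowRunTask(flow_name='staging_area',
                                      project_name="SubFlow",
                                      parameters=staging_parameters)

    # Call-time kwarg: Prefect adds an edge, so the upstream result (a plain
    # dict) is substituted before the child flow run is created.
    staging_area_ok = FlowRunTask(flow_name='staging_area',
                                  project_name="SubFlow")(parameters=staging_parameters)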
Vipul
09/18/2020, 1:15 PM

-> res = self.graphql(create_mutation, variables=dict(input=inputs))
(Pdb) inputs
{'flow_id': 'bc156d0a-b2dc-4425-85a6-10e2a9a5ef1f', 'parameters': <Task: prepare_parameters>, 'idempotency_key': 'db6be705-45eb-4a99-adea-86c4506227d9'}

I think the JSON error is due to the fact that 'parameters' has a value of <Task: prepare_parameters> and JSON does not know how to serialize it... @nicholas
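That reading matches plain json behaviour; a tiny standalone reproduction (the Task class below is only a stand-in for prefect.Task, which is likewise not JSON-serializable):

import json

class Task:
    """Stand-in for prefect.Task, used only to reproduce the error."""
    def __repr__(self):
        return "<Task: prepare_parameters>"

inputs = {
    'flow_id': 'bc156d0a-b2dc-4425-85a6-10e2a9a5ef1f',
    'parameters': Task(),  # the unresolved task object from the payload above
    'idempotency_key': 'db6be705-45eb-4a99-adea-86c4506227d9',
}

try:
    json.dumps(inputs)
except TypeError as exc:
    print(exc)  # e.g. "Object of type Task is not JSON serializable"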
nicholas
09/18/2020, 5:22 PM
You can pass parameters from the Run tab here. Either make cobdate required or give it a default:
with Flow("MasterFlow") as master_flow:
    cobdate = Parameter("cobdate", required=True)
    # OR Parameter("cobdate", default="some default")

    # Then pass the parameters to the runtime context of FlowRunTask
    # instead of the Class instantiation
    staging_area = FlowRunTask(flow_name='staging_area',
                               project_name="SubFlow")(parameters={"cobdate": cobdate})
    business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
                                       project_name="SubFlow")
    data_mart = FlowRunTask(flow_name='data_mart',
                            project_name="SubFlow",
                            wait=True)
    staging_area.set_downstream(data_mart)
    business_logic_layer.set_downstream(data_mart)
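With the Parameter made required or given a default, a concrete value can then be supplied when the run is created, for example locally via flow.run; the date string below is just an example value, and the FlowRunTask children still need a backend to run against:

# Supply a concrete value for the Parameter when kicking off the run; the
# same parameters dict can be provided from the UI's Run tab.
state = master_flow.run(parameters={"cobdate": "2020-09-18"})
print(state)  # Success or Failed state depending on the child flow runs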
Vipul
09/18/2020, 5:59 PM

nicholas
09/18/2020, 6:02 PM

Vipul
09/27/2020, 10:49 AM

staging_area = FlowRunTask(flow_name='staging_area',
                           project_name="SubFlow")(parameters={"cobdate": cobdate})

has caused two extra tasks (a List and a Dict) to be added before calling the "staging_area" flow. Is this expected behaviour? Thanks
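This is most likely expected: in Prefect 0.x, passing the literal {"cobdate": cobdate} as a call argument auto-wraps the dict in collection tasks (a Dict, backed by a List for its values) so the Parameter inside can be resolved at runtime. A quick way to see them, assuming master_flow is the flow built above (the prefect.tasks.core.collections module path is a recollection of the 0.x internals, not confirmed in this thread):

# Inspect the flow to see the auto-added collection tasks alongside the
# Parameter and FlowRunTask instances.
for t in master_flow.tasks:
    print(type(t).__module__, type(t).__name__)
# Entries from prefect.tasks.core.collections (List, Dict) would confirm
# the auto-generated tasks.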