
Vipul

09/18/2020, 11:35 AM
Sorry, pressed enter too early. The issue I am seeing is that, from the UI, when I run MasterFlow, it does not give me an option to populate cobdate.

Matt Wong-Kemp

09/18/2020, 12:59 PM
You need to pass the parameter as a runtime option, I think - at the minute you're creating a dict with one field, cobdate, which contains the Parameter object, not the value.
:upvote: 1
from prefect import task, Flow, Parameter
from prefect.tasks.prefect import FlowRunTask

@task
def prepare_parameters(cobdate):
    return dict(cobdate=cobdate)

with Flow("MasterFlow") as master_flow:
    cobdate = Parameter("cobdate")
    staging_parameters = prepare_parameters(cobdate)
    staging_area = FlowRunTask(flow_name='staging_area',
                               parameters=staging_parameters,
                               project_name="SubFlow")

Vipul

09/18/2020, 1:15 PM
Thanks for the quick reply. You mean I need to create a new task to pass this parameter to FlowRunTask? Just wondering why it can't directly get it from the Parameter, as I thought the Parameter value can be updated at runtime?
@Matt Wong-Kemp Also, if I make the change as suggested by you, I am getting the following error:
  File "d:\1. vipul\1. tech\python\prefect\venv\lib\site-packages\prefect\client\client.py", line 965, in create_flow_run
    res = self.graphql(create_mutation, variables=dict(input=inputs))
  File "d:\1. vipul\1. tech\python\prefect\venv\lib\site-packages\prefect\client\client.py", line 280, in graphql
    params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
  File "C:\Program Files (x86)\Python\lib\json\__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "C:\Program Files (x86)\Python\lib\json\encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "C:\Program Files (x86)\Python\lib\json\encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "C:\Program Files (x86)\Python\lib\json\encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type FunctionTask is not JSON serializable
-> res = self.graphql(create_mutation, variables=dict(input=inputs))
(Pdb) inputs
{'flow_id': 'bc156d0a-b2dc-4425-85a6-10e2a9a5ef1f', 'parameters': <Task: prepare_parameters>, 'idempotency_key': 'db6be705-45eb-4a99-adea-86c4506227d9'}
I think the JSON error is due to the fact that 'parameters' has a value of <Task: prepare_parameters> and JSON does not know how to serialize it... @nicholas
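A quick way to reproduce it outside Prefect, I think - the parameters given to FlowRunTask at init time get JSON-encoded when the flow run is created, so they can only hold plain values (the date string below is just an example value):
import json
from prefect import task

@task
def prepare_parameters(cobdate):
    return dict(cobdate=cobdate)

# Plain values serialize fine:
print(json.dumps({"cobdate": "2020-09-18"}))

# But the decorated function is a FunctionTask object, and at flow-build time
# calling it only produces another Task, which json.dumps cannot encode:
try:
    json.dumps({"parameters": prepare_parameters})
except TypeError as exc:
    print(exc)  # Object of type FunctionTask is not JSON serializable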

nicholas

09/18/2020, 5:22 PM
@Vipul - when running the flow from the UI, you'll need to navigate to the Run tab. And then in your original flow you can make cobdate required or give it a default:
from prefect import Flow, Parameter
from prefect.tasks.prefect import FlowRunTask

with Flow("MasterFlow") as master_flow:
    cobdate = Parameter("cobdate", required=True)
    # OR      Parameter("cobdate", default="some default")

    # Then pass the parameters to the runtime context of FlowRunTask
    # instead of the Class instantiation
    staging_area = FlowRunTask(flow_name='staging_area',
                               project_name="SubFlow")(parameters={"cobdate": cobdate})

    business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
                                       project_name="SubFlow")
    data_mart = FlowRunTask(flow_name='data_mart',
                            project_name="SubFlow",
                            wait=True)
    staging_area.set_downstream(data_mart)
    business_logic_layer.set_downstream(data_mart)
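If you want to sanity-check this locally before registering, I believe you can also supply the value directly when running the flow (the date string is just an example value):
# Local test run: pass the parameter value at run time
state = master_flow.run(parameters={"cobdate": "2020-09-18"})
print(state)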

Vipul

09/18/2020, 5:59 PM
Wonderful @nicholas, that worked. Thanks for your help, and @Matt Wong-Kemp too.
🚀 2

nicholas

09/18/2020, 6:02 PM
Great!

Vipul

09/27/2020, 10:49 AM
Hey @nicholas, it worked based on your approach, but what I have seen from the schematic is that adding the line
staging_area = FlowRunTask(flow_name='staging_area',
                               project_name="SubFlow")(parameters={"cobdate": cobdate})
has caused two extra tasks, List and Dict, to be added before calling the Flow "staging_area". Is this expected behaviour? Thanks
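My guess is this comes from Prefect auto-wrapping the dict literal so the Parameter inside it can be resolved at runtime - something like the following sketch, if I understand the internals (as_task being the helper I believe does that wrapping):
from prefect import Flow, Parameter
from prefect.utilities.tasks import as_task

with Flow("demo") as flow:
    cobdate = Parameter("cobdate")
    # A dict literal passed to a task call is wrapped in collection tasks so
    # the Parameter inside it can be resolved at runtime; that wrapping would
    # explain the extra Dict and List tasks showing up in the schematic.
    converted = as_task({"cobdate": cobdate})
    print(type(converted))  # a Dict collection task, not a plain dict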