Robert Kowalski
03/06/2023, 3:56 PMflow
decorator and it is possible, but after this I have in UI ( in deployment section -> parameters ) only args and kwargs. I want see here param
. Is there a magical way to achieve this?
code:
from prefect import flow
from prefect.deployments import Deployment
def flow_wrapper(func):
default_retries = 3
@flow(name=func.__name__, retries=default_retries)
def wrapper(*args, **kwarg):
return func(*args, **kwarg)
return wrapper
@flow_wrapper
def tmp_flow(param: int = 1):
print(param)
if __name__ == '__main__':
# tmp_flow(param=1)
environment = 'prod'
Deployment.build_from_flow(
flow=tmp_flow,
name=environment,
path=f'.',
infra_overrides={"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
}},
work_queue_name=environment,
apply=True,
work_pool_name=environment
)
Christopher Boyd
03/06/2023, 4:06 PMschema = {
'title': 'Parameters',
'type': 'object',
'properties': {
'callback_url': {
'type': 'string',
'title': 'callback_url'
},
'execution_id': {
'type': 'string',
'title': 'execution_id'
},
'step_id': {
'type': 'string',
'title': 'step_id'
},
'skip_hooks': {
'type': 'boolean',
'title': 'skip_hooks'
'default': 'True'
}
}
}
Robert Kowalski
03/06/2023, 4:33 PMparameter_openapi_schema={
'title': 'Parameters',
'type': 'object',
'properties': {
'param': {
'type': 'integer',
'title': 'param',
# 'position': 0
}
},
#'required': ['param']
},
parameter_openapi_schema
works correctly ?Christopher Boyd
03/08/2023, 1:12 PMschema = {
'title': 'Parameters',
'type': 'object',
'properties': {
'callback_url': {
'type': 'string',
'title': 'callback_url'
},
'trigger_source': {
'type': 'string',
'title': 'trigger_source'
},
'external_id': {
'type': 'string',
'title': 'external_id'
},
'debug': {
'type': 'boolean',
'title': 'debug',
'default': 'False'
},
'gcp_logging_enabled': {
'type': 'boolean',
'title': 'gcp_logging_enabled',
'default': 'False'
}
}
}
deployment = Deployment(
...
parameter_openapi_schema=schema
)
external_id
is the literal parameter I would use / pass to the flow, the type of it is string (types must match the valid openapi typings), and the displayed title in the UI would be external_id
. You can set defaults, required, etc. if you want, but this should help get you started, and I do use this actively for a few flows so I can confirm it worksRobert Kowalski
03/08/2023, 2:13 PMflow
decorator but if i use my flow_wrapper
decorator to setup retries defaults and flow name then in UI i have args and kwargs still. So this solution dont work in my case
Here is full code:
from prefect import flow
from prefect.deployments import Deployment
def flow_wrapper(func):
default_retries = 3
@flow(name=func.__name__, retries=default_retries)
def wrapper(*args, **kwarg):
return func(*args, **kwarg)
return wrapper
@flow(name='tmp_flow')
# @flow_wrapper
def tmp_flow(callback_url: str,
trigger_source: str,
external_id: str,
debug: bool = False,
gcp_logging_enabled: bool = False):
print(callback_url)
if __name__ == '__main__':
schema = {
'title': 'Parameters',
'type': 'object',
'properties': {
'callback_url': {
'type': 'string',
'title': 'callback_url'
},
'trigger_source': {
'type': 'string',
'title': 'trigger_source'
},
'external_id': {
'type': 'string',
'title': 'external_id'
},
'debug': {
'type': 'boolean',
'title': 'debug',
'default': 'False'
},
'gcp_logging_enabled': {
'type': 'boolean',
'title': 'gcp_logging_enabled',
'default': 'False'
}
}
}
environment = 'prod'
Deployment.build_from_flow(
flow=tmp_flow,
output='tmp_flow.yml',
name=environment,
path=f'.',
infra_overrides={"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
}},
work_queue_name=environment,
apply=True,
is_schedule_active=False,
work_pool_name=environment,
parameter_openapi_schema=schema
)