https://prefect.io logo
Title
r

Robert Kowalski

03/06/2023, 3:56 PM
Hi, I trying extend flow decorator to add some defaults parameters to
flow
decorator and it is possible, but after this I have in UI ( in deployment section -> parameters ) only args and kwargs. I want see here
param
. Is there a magical way to achieve this? code:
from prefect import flow
from prefect.deployments import Deployment


def flow_wrapper(func):
    default_retries = 3

    @flow(name=func.__name__, retries=default_retries)
    def wrapper(*args, **kwarg):
        return func(*args, **kwarg)

    return wrapper


@flow_wrapper
def tmp_flow(param: int = 1):
    print(param)


if __name__ == '__main__':
    # tmp_flow(param=1)
    environment = 'prod'
    Deployment.build_from_flow(
        flow=tmp_flow,
        name=environment,
        path=f'.',
        infra_overrides={"env": {
            "PREFECT_LOGGING_LEVEL": "DEBUG",
        }},
        work_queue_name=environment,
        apply=True,
        work_pool_name=environment
    )
image.png
c

Christopher Boyd

03/06/2023, 4:06 PM
you need to build the schema
parameter_openapi_schema - https://docs.prefect.io/api-ref/prefect/deployments/ An example looks like:
schema = {
    'title': 'Parameters',
    'type': 'object',
    'properties': {
        'callback_url': {
            'type': 'string',
            'title': 'callback_url'
        },
        'execution_id': {
            'type': 'string',
            'title': 'execution_id'
        },
        'step_id': {
            'type': 'string',
            'title': 'step_id'
        },
        'skip_hooks': {
            'type': 'boolean',
            'title': 'skip_hooks'
            'default': 'True'
        }
    }
}
r

Robert Kowalski

03/06/2023, 4:33 PM
It don't work for me or I doing something wrong
parameter_openapi_schema={
            'title': 'Parameters',
            'type': 'object',
            'properties': {
                'param': {
                    'type': 'integer',
                    'title': 'param',
                    # 'position': 0
                }
            },
            #'required': ['param']
        },
any change in UI
Can someone confirm that parameter
parameter_openapi_schema
works correctly ?
c

Christopher Boyd

03/08/2023, 1:12 PM
yes, I’ve used it successfully , I can share an example
schema = {
    'title': 'Parameters',
    'type': 'object',
    'properties': {
        'callback_url': {
            'type': 'string',
            'title': 'callback_url'
        },
        'trigger_source': {
            'type': 'string',
            'title': 'trigger_source'
        },
        'external_id': {
            'type': 'string',
            'title': 'external_id'
        },
        'debug': {
            'type': 'boolean',
            'title': 'debug',
            'default': 'False'
        },
        'gcp_logging_enabled': {
            'type': 'boolean',
            'title': 'gcp_logging_enabled',
            'default': 'False'
        }
    }
}

deployment = Deployment(
 ...
  parameter_openapi_schema=schema
)
elaborating the format,
external_id
is the literal parameter I would use / pass to the flow, the type of it is string (types must match the valid openapi typings), and the displayed title in the UI would be
external_id
. You can set defaults, required, etc. if you want, but this should help get you started, and I do use this actively for a few flows so I can confirm it works
r

Robert Kowalski

03/08/2023, 2:13 PM
Your code works if i use
flow
decorator but if i use my
flow_wrapper
decorator to setup retries defaults and flow name then in UI i have args and kwargs still. So this solution dont work in my case Here is full code:
from prefect import flow
from prefect.deployments import Deployment


def flow_wrapper(func):
    default_retries = 3

    @flow(name=func.__name__, retries=default_retries)
    def wrapper(*args, **kwarg):
        return func(*args, **kwarg)

    return wrapper


@flow(name='tmp_flow')
# @flow_wrapper
def tmp_flow(callback_url: str,
             trigger_source: str,
             external_id: str,
             debug: bool = False,
             gcp_logging_enabled: bool = False):
    print(callback_url)


if __name__ == '__main__':
    schema = {
        'title': 'Parameters',
        'type': 'object',
        'properties': {
            'callback_url': {
                'type': 'string',
                'title': 'callback_url'
            },
            'trigger_source': {
                'type': 'string',
                'title': 'trigger_source'
            },
            'external_id': {
                'type': 'string',
                'title': 'external_id'
            },
            'debug': {
                'type': 'boolean',
                'title': 'debug',
                'default': 'False'
            },
            'gcp_logging_enabled': {
                'type': 'boolean',
                'title': 'gcp_logging_enabled',
                'default': 'False'
            }
        }
    }

    environment = 'prod'
    Deployment.build_from_flow(
        flow=tmp_flow,
        output='tmp_flow.yml',
        name=environment,
        path=f'.',
        infra_overrides={"env": {
            "PREFECT_LOGGING_LEVEL": "DEBUG",
        }},
        work_queue_name=environment,
        apply=True,
        is_schedule_active=False,
        work_pool_name=environment,
        parameter_openapi_schema=schema
    )