Jenia Varavva
11/16/2022, 4:21 PMteam1_url
and team2_api
. I wanted to use the create_flow_run() task, but since it constructs a Client()
with no args, I’m having trouble pointing it at the right API_URL. My initial instinct was to do something like:
config_team1 = prefect.context.config.copy()
config_team1.cloud.api = 'team1_url'
with Flow('parent'):
with prefect.context(config=config_team1):
create_flow_run(...)
with prefect.context(config=config_team2):
create_flow_run(...)
This, unfortunately, doesn’t work as the context is set at the flow definition time, not at task execution (i.e. not when Client()
is executed).
An alternative that got me closer was something like:
with prefect.context(config=config_team1):
flow.run()
which indeed overrides the config value for the Client()
call inside the create_flow_run()
task, but doesn’t look like it would allow me using two different urls in the same flow.
Is there a better way around this than copying/adjusting create_flow_run()
to allow overriding api_url
itself?Bianca Hoch
11/16/2022, 9:56 PMJenia Varavva
11/16/2022, 11:26 PMcreate_flow_run
task, i.e. injecting code at the beginning of its run()
method, like:
base_config = prefect.context.config
@contextlib.contextmanager
def with_api_url(api_url):
config = base_config.copy()
config.cloud.api = api_url
with prefect.context(config=config):
<http://prefect.context.logger.info|prefect.context.logger.info>("API URL: %s", prefect.context.config.cloud.api)
yield
def task_with_api_url(task, api_url):
orig_run = task.run
def new_run(*args, **kwargs):
with with_api_url(api_url):
return orig_run(*args, **kwargs)
task.run = new_run
return task
with Flow('parent') as flow:
child_flow_1 = task_with_api_url(
create_flow_run(
flow_name="flow name",
project_name=project_name,
parameters={}
),
api_urls['team1_api'],
)
Wrapping every call in this task_with_api_url
is a bit ugly, but does the job**kwargs
or client_kwargs
and passed them to the Client()
it constructs