matta
08/03/2021, 4:21 AM
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=30))
def trigger_cloud_fn(
    secret: PrefectSecret,
    url: str,
    body: t.Optional[t.Dict] = None,
):
    """POST ``body`` as JSON to ``url`` through an ID-token-authorized session.

    Args:
        secret: Service-account info handed to
            ``IDTokenCredentials.from_service_account_info``.
        url: Endpoint to call; it is also used as the token's target audience.
        body: JSON payload to send. Defaults to an empty dict.

    Returns:
        The response object returned by ``authed_session.post``.

    Raises:
        signals.FAIL: If the result is not a ``requests`` response or its
            ``ok`` flag is false.
    """
    # Build a fresh dict per call; a ``dict()`` default in the signature would
    # be a single object shared by every invocation.
    if body is None:
        body = {}

    logger = prefect.context.get("logger")
    logger.info(body)

    credentials = service_account.IDTokenCredentials.from_service_account_info(
        secret, target_audience=url
    )
    authed_session = AuthorizedSession(credentials)
    try:
        response = authed_session.post(url=url, json=body)
        if not (isinstance(response, requests.models.Response) and response.ok):
            raise signals.FAIL(f"POST to {url} did not return a successful response")
        return response
    finally:
        # Close the session on every exit path, including the failure branch
        # and any exception raised by the request itself.
        authed_session.close()
Kevin Kho
08/03/2021, 5:56 PM
Marvin
08/03/2021, 5:57 PM