Emilio
02/10/2025, 4:46 PMprefect_url = os.getenv("PREFECT_API_URL")
client = Client(api_server=prefect_url)
Where the PREFECT_API_URL is set on Github Actions and I'm trying to register my flows into the server. I'm getting this error when trying to run the register:
File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 570, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['project'], 'message': 'Missing Authorization header in JWT authentication mode', 'extensions': {'path': '$', 'code': 'invalid-headers', 'exception': {'message': 'Missing Authorization header in JWT authentication mode'}}}]
I'll paste my code in the thread here, please if someone could enlighten me on how to authenticate with a self hosted Prefect Server it would be great! Ty.Emilio
02/10/2025, 4:47 PMfrom functools import lru_cache
import os
import click
from prefect.run_configs import ECSRun
from prefect.storage import S3 as S3FlowStorage
from prefect.client import Client
from newsela.prefect.flow_registry import FLOW_REGISTRY
prefect_url = os.getenv("PREFECT_API_URL")
client = Client(api_server=prefect_url)
def get_environment_infra_prefix(environment):
"""
Since feature environments stack on top of dev, some infra we'll need to use will
include "dev" as a prefix instead of the environment name itself.
"""
if environment not in ['dev', 'staging', 'prod']:
return 'dev'
return environment
@lru_cache
def get_security_group_id(prefix):
client = boto3.client('ec2')
vpc_id = get_vpc_id(prefix)
response = client.describe_security_groups(
Filters=[
{'Name': 'vpc-id', 'Values': [vpc_id]},
{'Name': 'group-name', 'Values': [f'{prefix}-monolith-ecs']},
]
)
return response['SecurityGroups'][0]['GroupId']
@lru_cache
def get_subnet_ids(prefix):
client = boto3.client('ec2')
vpc_id = get_vpc_id(prefix)
response = client.describe_subnets(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
applicable_subnets = [
subnet
for subnet in response['Subnets']
if any('private' in tag['Value'] for tag in subnet['Tags'])
]
return [subnet['SubnetId'] for subnet in applicable_subnets]
@lru_cache
def get_vpc_id(prefix):
client = boto3.client('ec2')
response = client.describe_vpcs(Filters=[{'Name': 'tag:Name', 'Values': [prefix]}])
return response['Vpcs'][0]['VpcId']
def _register_flow(
environment,
flow_key,
log_level,
task_definition_arn,
security_group_id,
subnet_ids,
):
infra_prefix = get_environment_infra_prefix(environment)
flow_datum = FLOW_REGISTRY[flow_key]
flow = flow_datum['module'].flow
flow.run_config = ECSRun(
env={
'PYTHONPATH': '$PYTHONPATH:/var/task/',
'PREFECT__LOGGING__LEVEL': log_level,
},
task_definition_arn=task_definition_arn,
run_task_kwargs={
'networkConfiguration': {
'awsvpcConfiguration': {
'securityGroups': [security_group_id],
'subnets': subnet_ids,
}
},
'overrides': {
'cpu': flow_datum.get('cpu', 1024),
'memory': flow_datum.get('memory', 2048),
},
},
)
if flow_datum.get('schedules_by_environment', {}).get(environment):
flow.schedule = flow_datum['schedules_by_environment'][environment]
flow.storage = S3FlowStorage(bucket=f'newsela-{infra_prefix}-prefect-storage')
flow.storage.build()
# Register flow using explicit Prefect client
try:
flow.register(
project_name=f'monolith/{environment}', idempotency_key=flow.serialized_hash())
except Exception as e:
print(f"Failed to register flow: {flow}\n Error: {e}")
@click.command()
@click.option('--register-all/--no-register-all', default=False)
@click.option('--flow-key')
@click.option('--environment', required=True)
@click.option('--task-definition-arn')
@click.option('--log-level', required=True, default='INFO')
@click.option('--security-group-id')
@click.option('--subnet-ids')
def register(
register_all,
flow_key,
environment,
task_definition_arn,
log_level,
security_group_id,
subnet_ids,
):
if register_all and flow_key:
raise ValueError('You cannot provide both a flow key and --register-all.')
if not register_all and flow_key is None:
raise ValueError('You must furnish either --flow-key or --register-all.')
if log_level not in ['INFO', 'DEBUG', 'WARNING', 'ERROR', 'CRITICAL']:
raise ValueError('Unsupported log level.')
infra_prefix = get_environment_infra_prefix(environment)
if subnet_ids:
subnet_ids = subnet_ids.split(',')
else:
subnet_ids = get_subnet_ids(infra_prefix)
if not security_group_id:
security_group_id = get_security_group_id(infra_prefix)
if not task_definition_arn:
# Assume we're using the latest prefect task def.
task_definition_arn = f"{environment}-monolith-prefect"
flow_keys = FLOW_REGISTRY.keys() if register_all else [flow_key]
for key in flow_keys:
_register_flow(
environment,
key,
log_level,
task_definition_arn,
security_group_id,
subnet_ids,
)
if __name__ == '__main__':
register()
Emilio
02/10/2025, 5:12 PMclient.create_tenant(name="testing name",
slug="testing description")
and got 405:
Failed connecting to Prefect server: 405 Client Error: Not Allowed for url: <my-url>Emilio
02/10/2025, 5:12 PM