Hey everyone, quick question: I'm using version 0...
# ask-community
e
Hey everyone, quick question: I'm using version 0.15.13. It's self-hosted on an ECS cluster. It's working fine, except I can't properly connect to it. I'm trying to do this:
Copy code
prefect_url = os.getenv("PREFECT_API_URL")
client = Client(api_server=prefect_url)
Here, PREFECT_API_URL is set in GitHub Actions, and I'm trying to register my flows with the server. I'm getting this error when I run the registration:
Copy code
File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 570, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['project'], 'message': 'Missing Authorization header in JWT authentication mode', 'extensions': {'path': '$', 'code': 'invalid-headers', 'exception': {'message': 'Missing Authorization header in JWT authentication mode'}}}]
I'll paste my code in the thread here. If someone could enlighten me on how to authenticate with a self-hosted Prefect Server, that would be great! Ty.
Copy code
from functools import lru_cache
import os

import click
from prefect.run_configs import ECSRun
from prefect.storage import S3 as S3FlowStorage
from prefect.client import Client

from newsela.prefect.flow_registry import FLOW_REGISTRY


# URL of the Prefect API, taken from the environment (None when the variable is unset).
prefect_url = os.getenv("PREFECT_API_URL")

# Module-level Prefect client constructed against that URL.
# NOTE(review): no function visible in this file reads this module-level `client`
# (the boto3 helpers below bind their own local `client`), so creating it here does
# not by itself decide which API the registration call talks to -- confirm.
client = Client(api_server=prefect_url)

def get_environment_infra_prefix(environment):
    """
    Map an environment name to the prefix used by its shared infrastructure.

    The long-lived environments ("dev", "staging", "prod") map to themselves. Any
    other name is treated as a feature environment stacked on top of dev, so its
    infrastructure prefix is "dev".
    """
    long_lived_environments = ('dev', 'staging', 'prod')
    return environment if environment in long_lived_environments else 'dev'


@lru_cache
def get_security_group_id(prefix):
    """
    Return the GroupId of the first security group named "<prefix>-monolith-ecs"
    inside the VPC resolved for ``prefix``. Results are memoized per prefix.
    """
    ec2 = boto3.client('ec2')
    group_filters = [
        {'Name': 'vpc-id', 'Values': [get_vpc_id(prefix)]},
        {'Name': 'group-name', 'Values': [f'{prefix}-monolith-ecs']},
    ]
    matching_groups = ec2.describe_security_groups(Filters=group_filters)['SecurityGroups']
    # Only the first match is used; an empty result raises IndexError.
    first_group = matching_groups[0]
    return first_group['GroupId']


@lru_cache
def get_subnet_ids(prefix):
    """
    Return the SubnetIds of the "private" subnets in the VPC resolved for ``prefix``.

    A subnet counts as private when any of its tag values contains the substring
    'private'. Subnets that carry no tags at all are skipped instead of raising
    KeyError.

    The result is memoized per prefix, so every caller receives the same list
    object; treat it as read-only.
    """
    client = boto3.client('ec2')
    vpc_id = get_vpc_id(prefix)
    response = client.describe_subnets(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
    return [
        subnet['SubnetId']
        for subnet in response['Subnets']
        # An untagged subnet may come back without a 'Tags' key, so default to
        # an empty sequence rather than indexing the key directly.
        if any('private' in tag['Value'] for tag in subnet.get('Tags', ()))
    ]


@lru_cache
def get_vpc_id(prefix):
    """
    Return the VpcId of the first VPC whose "Name" tag equals ``prefix``.
    Results are memoized per prefix.
    """
    name_filter = {'Name': 'tag:Name', 'Values': [prefix]}
    vpcs = boto3.client('ec2').describe_vpcs(Filters=[name_filter])['Vpcs']
    # Only the first match is used; an empty result raises IndexError.
    return vpcs[0]['VpcId']


def _register_flow(
    environment,
    flow_key,
    log_level,
    task_definition_arn,
    security_group_id,
    subnet_ids,
):
    """
    Configure and register a single flow from FLOW_REGISTRY.

    The flow gets an ECSRun run config (task definition, awsvpc networking,
    cpu/memory overrides), an optional per-environment schedule, and S3 storage
    in the ``newsela-<infra prefix>-prefect-storage`` bucket. It is then
    registered in the ``monolith/<environment>`` project, using the flow's
    serialized hash as the idempotency key.

    Registration is best-effort: any exception is printed and swallowed so that a
    --register-all run continues with the remaining flows.

    When PREFECT_API_URL is set, registration is pointed at that URL. Otherwise
    Prefect's own configuration decides which API is used.
    """
    # Function-scope imports keep this block self-contained.
    from contextlib import nullcontext

    from prefect.utilities.configuration import set_temporary_config

    infra_prefix = get_environment_infra_prefix(environment)
    flow_datum = FLOW_REGISTRY[flow_key]
    flow = flow_datum['module'].flow
    flow.run_config = ECSRun(
        env={
            'PYTHONPATH': '$PYTHONPATH:/var/task/',
            'PREFECT__LOGGING__LEVEL': log_level,
        },
        task_definition_arn=task_definition_arn,
        run_task_kwargs={
            'networkConfiguration': {
                'awsvpcConfiguration': {
                    'securityGroups': [security_group_id],
                    'subnets': subnet_ids,
                }
            },
            'overrides': {
                'cpu': flow_datum.get('cpu', 1024),
                'memory': flow_datum.get('memory', 2048),
            },
        },
    )

    # Only override the flow's schedule when this environment has one configured.
    schedule = flow_datum.get('schedules_by_environment', {}).get(environment)
    if schedule:
        flow.schedule = schedule
    flow.storage = S3FlowStorage(bucket=f'newsela-{infra_prefix}-prefect-storage')
    flow.storage.build()

    # `flow.register` creates its own default Prefect client from Prefect's
    # configuration, so a client built separately from PREFECT_API_URL is never
    # consulted and registration falls back to the configured default endpoint.
    # Temporarily overriding `cloud.api` makes that default client target
    # PREFECT_API_URL instead.
    # NOTE(review): for a self-hosted server the Prefect backend presumably also
    # has to be set to "server" (e.g. PREFECT__BACKEND=server) -- confirm.
    if prefect_url:
        api_override = set_temporary_config({'cloud.api': prefect_url})
    else:
        api_override = nullcontext()

    try:
        with api_override:
            flow.register(
                project_name=f'monolith/{environment}',
                idempotency_key=flow.serialized_hash(),
            )
    except Exception as e:
        # Deliberate best-effort: report the failure and let the caller continue.
        print(f"Failed to register flow: {flow}\n Error: {e}")


# CLI entry point: registers either a single flow (--flow-key) or every flow in
# FLOW_REGISTRY (--register-all) for the given environment. Networking and task
# definition values that are not passed on the command line are derived from the
# environment. (Kept as comments rather than a docstring so the command's --help
# output is unchanged.)
@click.command()
@click.option('--register-all/--no-register-all', default=False)
@click.option('--flow-key')
@click.option('--environment', required=True)
@click.option('--task-definition-arn')
@click.option('--log-level', required=True, default='INFO')
@click.option('--security-group-id')
@click.option('--subnet-ids')
def register(
    register_all,
    flow_key,
    environment,
    task_definition_arn,
    log_level,
    security_group_id,
    subnet_ids,
):
    # --- argument validation -------------------------------------------------
    if register_all and flow_key:
        raise ValueError('You cannot provide both a flow key and --register-all.')
    if not (register_all or flow_key is not None):
        raise ValueError('You must furnish either --flow-key or --register-all.')
    supported_levels = ('INFO', 'DEBUG', 'WARNING', 'ERROR', 'CRITICAL')
    if log_level not in supported_levels:
        raise ValueError('Unsupported log level.')

    # --- fill in anything not given explicitly --------------------------------
    infra_prefix = get_environment_infra_prefix(environment)
    # --subnet-ids is a comma-separated string; otherwise look the subnets up.
    subnet_ids = subnet_ids.split(',') if subnet_ids else get_subnet_ids(infra_prefix)
    security_group_id = security_group_id or get_security_group_id(infra_prefix)
    # With no explicit ARN, assume the latest prefect task definition for the environment.
    task_definition_arn = task_definition_arn or f"{environment}-monolith-prefect"

    # --- register --------------------------------------------------------------
    keys_to_register = FLOW_REGISTRY.keys() if register_all else (flow_key,)
    for registry_key in keys_to_register:
        _register_flow(
            environment,
            registry_key,
            log_level,
            task_definition_arn,
            security_group_id,
            subnet_ids,
        )


# Invoke the click command only when this file is executed as a script.
if __name__ == '__main__':
    register()
Just a quick update: I tried to create a tenant:
Copy code
client.create_tenant(name="testing name",
                              slug="testing description")
and got a 405: Failed connecting to Prefect server: 405 Client Error: Not Allowed for url: <my-url>
Also, for some reason, whenever I try to register flows, it ALWAYS goes to localhost. Isn't it possible to deploy flows to a Prefect Server backed by an ECS cluster?