Tushar Gupta
11/18/2022, 5:06 PMBianca Hoch
11/18/2022, 5:09 PMTushar Gupta
11/21/2022, 4:56 AMBianca Hoch
11/21/2022, 6:28 PM
`get_config` to `run_config_processor`. Can you show how you are defining these tasks?
Tushar Gupta
11/22/2022, 1:25 AMChristopher Boyd
11/22/2022, 1:53 PM
`return [source_config]` or `return []` that the `None` returned is correct, even if not expected?
Also, it looks like this is all being done in Prefect v1 - have you considered testing this in Prefect 2?
Tushar Gupta
11/22/2022, 4:39 PMChristopher Boyd
11/22/2022, 5:06 PMTushar Gupta
11/22/2022, 6:21 PM
from prefect import Flow, task, Parameter
from flows.Crawler.utilities.constants import (
TABLE_KEY,
TABLE_NAME,
)
from flows.Crawler.utilities.utils import load_crawler, format_error
import json
import os
from typing import List, Dict, Optional, Any, Union, Tuple, Callable
import boto3
import prefect
from prefect import Parameter, task
from prefect.run_configs import KubernetesRun, LocalRun, RunConfig
from flows.utilities.environment_settings_loader import (
EnvironmentSettingsLoader,
EnvironmentSettingEnum,
)
@task(name="run_config_processor")
def run_config_processor(config: Dict[str, Any]) -> None:
    """
    Task that processes files for a given config.

    Exceptions raised inside the ``try`` body are formatted with
    ``format_error`` and logged at error level instead of being re-raised.

    :param config: (Dict) Dict of some configurations from AWS dynamoDB ingestion-sources
    """
    logger = prefect.context.get("logger")
    try:
        logger.info(f"Running Crawler for configuration -> {config}")
    except Exception as err:
        error_message = format_error(err, __file__)
        # Look the name up defensively: a KeyError/AttributeError raised here
        # would replace the original error with a less useful one.
        config_name = config.get("name") if isinstance(config, dict) else None
        message = f'Error crawling "{config_name}" with error {error_message}'
        # Log the full message (which config failed + why), not just the
        # formatted error.
        logger.error(message)
def get_run_config(additional_labels: Optional[List[str]] = None) -> RunConfig:
    """
    Build the Prefect run config for the current environment.

    When the ``ENV`` environment variable is unset or ``"local"``, a
    ``LocalRun`` labelled ``local_flow`` is returned. Otherwise a
    ``KubernetesRun`` is returned, labelled ``k8s`` when the value of the
    ``BWELL_ENV_VAR`` environment variable contains ``-ue1`` and
    ``kubernetes`` when it does not.

    :param additional_labels: optional extra labels appended after the base label
    :return: the run config to attach to the flow
    """
    extra_labels = additional_labels if additional_labels else []

    # Local runs need no image or forwarded environment.
    if os.environ.get("ENV", "local") == "local":
        return LocalRun(
            labels=["local_flow", *extra_labels],
            working_dir="/helix_orchestration",
        )

    in_ue1 = "-ue1" in os.environ.get(BWELL_ENV_VAR, "")
    base_label = "k8s" if in_ue1 else "kubernetes"

    # Values copied from the current process environment into the job.
    forwarded_env = {
        "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN": os.environ.get(
            "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN"
        ),
        "S3_STORAGE_BUCKET": os.environ.get("S3_STORAGE_BUCKET"),
        "ENV": os.environ.get("ENV"),
        "PYTHONPATH": "/helix_orchestration",
    }
    repository_url = EnvironmentSettingsLoader().get(
        EnvironmentSettingEnum.repository_url
    )
    return KubernetesRun(
        labels=[base_label, *extra_labels],
        env=forwarded_env,
        image=f"{repository_url}/prefect.agent:latest",
    )
@task(name="get_config")
def get_config(
    name: Optional[str] = None, table_name: str = TABLE_NAME, table_key: str = TABLE_KEY
) -> List[Dict[str, Any]]:
    """
    Get the required config from AWS DynamoDB using the primary key value for the table

    :param name: Name field in the table; when empty/None every source config is returned
    :param table_name: (String) Table name that needs to be accessed; an empty
        value falls back to ``TABLE_NAME``
    :param table_key: (String) Table's primary key value; an empty value falls
        back to ``TABLE_KEY``
    :return: a list with the first source config whose ``name`` matches, an
        empty list when ``name`` is given but nothing matches, or all source
        configs when no ``name`` is given
    """
    # Callers (e.g. flow Parameters defaulting to "") may pass empty strings;
    # treat those as "not provided" rather than querying with a blank name/key.
    configs = get_config_from_dynamodb(table_name or TABLE_NAME, table_key or TABLE_KEY)
    source_configs = configs.get("sources", [])
    # Normalise up front so a single, non-list "sources" entry is handled the
    # same way with and without a name filter (iterating a bare dict would
    # otherwise yield its keys and break the ``.get`` call below).
    if not isinstance(source_configs, list):
        source_configs = [source_configs]

    if not name:
        return source_configs

    for source_config in source_configs:
        if name == source_config.get("name"):
            return [source_config]
    return []
def get_config_from_dynamodb(table_name: str, table_key: str) -> Dict[str, Any]:
    """
    Read a single item from a DynamoDB table, keyed on the ``role`` attribute.

    When the ``ENV`` environment variable is unset or ``"local"`` no request
    is made and an empty dict is returned. Errors while reading are printed
    and also result in an empty dict.

    :param table_name: (String) Table name that needs to be accessed
    :param table_key: (String) Value of the ``role`` key to look up
    :return: the stored item as a dict, or ``{}`` when running locally, when
        the item does not exist, or when the read fails
    """
    env = os.environ.get("ENV", "local")
    if env == "local":
        return {}

    # Imported here because only ``boto3`` is imported at module level; without
    # it, evaluating the ``except`` clause below raises NameError and masks the
    # real error.
    from botocore.exceptions import NoCredentialsError

    dynamodb = boto3.resource("dynamodb", region_name=AWS_REGION)
    # noinspection PyUnresolvedReferences
    try:
        table = dynamodb.Table(table_name)
        config = table.get_item(Key={"role": table_key})
        item = config.get("Item")
        if item is None:
            # Report a missing item explicitly instead of surfacing it as an
            # opaque KeyError through the generic handler.
            print(
                f"No item found in dynamodb: table: {table_name}, table_key={table_key}"
            )
            return {}
        return dict(item)
    except NoCredentialsError as e:
        print(
            f"Error reading dynamodb (NoCredentialsError): table: {table_name}, table_key={table_key} {type(e)} {e} {json.dumps(e, default=str)}"
        )
        return {}
    except Exception as e:
        print(
            f"Error reading dynamodb: table: {table_name}, table_key={table_key} {type(e)} {e} {json.dumps(e, default=str)}"
        )
        return {}
# Flow definition: fetch the source configs, then run the processor once per
# config via a mapped task.
with Flow(
    "Ingestion Crawler",
    run_config=get_run_config(),
) as flow:
    logger = prefect.context.get("logger")
    logger.info("Getting the required config from AWS DynamoDB.")
    config_name = Parameter("config_name", default="")
    # Default to the module constants: an empty-string default would be passed
    # explicitly to get_config and override its TABLE_NAME / TABLE_KEY defaults.
    table_name = Parameter("table_name", default=TABLE_NAME)
    table_key = Parameter("table_key", default=TABLE_KEY)
    configs = get_config(name=config_name, table_name=table_name, table_key=table_key)
    # NOTE(review): this statement executes while the flow is being defined, so
    # ``configs`` is presumably the task object here, not the fetched data.
    logger.info(f"Fetched configs from AWS DynamoDB -> {configs}")
    result = run_config_processor.map(config=configs, upstream_tasks=[configs])
Ryan Peden
11/22/2022, 6:54 PM
`table_name` and `table_key` parameters or are you leaving them empty and wanting to use the `TABLE_NAME` and `TABLE_KEY` constants?
If you want to use the constants, you may want to set those as the defaults for each parameter, i.e.
table_name = Parameter("table_name", default=TABLE_NAME)
table_key = Parameter("table_key", default=TABLE_KEY)
Also, in your exception handlers in `get_config_from_dynamodb`, consider removing the `return {}` lines. If either of those exceptions happens, it seems like you'd want to fail at that point since you weren't able to get the config?