Tushar Gupta
11/18/2022, 5:06 PM
Bianca Hoch
11/18/2022, 5:09 PM
Tushar Gupta
11/21/2022, 4:56 AM
Tushar Gupta
11/21/2022, 4:57 AM
Tushar Gupta
11/21/2022, 6:19 PM
Tushar Gupta
11/21/2022, 6:20 PM
Bianca Hoch
11/21/2022, 6:28 PM
[…] `get_config` to `run_config_processor`. Can you show how you are defining these tasks?
Tushar Gupta
11/22/2022, 1:25 AM
Tushar Gupta
11/22/2022, 1:26 AM
Tushar Gupta
11/22/2022, 1:26 AM
Tushar Gupta
11/22/2022, 1:36 AM
Christopher Boyd
11/22/2022, 1:53 PM
[…] `return [source_config]` or `return []` […] that the `None` returned is correct, even if not expected?
Also, it looks like this is all being done in Prefect v1 — have you considered testing this in Prefect 2?
Tushar Gupta
11/22/2022, 4:39 PM
Christopher Boyd
11/22/2022, 5:06 PM
Tushar Gupta
11/22/2022, 6:21 PM
from prefect import Flow, task, Parameter
from flows.Crawler.utilities.constants import (
TABLE_KEY,
TABLE_NAME,
)
from flows.Crawler.utilities.utils import load_crawler, format_error
import json
import os
from typing import List, Dict, Optional, Any, Union, Tuple, Callable
import boto3
import prefect
from prefect import Parameter, task
from prefect.run_configs import KubernetesRun, LocalRun, RunConfig
from flows.utilities.environment_settings_loader import (
EnvironmentSettingsLoader,
EnvironmentSettingEnum,
)
@task(name="run_config_processor")
def run_config_processor(config: Dict[str, Any]) -> None:
    """
    Task that processes files for a given config.

    Errors raised while processing are logged through the Prefect context
    logger and are not re-raised, so the mapped task run still completes.

    :param config: (Dict) One source configuration as produced by ``get_config``
        (presumably an entry of the ``sources`` list in the AWS DynamoDB
        ingestion-sources table -- confirm against the table schema).
    """
    logger = prefect.context.get("logger")
    try:
        logger.info(f"Running Crawler for configuration -> {config}")
    except Exception as err:
        error_message = format_error(err, __file__)
        # Use .get so a config without a "name" key cannot raise a second
        # exception from inside this handler.
        message = f'Error crawling "{config.get("name")}" with error {error_message}'
        # Log the composed message (it was previously built and then discarded).
        logger.error(message)
def get_run_config(additional_labels: Optional[List[str]] = None) -> RunConfig:
    """
    Build the Prefect run configuration for the current environment.

    If ``ENV`` is unset or equal to ``"local"``, a ``LocalRun`` labelled
    ``local_flow`` is returned. Otherwise a ``KubernetesRun`` is returned. Its
    base label is ``k8s`` when the environment variable named by
    ``BWELL_ENV_VAR`` contains ``"-ue1"``, and ``kubernetes`` otherwise.

    :param additional_labels: extra agent labels appended after the base label.
    :return: a ``LocalRun`` or ``KubernetesRun`` instance.
    """
    is_local = os.environ.get("ENV", "local") == "local"

    if is_local:
        agent_labels = ["local_flow"]
    # NOTE(review): BWELL_ENV_VAR is not imported in the visible part of this
    # file -- confirm it is defined elsewhere, otherwise this raises NameError.
    elif "-ue1" in os.environ.get(BWELL_ENV_VAR, ""):
        agent_labels = ["k8s"]
    else:
        agent_labels = ["kubernetes"]

    # Equivalent to extending only when the caller supplied a non-empty value.
    agent_labels += additional_labels or []

    if is_local:
        return LocalRun(labels=agent_labels, working_dir="/helix_orchestration")

    token_key = "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN"
    # Environment values are read before the settings loader is constructed,
    # preserving the original evaluation order.
    container_env = {
        token_key: os.environ.get(token_key),
        "S3_STORAGE_BUCKET": os.environ.get("S3_STORAGE_BUCKET"),
        "ENV": os.environ.get("ENV"),
        "PYTHONPATH": "/helix_orchestration",
    }
    repository_url = EnvironmentSettingsLoader().get(EnvironmentSettingEnum.repository_url)
    return KubernetesRun(
        labels=agent_labels,
        env=container_env,
        image=f"{repository_url}/prefect.agent:latest",
    )
@task(name="get_config")
def get_config(
    name: Optional[str] = None, table_name: str = TABLE_NAME, table_key: str = TABLE_KEY
) -> List[Dict[str, Any]]:
    """
    Get the required config(s) from AWS DynamoDB using the table's primary key value.

    The item read from DynamoDB is expected to hold its source configurations
    under a ``"sources"`` key, either as a list of dicts or as a single dict.

    :param name: If non-empty, return only the first source whose ``"name"``
        field equals it.
    :param table_name: (String) Table name that needs to be accessed
    :param table_key: (String) Table's primary key value
    :return: A list of source configs. It is ``[]`` when nothing was read or
        no source matches ``name``.
    """
    configs = get_config_from_dynamodb(table_name, table_key)
    raw_sources = configs.get("sources", [])

    # Normalise to a list *before* filtering. Iterating a single dict would
    # walk its keys (strings), and iterating None would raise TypeError.
    if raw_sources is None:
        source_configs: List[Dict[str, Any]] = []
    elif isinstance(raw_sources, list):
        source_configs = raw_sources
    else:
        source_configs = [raw_sources]

    if not name:
        return source_configs

    match = next(
        (
            source_config
            for source_config in source_configs
            if isinstance(source_config, dict) and source_config.get("name") == name
        ),
        None,
    )
    return [match] if match is not None else []
def get_config_from_dynamodb(table_name: str, table_key: str) -> Dict[str, Any]:
    """
    Read one item from a DynamoDB table, looked up by its ``role`` key.

    If ``ENV`` is unset or equal to ``"local"``, no AWS call is made and ``{}``
    is returned. Read failures are reported with ``print`` and also yield
    ``{}``, so callers cannot distinguish "no config" from "read failed".

    :param table_name: DynamoDB table to read.
    :param table_key: Value of the table's ``role`` key.
    :return: The item's attributes as a plain dict, or ``{}``.
    """
    if os.environ.get("ENV", "local") == "local":
        return {}

    # Imported here so the except clause below can resolve the name. Local runs
    # return above, so they never need the AWS libraries.
    import botocore.exceptions

    # NOTE(review): AWS_REGION is not defined in the visible part of this file -- confirm.
    dynamodb = boto3.resource("dynamodb", region_name=AWS_REGION)
    try:
        table = dynamodb.Table(table_name)
        response = table.get_item(Key={"role": table_key})
    except botocore.exceptions.NoCredentialsError as e:
        print(
            f"Error reading dynamodb (NoCredentialsError): table: {table_name}, table_key={table_key} {type(e)} {e} {json.dumps(e, default=str)}"
        )
        return {}
    except Exception as e:
        print(
            f"Error reading dynamodb: table: {table_name}, table_key={table_key} {type(e)} {e} {json.dumps(e, default=str)}"
        )
        return {}

    # get_item omits "Item" when no row has this key; report it explicitly
    # rather than relying on a caught KeyError.
    item = response.get("Item")
    if item is None:
        print(f"Error reading dynamodb: table: {table_name}, table_key={table_key} no item found")
        return {}
    return dict(item)
# Statements inside ``with Flow(...)`` run once, at import time, to build the
# task graph; they do not run on each flow run. Run-time work, including
# logging fetched values, belongs inside tasks. Logging ``configs`` here would
# print a Task object, not the fetched configs.
with Flow(
    "Ingestion Crawler",
    run_config=get_run_config(),
) as flow:
    config_name = Parameter("config_name", default="")
    # Default to the module constants. With a default of "", omitting these
    # parameters would query a table named "". That read presumably fails, so
    # get_config would return [].
    table_name = Parameter("table_name", default=TABLE_NAME)
    table_key = Parameter("table_key", default=TABLE_KEY)
    configs = get_config(name=config_name, table_name=table_name, table_key=table_key)
    # ``configs`` is passed as data, so it is already an upstream dependency of
    # every mapped run; no separate upstream_tasks entry is needed.
    result = run_config_processor.map(config=configs)
Tushar Gupta
11/22/2022, 6:21 PM
Ryan Peden
11/22/2022, 6:54 PM
[…] `table_name` and `table_key` parameters, or are you leaving them empty and wanting to use the `TABLE_NAME` and `TABLE_KEY` constants?
If you want to use the constants, you may want to set those as the defaults for each parameter, i.e.

    table_name = Parameter("table_name", default=TABLE_NAME)
    table_key = Parameter("table_key", default=TABLE_KEY)

Also, in your exception handlers in `get_config_from_dynamodb`, consider removing the `return {}` lines. If either of those exceptions happens, it seems like you'd want to fail at that point, since you weren't able to get the config.
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by