
Tushar Gupta

11/18/2022, 5:06 PM

Bianca Hoch

11/18/2022, 5:09 PM
Hi Tushar, thanks for reaching out. Can you share a minimal reproducible example of the code?

Tushar Gupta

11/21/2022, 4:56 AM
Thanks for replying, @Bianca Hoch. Here is the code.
I take the parameters and pass them to get_config, which is a task. I want run_config_processor to run only after the configs have been fetched.
@Bianca Hoch Good afternoon! Can you please help? Before I made the get_config function a task, the logs were showing up, but now they no longer appear.
My get_config function returns None when it is a Prefect task, but it returns the config when it is not a Prefect task.

Bianca Hoch

11/21/2022, 6:28 PM
Hey Tushar, there shouldn't be any issues with how you have your flow structured. You should be able to pass data from `get_config` to `run_config_processor`. Can you show how you are defining these tasks?

Tushar Gupta

11/22/2022, 1:25 AM
Here is get_config, which works fine when it is not a Prefect task.
Second task: run_config_processor
@Bianca Hoch @Mason Menges @Christopher Boyd Thanks!
The get_config_from_dynamodb function

Christopher Boyd

11/22/2022, 1:53 PM
Have you tried debugging or printing the parameters passed into get_config and the response from DynamoDB before assigning? How is the flow being called, and how are the parameters being passed? Is it possible, based on either `return [source_config]` or `return []`, that the `None` returned is correct, even if not expected? Also, it looks like this is all being done in Prefect v1. Have you considered testing this in Prefect 2?
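One way to answer the question above without Prefect or DynamoDB is to run the selection logic on sample data. This is a minimal sketch, assuming the branch structure of the get_config function shared in this thread; `select_configs` and the sample data are hypothetical names made up for illustration.

```python
from typing import Any, Dict, List, Optional


def select_configs(
    configs: Dict[str, Any], name: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for the selection step inside get_config."""
    source_configs = configs.get("sources", [])
    if name:
        for source_config in source_configs:
            if name == source_config.get("name"):
                return [source_config]  # exactly one matching source
        return []  # a name was given but nothing matched
    # No name given: return everything, wrapped in a list if needed.
    return source_configs if isinstance(source_configs, list) else [source_configs]


# Made-up data, only to exercise each branch.
sample = {"sources": [{"name": "a"}, {"name": "b"}]}

print(select_configs(sample, "a"))        # name matches one source
print(select_configs(sample, "missing"))  # name matches nothing
print(select_configs({}, ""))             # empty response, empty name
```

Under these assumptions every branch returns a list, so an empty list can be a legitimate result (for example when the DynamoDB response is empty), while a `None` would have to come from somewhere other than this logic.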

Tushar Gupta

11/22/2022, 4:39 PM
@Christopher Boyd As you can see from the logs, my flow logs are not even being displayed. When get_config was not a Prefect task, it worked fine and logs such as "Getting the required config from AWS DynamoDB" showed up. Now that get_config is a Prefect task, the logs are not displayed.

Christopher Boyd

11/22/2022, 5:06 PM
From what I can see, you are using print statements in the tasks, but no task loggers are defined. There is one in run_config_processor, but none in the rest.
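The difference between print and a logger can be shown with the standard library alone. This is a general Python illustration, not Prefect-specific: `ListHandler` is a hypothetical stand-in for whatever backend collects the logs, and the logger name is made up. In the Prefect 1 code in this thread, the task logger is the one obtained with `prefect.context.get("logger")`, as run_config_processor does.

```python
import io
import logging
from contextlib import redirect_stdout
from typing import List


class ListHandler(logging.Handler):
    """Collects log messages in memory (stand-in for a log backend)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger("demo_task_logger")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo isolated from the root logger
handler = ListHandler()
logger.addHandler(handler)


def step_with_print() -> None:
    print("Getting the required config")  # written to stdout only


def step_with_logger() -> None:
    logger.info("Getting the required config")  # delivered to the handler


stdout = io.StringIO()
with redirect_stdout(stdout):
    step_with_print()
    step_with_logger()

captured_stdout = stdout.getvalue()
print(handler.messages)  # only the logger call was recorded by the handler
```

The print output ends up in stdout and never reaches the handler, which is why messages emitted with print can be missing from a log view that only collects logger records.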

Tushar Gupta

11/22/2022, 6:21 PM
from prefect import Flow, task, Parameter

from flows.Crawler.utilities.constants import (
    TABLE_KEY,
    TABLE_NAME,
)
from flows.Crawler.utilities.utils import load_crawler, format_error
import json
import os
from typing import List, Dict, Optional, Any, Union, Tuple, Callable

import boto3
import botocore.exceptions  # needed for the NoCredentialsError handler below
import prefect
from prefect import Parameter, task
from prefect.run_configs import KubernetesRun, LocalRun, RunConfig

from flows.utilities.environment_settings_loader import (
    EnvironmentSettingsLoader,
    EnvironmentSettingEnum,
)


@task(name="run_config_processor")
def run_config_processor(config: Dict[str, Any]) -> None:
    """
    Task that processes files for a given config
    :param config: (Dict) Dict of some configurations from AWS dynamoDB ingestion-sources
    """
    logger = prefect.context.get("logger")
    try:
        logger.info(f"Running Crawler for configuration -> {config}")
    except Exception as err:
        error_message = format_error(err, __file__)
        message = f'Error crawling "{config["name"]}" with error {error_message}'
        logger.error(message)  # log the full message built above, not just the raw error

def get_run_config(additional_labels: Optional[List[str]] = None) -> RunConfig:
    if os.environ.get("ENV", "local") != "local":
        if "-ue1" in os.environ.get(BWELL_ENV_VAR, ""):
            labels = ["k8s"]
        else:
            labels = ["kubernetes"]
        if additional_labels:
            labels.extend(additional_labels)
        return KubernetesRun(
            labels=labels,
            env={
                "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN": os.environ.get(
                    "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN"
                ),
                "S3_STORAGE_BUCKET": os.environ.get("S3_STORAGE_BUCKET"),
                "ENV": os.environ.get("ENV"),
                "PYTHONPATH": "/helix_orchestration",
            },
            image=f"{EnvironmentSettingsLoader().get(EnvironmentSettingEnum.repository_url)}/prefect.agent:latest",
        )
    else:
        labels = ["local_flow"]
        if additional_labels:
            labels.extend(additional_labels)
        return LocalRun(labels=labels, working_dir="/helix_orchestration")
    
@task(name="get_config")
def get_config(
    name: Optional[str] = None, table_name: str = TABLE_NAME, table_key: str = TABLE_KEY
) -> List[Dict[str, Any]]:
    """
    Get the required config from AWS DynamoDB using the primary key value for the table
    :param name: Name field in the table
    :param table_name: (String) Table name that needs to be accessed
    :param table_key: (String) Table's primary key value
    """
    configs = get_config_from_dynamodb(table_name, table_key)
    source_configs = configs.get("sources", [])
    if name:
        for source_config in source_configs:
            if name != "" and name == source_config.get("name"):
                return [source_config]
        return []
    else:
        return source_configs if isinstance(source_configs, list) else [source_configs]

def get_config_from_dynamodb(table_name: str, table_key: str) -> Dict[str, Any]:
    env = os.environ.get("ENV", "local")
    if env == "local":
        return {}
    else:
        dynamodb = boto3.resource("dynamodb", region_name=AWS_REGION)
        # noinspection PyUnresolvedReferences
        try:
            table = dynamodb.Table(table_name)
            config = table.get_item(Key={"role": table_key})
            return dict(config["Item"])
        except botocore.exceptions.NoCredentialsError as e:
            print(
                f"Error reading dynamodb (NoCredentialsError): table: {table_name}, table_key={table_key}  {type(e)} {e} {json.dumps(e, default=str)}"
            )
            return {}
        except Exception as e:
            print(
                f"Error reading dynamodb: table: {table_name}, table_key={table_key}  {type(e)} {e} {json.dumps(e, default=str)}"
            )
            return {}

with Flow(
    "Ingestion Crawler",
    run_config=get_run_config(),
) as flow:
    logger = prefect.context.get("logger")
    logger.info("Getting the required config from AWS DynamoDB.")
    config_name = Parameter("config_name", default="")
    table_name = Parameter("table_name", default="")
    table_key = Parameter("table_key", default="")
    configs = get_config(name=config_name, table_name=table_name, table_key=table_key)
    logger.info(f"Fetched configs from AWS DynamoDB -> {configs}")
    result = run_config_processor.map(config=configs, upstream_tasks=[configs])
@Christopher Boyd @Bianca Hoch @Mason Menges I hope the code helps. Thanks to all three of you for your support.

Ryan Peden

11/22/2022, 6:54 PM
When you run the flow, are you supplying the `table_name` and `table_key` parameters, or are you leaving them empty and wanting to use the `TABLE_NAME` and `TABLE_KEY` constants? If you want to use the constants, you may want to set those as the defaults for each parameter, i.e.
table_name = Parameter("table_name", default=TABLE_NAME)
table_key = Parameter("table_key", default=TABLE_KEY)
Also, in your exception handlers in `get_config_from_dynamodb`, consider removing the `return {}` lines. If either of those exceptions happens, it seems like you'd want to fail at that point, since you weren't able to get the config?
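The fail-fast suggestion can be sketched without boto3. This is only an illustration under stated assumptions: `load_config`, `ConfigLoadError`, and `failing_fetch` are hypothetical names, and `fetch_item` stands in for the DynamoDB `get_item` call in the posted code.

```python
from typing import Any, Callable, Dict


class ConfigLoadError(RuntimeError):
    """Hypothetical error type raised when the config cannot be read."""


def load_config(fetch_item: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
    """Return the stored item, or raise instead of silently returning {}."""
    try:
        return dict(fetch_item()["Item"])
    except Exception as err:
        # Re-raise with context so the caller fails at this point.
        raise ConfigLoadError(f"Could not read config: {err}") from err


def failing_fetch() -> Dict[str, Any]:
    """Hypothetical fetch that simulates a credentials problem."""
    raise ConnectionError("no credentials")


try:
    load_config(failing_fetch)
except ConfigLoadError as err:
    print(f"Failed as intended: {err}")
```

With this shape, a read failure surfaces as an error at the point where the config is loaded, rather than as an empty config that later steps process as if nothing were wrong.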