Thread
#prefect-community
    Farid

    7 months ago
    Hi, I’m using GitLab as storage and Kubernetes as the agent to run Prefect flows. I noticed I get dependency errors when a custom pip package is used inside the flow, in this case soda:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sodasql'")
    Is there a way to address these dependency issues without using Docker images to store the flows?
    Kevin Kho

    7 months ago
    Kind of. It’s not best practice, but if you use the base Prefect image, you can set the EXTRA_PIP_PACKAGES environment variable (see here).
    Best practice, of course, is to build your own container, because otherwise these packages are downloaded every time the flow runs.
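    For context, here is a rough Python sketch of what the base image does with EXTRA_PIP_PACKAGES when a container starts. It assumes the variable holds a space-separated list that is simply pip-installed. The real image does this in its shell entrypoint, and the helper name pip_install_command is hypothetical. The sketch also shows where the per-run cost comes from: the install happens on every container start.

```python
import os
import shlex
import sys
from typing import List, Mapping, Optional


def pip_install_command(env: Mapping[str, str]) -> Optional[List[str]]:
    """Build the pip command for EXTRA_PIP_PACKAGES, or None if it is unset/empty.

    Hypothetical helper approximating the base image's start-up step.
    """
    packages = env.get("EXTRA_PIP_PACKAGES", "").strip()
    if not packages:
        return None
    # Space-separated list, e.g. "soda-sql-snowflake pandas==1.3.5"
    return [sys.executable, "-m", "pip", "install", *shlex.split(packages)]


if __name__ == "__main__":
    cmd = pip_install_command(os.environ)
    # Inside the container this would run on every start,
    # e.g. subprocess.run(cmd, check=True)
    print(cmd)
```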
    Sam Werbalowsky

    7 months ago
    Hey Farid - we use Git storage (with GitLab) and Kubernetes. Our solution was to build and push an image containing the packages as part of CI and flow registration, then pull that image for the jobs.
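    A minimal sketch of the CI side of that approach. It assumes the pipeline writes a Dockerfile that extends the Prefect base image with the flow's pip dependencies and tags the image per commit. render_dockerfile, image_tag, and the registry name are hypothetical. The resulting tag is what would go into KubernetesRun(image=...).

```python
from typing import Iterable


def render_dockerfile(base_image: str, packages: Iterable[str]) -> str:
    """Return Dockerfile text that bakes the flow's dependencies into the image."""
    pkgs = " ".join(packages)
    lines = [f"FROM {base_image}"]
    if pkgs:
        # Installed once at build time instead of on every flow run
        lines.append(f"RUN pip install --no-cache-dir {pkgs}")
    return "\n".join(lines) + "\n"


def image_tag(registry: str, name: str, commit_sha: str) -> str:
    """Tag with a short commit SHA so each registration points at an exact image."""
    return f"{registry}/{name}:{commit_sha[:8]}"


if __name__ == "__main__":
    print(render_dockerfile("prefecthq/prefect:latest", ["soda-sql-snowflake"]))
    print(image_tag("registry.example.com/data-eng", "prefect-flows", "0123456789abcdef"))
```

    Tagging by commit means the image used at run time matches the code that was registered, which also keeps the registration and runtime environments in step.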
    Farid

    7 months ago
    Thanks guys, you helped me understand the difference between the Docker image for storage and the image for runtime. But I still get a weird error even though I specify the extra pip dependency:
    with Flow(
        name="Soda SQL Scan POC 1",
        storage=GitLab(
            repo="predicthq/data-engineering/prefect-flows",
            path="flows/soda-poc-1/flow_soda_scan.py",
            access_token_secret="GITLAB_TOKEN",
        ),
        run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
    ) as flow:
        flow.add_task(dq_task)
    The error I get:
    Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
    Any ideas?
    Kevin Kho

    7 months ago
    This is weird. This normally happens when you use something like local storage and the file is not in the image. Can you try this with a LocalRun config to test?
    Can you show more of the Flow block also?
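    This stdlib-only sketch shows how that error can arise even with remote storage. It assumes the flow file is executed from source at load time; the registration traceback in this thread shows extract_flow_from_file doing exec(contents, ...). Under that assumption, any module-level open() of a relative path (like the warehouse.yml read that happens when the posted SodaSQLScan(...) is constructed) resolves against the pod's working directory. The resulting exception is FileNotFoundError(2, 'No such file or directory'), the same errno as the reported message. FLOW_SOURCE and load_flow_source are hypothetical stand-ins.

```python
import os
import tempfile

# Hypothetical stand-in for a flow file that opens a config at import time,
# the way the posted flow reads warehouse.yml while constructing its task.
FLOW_SOURCE = """
with open("flows/soda-poc-1/warehouse.yml") as f:
    WAREHOUSE_TEXT = f.read()
"""


def load_flow_source(source: str, workdir: str) -> dict:
    """exec the flow source with `workdir` as the current directory, like a fresh pod."""
    previous = os.getcwd()
    os.chdir(workdir)
    try:
        namespace: dict = {}
        exec(source, namespace)  # module-level code runs here, including open()
        return namespace
    finally:
        os.chdir(previous)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as empty_dir:  # no warehouse.yml in this "pod"
        try:
            load_flow_source(FLOW_SOURCE, empty_dir)
        except FileNotFoundError as exc:
            print("load failed:", repr(exc))
```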
    Farid

    7 months ago
    This is the whole file, taken from
    from typing import Dict, Union
    
    from prefect import Flow, Task
    from prefect.run_configs import KubernetesRun
    from prefect.storage import GitLab
    from prefect.utilities.tasks import defaults_from_attrs
    
    from sodasql.scan.scan_builder import ScanBuilder
    
    
    class SodaSQLScan(Task):
        """
        SodaSQLScan
        """
    
        def __init__(self, scan_def: Union[Dict, str] = None, warehouse_def: Union[Dict, str] = None, **kwargs):
            """
            Args:
                scan_def: Soda SQL scan file path or dictionary
                warehouse_def: Soda SQL warehouse file path or dictionary
                **kwargs:
            """
            self.scan_builder = ScanBuilder()
            self._set_scan_def(scan_def=scan_def)
            self._set_warehouse_def(warehouse_def=warehouse_def)
            super().__init__(**kwargs)
    
        def _set_scan_def(self, scan_def: Union[Dict, str]) -> None:
            """
            Args:
                scan_def: Soda SQL scan file path or dictionary
            Returns:
            """
            self.scan_def = scan_def
            if isinstance(scan_def, str):
                self.scan_builder.scan_yml_file = scan_def
            elif isinstance(scan_def, dict):
                self.scan_builder.scan_yml_dict = scan_def
    
        def _set_warehouse_def(self, warehouse_def: Union[Dict, str]) -> None:
            """
            Args:
                warehouse_def: Soda SQL warehouse file path or dictionary
            Returns:
            """
    
            if isinstance(warehouse_def, str):
                from yaml import safe_load
    
                self.warehouse_def = safe_load(open(warehouse_def))
                if "password" not in self.warehouse_def.get("connection", {}):
                    from prefect.tasks.secrets import PrefectSecret
    
                    self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
    
                self.scan_builder.warehouse_yml_dict = warehouse_def
    
        @defaults_from_attrs("scan_def", "warehouse_def")
        def run(self, scan_def: Union[Dict, str], warehouse_def: Union[Dict, str]) -> Dict:
            """
            Args:
                scan_def: Soda SQL scan file path or dictionary
                warehouse_def: Soda SQL warehouse file path or dictionary
            Returns: Soda SQL scan results as JSON object
            """
            if scan_def is None:
                raise ValueError(
                    "Scan definition cannot be None. "
                    "Please provide either a path to a scan definition file or a scan definition dictionary"
                )
            if warehouse_def is None:
                raise ValueError(
                    "Warehouse definition cannot be None. "
                    "Please provide either a path to a warehouse definition file or a warehouse definition dictionary"
                )
            scan = self.scan_builder.build()
            return scan.execute().to_json()
    
    
    dq_task = SodaSQLScan(scan_def="flows/soda-poc-1/event.yml", warehouse_def="flows/soda-poc-1/warehouse.yml")
    
    with Flow(
        name="Soda SQL Scan POC 1",
        storage=GitLab(
            repo="predicthq/data-engineering/prefect-flows",
            path="flows/soda-poc-1/flow_soda_scan.py",
            access_token_secret="GITLAB_TOKEN",
        ),
        run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
    ) as flow:
        flow.add_task(dq_task)
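    Look at _set_warehouse_def in the posted file. For a path, it parses the YAML into self.warehouse_def but then assigns the original path string to scan_builder.warehouse_yml_dict. For a dict, it stores nothing at all. It is not confirmed that this causes the Soda AttributeError later in the thread, but it is easy to rule out. Below is a minimal sketch of the intended normalization. It uses a stub in place of Soda's ScanBuilder and an injected load_file callable (for example, one wrapping yaml.safe_load). StubScanBuilder, set_warehouse_def, and load_file are hypothetical names.

```python
from typing import Callable, Dict, Optional, Union


class StubScanBuilder:
    """Stand-in for sodasql's ScanBuilder; only the attribute used here."""

    def __init__(self) -> None:
        self.warehouse_yml_dict: Optional[Dict] = None


def set_warehouse_def(
    builder: StubScanBuilder,
    warehouse_def: Union[Dict, str],
    load_file: Callable[[str], Dict],
) -> Dict:
    """Normalize a path or dict into a dict and hand the *dict* to the builder."""
    if isinstance(warehouse_def, str):
        parsed = load_file(warehouse_def)  # e.g. yaml.safe_load on the opened file
    elif isinstance(warehouse_def, dict):
        parsed = dict(warehouse_def)  # shallow copy so the caller's dict isn't rebound
    else:
        raise TypeError("warehouse_def must be a file path or a dictionary")
    builder.warehouse_yml_dict = parsed  # the posted code assigned the path string here
    return parsed
```

    In the task itself, the equivalent would be to keep the returned dict as self.warehouse_def and pass that same dict to self.scan_builder.warehouse_yml_dict.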
    Kevin Kho

    7 months ago
    Can you also just try calling the task?
    with Flow(
        name="Soda SQL Scan POC 1",
        storage=GitLab(
            repo="predicthq/data-engineering/prefect-flows",
            path="flows/soda-poc-1/flow_soda_scan.py",
            access_token_secret="GITLAB_TOKEN",
        ),
        run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
    ) as flow:
        dq_task()
    Farid

    7 months ago
    That fails when it tries to register the flow. I register it using a separate file that runs in CI/CD:
    import os
    
    from prefect import Client
    from prefect.utilities.storage import extract_flow_from_file
    
    client = Client()
    
    
    def iterate_flows_in_dir(flows_dir="flows"):
        for path, _, files in os.walk(flows_dir):
            for name in files:
    
                if not name.endswith(".py") or not name.startswith("flow_"):
                    continue
    
                project_name = os.path.basename(path)
                flow_file_path = os.path.join(path, name)
    
                yield project_name, flow_file_path
    
    
    for project_name, flow_file_path in iterate_flows_in_dir():
        flow = extract_flow_from_file(flow_file_path)
    
        client.create_project(project_name=project_name)
        flow.register(
            project_name=project_name,
            idempotency_key=flow.serialized_hash(),  # Generates a hash to prevent unnecessary flow version bump
        )
    which fails when I just call the task as you suggested:
    $ python register_flows.py
    Traceback (most recent call last):
      File "register_flows.py", line 23, in <module>
        flow = extract_flow_from_file(flow_file_path)
      File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/utilities/storage.py", line 88, in extract_flow_from_file
        exec(contents, exec_vals)
      File "<string>", line 91, in <module>
      File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 634, in __call__
        *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
      File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 674, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/usr/local/lib/python3.7/inspect.py", line 3015, in bind
        return args[0]._bind(args[1:], kwargs)
      File "/usr/local/lib/python3.7/inspect.py", line 2930, in _bind
        raise TypeError(msg) from None
    TypeError: missing a required argument: 'scan_def'
    Kevin Kho

    7 months ago
    You need to pass scan_def to your dq_task, I think.
    I see, the run is not picking it up from the init. I don’t know why yet.
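    The traceback shows the failure inside inspect's bind. When a task is called in a flow context, Prefect 1.x binds the call's arguments against the signature of run. The posted run(self, scan_def, warehouse_def) declares both parameters without defaults, so binding fails before defaults_from_attrs can fill anything in. Giving both parameters a None default, as in the usual defaults_from_attrs pattern, should let dq_task() bind. This sketch reproduces just the binding step. The plain functions run_required and run_optional (hypothetical) stand in for the two possible run signatures, without self, because the signature of a bound method omits it.

```python
import inspect
from typing import Dict, Optional, Union


def run_required(scan_def: Union[Dict, str], warehouse_def: Union[Dict, str]) -> None:
    """Mirrors the posted run(): both parameters are required."""


def run_optional(
    scan_def: Optional[Union[Dict, str]] = None,
    warehouse_def: Optional[Union[Dict, str]] = None,
) -> None:
    """Suggested shape: None defaults let defaults_from_attrs supply the init values."""


def try_bind(func, *args, **kwargs) -> Optional[str]:
    """Bind arguments the way the traceback shows; return the error message or None."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_bind(run_required))  # like calling dq_task() with no arguments
    print(try_bind(run_optional))  # binds fine
```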
    Farid

    7 months ago
    I tried passing the args to the dq_task and received the same error:
    Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
    Kevin Kho

    7 months ago
    Ah ok, I have an idea of what is going on. Prefect is loading predicthq/data-engineering/prefect-flows on GitLab and not seeing any Flow inside, or not any Flow with the same name. What is the content of this Flow, and does a Flow object exist?
    Farid

    7 months ago
    @Kevin Kho I overcame that issue by removing the part related to retrieving the password from Prefect secrets and committing the password to source control:
    from prefect.tasks.secrets import PrefectSecret
    
    self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
    However, now I receive another error (related to soda) when I run the flow on the Kubernetes agent, even though running the same flow locally with task.run() succeeds:
    Task 'SodaSQLScan': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "<string>", line 72, in run
      File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 161, in build
        self._build_warehouse_yml()
      File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 190, in _build_warehouse_yml
        self.parse_warehouse_yml(warehouse_yml_parser)
      File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 195, in parse_warehouse_yml
        self.warehouse_yml = warehouse_parser.warehouse_yml
    AttributeError: 'WarehouseYmlParser' object has no attribute 'warehouse_yml'
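    An aside on the secret lookup that was removed. In Prefect 1.x, PrefectSecret("MYSECRET") constructs a task. Calling it inside __init__ therefore likely put a task object, not the password string, into the connection dict. An alternative to committing the password is to resolve it inside run, when the value is actually available. In Prefect 1.x that could be prefect.client.Secret("MYSECRET").get(). The sketch keeps to the stdlib by injecting a get_secret callable. with_password and the environment-variable backend env_secret are hypothetical.

```python
import copy
import os
from typing import Callable, Dict


def with_password(warehouse_def: Dict, get_secret: Callable[[str], str], name: str) -> Dict:
    """Return a copy of warehouse_def with connection.password filled in at run time.

    Intended to be called inside the task's run(), not in __init__, so the secret
    is fetched in the execution environment rather than when the flow is built.
    """
    resolved = copy.deepcopy(warehouse_def)
    connection = resolved.setdefault("connection", {})  # avoids KeyError if missing
    if "password" not in connection:
        connection["password"] = get_secret(name)
    return resolved


def env_secret(name: str) -> str:
    """Hypothetical secret backend: read the value from an environment variable."""
    return os.environ[name]
```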
    Kevin Kho

    7 months ago
    This makes me believe it is a version mismatch between the execution environment and the registration environment.
    Farid

    Farid

    7 months ago
    On soda or prefect?
    Kevin Kho

    7 months ago
    Soda version, because the WarehouseYmlParser object is lacking a warehouse_yml attribute. That is a Soda class, right?
    Not super sure, as I haven’t used Soda myself and the example is a bit too involved for me to see clearly at the moment.
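    One way to test the version-mismatch theory is to log installed distribution versions in both places: the CI registration job and the start of the flow run. The tracebacks above already differ in Python version (3.7 in the registration job's venv, 3.9 in the pod). An unpinned EXTRA_PIP_PACKAGES also installs whatever Soda release is current. The sketch below is stdlib-only and assumes Python 3.8+ for importlib.metadata; on 3.7, the importlib_metadata backport offers the same version() call. installed_version and pinned are hypothetical helpers.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Iterable, List, Optional


def installed_version(dist: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist)
    except PackageNotFoundError:
        return None


def pinned(dists: Iterable[str]) -> List[str]:
    """Pin each distribution to its locally installed version, e.g. for EXTRA_PIP_PACKAGES."""
    specs = []
    for dist in dists:
        v = installed_version(dist)
        specs.append(f"{dist}=={v}" if v else dist)  # left unpinned if not installed here
    return specs


if __name__ == "__main__":
    # Log this during registration and at the start of the flow run, then compare.
    for name in ("prefect", "soda-sql-snowflake"):
        print(name, installed_version(name))
    print(" ".join(pinned(["soda-sql-snowflake"])))
```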
    Farid

    7 months ago
    I think this issue is related to soda not being able to read the warehouse.yml file. Would the supplementary files needed for the flow's execution be accessible if they are committed to the same Git repo, with their relative paths specified in the flow? I feel like I should resort to using Docker images to store the flows rather than Git.
    Kevin Kho

    7 months ago
    See this for that
    Farid

    7 months ago
    This was pretty helpful @Kevin Kho, thank you. However, I had to use Git instead of GitLab for storage; I think there is a lot of difference in the underlying code. When using GitLab and retrieving the file path, I was getting:
    Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
    The other thing to mention is that I could not retrieve the additional files inside the task; they had to be retrieved outside the tasks and then passed in.
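    The NameError is consistent with the flow file being executed from a string (note the File "<string>" frames in the earlier tracebacks), where __file__ is not defined. Below is a defensive sketch: read __file__ through globals() so that a missing value falls back instead of raising. It assumes the current working directory is an acceptable fallback; that default is an assumption, not Prefect behavior, and flow_base_dir is a hypothetical name. Presumably this only avoids the error. With GitLab storage, the repo's other files would still not be on disk, which fits the switch to Git storage.

```python
import os
from typing import Optional


def flow_base_dir(module_globals: dict, fallback: Optional[str] = None) -> str:
    """Directory of the flow file if __file__ is defined, else a fallback (default: cwd).

    Source loaded via exec() often has no __file__, so referencing it directly
    raises NameError; looking it up in the globals dict does not.
    """
    file_path = module_globals.get("__file__")
    if file_path:
        return os.path.dirname(os.path.abspath(file_path))
    return fallback if fallback is not None else os.getcwd()


# In the flow file itself this would be: BASE_DIR = flow_base_dir(globals())

if __name__ == "__main__":
    print(flow_base_dir(globals()))
    namespace = {"flow_base_dir": flow_base_dir}
    exec("BASE_DIR = flow_base_dir(globals())", namespace)  # no __file__, no NameError
    print(namespace["BASE_DIR"])
```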
    Kevin Kho

    7 months ago
    The repo is loaded at the start of the flow run and then deleted before the tasks are executed. So yes, the files need to be loaded in at the start.
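    Following that explanation, here is a sketch of the pattern Farid describes. It reads the supplementary files while the flow file is being loaded, when the clone still exists, and passes their contents to the task rather than their paths. The temporary directory plays the role of the clone and is deleted before the "task" runs. read_supporting_files and scan_task are hypothetical names. The files are read as plain text; in the real flow they would be parsed with a YAML loader.

```python
import os
import tempfile
from typing import Dict, Iterable


def read_supporting_files(base_dir: str, relative_paths: Iterable[str]) -> Dict[str, str]:
    """Read files at flow-load time, while the cloned repo is still on disk."""
    contents = {}
    for rel in relative_paths:
        with open(os.path.join(base_dir, rel), encoding="utf-8") as f:
            contents[rel] = f.read()
    return contents


def scan_task(scan_text: str, warehouse_text: str) -> Dict[str, str]:
    """Stand-in for the Soda task: it only receives contents, never paths."""
    return {"scan": scan_text, "warehouse": warehouse_text}


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as clone_dir:  # plays the role of the temporary clone
        for name, text in (("event.yml", "table_name: events\n"), ("warehouse.yml", "name: wh\n")):
            with open(os.path.join(clone_dir, name), "w", encoding="utf-8") as f:
                f.write(text)
        files = read_supporting_files(clone_dir, ["event.yml", "warehouse.yml"])
    # The directory is gone here, as it would be when the tasks execute.
    print(scan_task(files["event.yml"], files["warehouse.yml"]))
```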