Farid

02/09/2022, 8:58 PM
Hi, I’m using `GitLab` as storage and Kubernetes as the agent to run Prefect flows. I noticed I get dependency errors when a custom pip package is used inside the flow, in this case soda:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sodasql'")
Is there a way to address these dependency issues without using Docker images to store the flows?

Kevin Kho

02/09/2022, 8:59 PM
Kind of. It’s not best practice, but if you use the base Prefect image, you can use the `EXTRA_PIP_PACKAGES` environment variable.
Best practice, of course, is to build your own container, because otherwise the packages are downloaded on each run.

Sam Werbalowsky

02/09/2022, 9:07 PM
Hey Farid - we use `Git` storage (with Gitlab) and Kubernetes - our solution was to push an image with the packages as part of CI and flow registration, then pull that image for the jobs
upvote 1

Farid

02/09/2022, 9:49 PM
Thanks guys, you helped me understand the difference between the docker image for storage and the image for runtime. But, I still get a weird error even though I specify the extra pip dependency:
with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    flow.add_task(dq_task)
The error I get:
Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
Any ideas?

Kevin Kho

02/09/2022, 10:06 PM
This is weird. This normally happens when you use something like local storage and the file is not in the image. Can you try this with a LocalRun config to test?
Can you show more of the Flow block also?

Farid

02/09/2022, 10:14 PM
This is the whole file, taken from
from typing import Dict, Union

from prefect import Flow, Task
from prefect.run_configs import KubernetesRun
from prefect.storage import GitLab
from prefect.utilities.tasks import defaults_from_attrs

from sodasql.scan.scan_builder import ScanBuilder


class SodaSQLScan(Task):
    """
    SodaSQLScan
    """

    def __init__(self, scan_def: Union[Dict, str] = None, warehouse_def: Union[Dict, str] = None, **kwargs):
        """
        Args:
            scan_def: Soda SQL scan file path or dictionary
            warehouse_def: Soda SQL warehouse file path or dictionary
            **kwargs:
        """
        self.scan_builder = ScanBuilder()
        self._set_scan_def(scan_def=scan_def)
        self._set_warehouse_def(warehouse_def=warehouse_def)
        super().__init__(**kwargs)

    def _set_scan_def(self, scan_def: Union[Dict, str]) -> None:
        """
        Args:
            scan_def: Soda SQL scan file path or dictionary
        Returns:
        """
        self.scan_def = scan_def
        if isinstance(scan_def, str):
            self.scan_builder.scan_yml_file = scan_def
        elif isinstance(scan_def, dict):
            self.scan_builder.scan_yml_dict = scan_def

    def _set_warehouse_def(self, warehouse_def: Union[Dict, str]) -> None:
        """
        Args:
            warehouse_def: Soda SQL warehouse file path or dictionary
        Returns:
        """

        if isinstance(warehouse_def, str):
            from yaml import safe_load

            self.warehouse_def = safe_load(open(warehouse_def))
            if "password" not in self.warehouse_def.get("connection", {}):
                from prefect.tasks.secrets import PrefectSecret

                self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))

            self.scan_builder.warehouse_yml_dict = warehouse_def

    @defaults_from_attrs("scan_def", "warehouse_def")
    def run(self, scan_def: Union[Dict, str], warehouse_def: Union[Dict, str]) -> Dict:
        """
        Args:
            scan_def: Soda SQL scan file path or dictionary
            warehouse_def: Soda SQL warehouse file path or dictionary
        Returns: Soda SQL scan results as JSON object
        """
        if scan_def is None:
            raise ValueError(
                "Scan definition cannot be None. \
                Please provide either a path to a scan definition file or a scan definition dictionary"
            )
        if warehouse_def is None:
            raise ValueError(
                "Warehouse definition cannot be None. \
                Please provide either a path to a warehouse definition file or a warehouse definition dictionary"
            )
        scan = self.scan_builder.build()
        return scan.execute().to_json()


dq_task = SodaSQLScan(scan_def="flows/soda-poc-1/event.yml", warehouse_def="flows/soda-poc-1/warehouse.yml")

with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    flow.add_task(dq_task)

Kevin Kho

02/09/2022, 10:18 PM
Can you also just try calling the task?
with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    dq_task()

Farid

02/09/2022, 10:46 PM
That fails when it tries to register the flow. I register it using a separate file that runs in CI/CD:
import os

from prefect import Client
from prefect.utilities.storage import extract_flow_from_file

client = Client()


def iterate_flows_in_dir(flows_dir="flows"):
    for path, _, files in os.walk(flows_dir):
        for name in files:

            if not name.endswith(".py") or not name.startswith("flow_"):
                continue

            project_name = os.path.basename(path)
            flow_file_path = os.path.join(path, name)

            yield project_name, flow_file_path


for project_name, flow_file_path in iterate_flows_in_dir():
    flow = extract_flow_from_file(flow_file_path)

    client.create_project(project_name=project_name)
    flow.register(
        project_name=project_name,
        idempotency_key=flow.serialized_hash(),  # Generates a hash to prevent unnecessary flow version bump
    )
which fails when I just call the task as you mentioned:
$ python register_flows.py
Traceback (most recent call last):
  File "register_flows.py", line 23, in <module>
    flow = extract_flow_from_file(flow_file_path)
  File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/utilities/storage.py", line 88, in extract_flow_from_file
    exec(contents, exec_vals)
  File "<string>", line 91, in <module>
  File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 634, in __call__
    *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/usr/local/lib/python3.7/inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "/usr/local/lib/python3.7/inspect.py", line 2930, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'scan_def'

Kevin Kho

02/09/2022, 11:20 PM
You need to pass `scan_def` to your `dq_task`, I think.
I see, the run is not picking it up from the init. I don't know why yet.
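A possible explanation for the `TypeError` above, offered as a sketch rather than a confirmed diagnosis: the traceback shows the task call going through `signature.bind(*args, **kwargs)`, so the arguments are checked against the signature of `run` before `run` itself executes. If the decorator preserves the original signature (as `functools.wraps` does), parameters without defaults look required at call time, even though the decorator would fill them in later. The decorator `defaults_from_attrs_sketch` and the classes below are hypothetical stand-ins built on the standard library only; they are not Prefect's implementation.

```python
import functools
import inspect


def defaults_from_attrs_sketch(*attr_names):
    """Stand-in decorator: fill missing keyword arguments from instance attributes."""

    def decorator(run_method):
        @functools.wraps(run_method)  # keeps run_method's signature visible to inspect
        def method(self, **kwargs):
            for name in attr_names:
                # Fall back to the attribute set in __init__ when the argument is absent.
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run_method(self, **kwargs)

        return method

    return decorator


class RequiredArgs:
    def __init__(self, scan_def=None):
        self.scan_def = scan_def

    @defaults_from_attrs_sketch("scan_def")
    def run(self, scan_def):  # no default: looks required to signature.bind()
        return scan_def


class OptionalArgs(RequiredArgs):
    @defaults_from_attrs_sketch("scan_def")
    def run(self, scan_def=None):  # default of None: binding with no args succeeds
        return scan_def


def can_bind_without_args(task):
    """Mimic a call-time check that binds the call's arguments to run's signature."""
    try:
        inspect.signature(task.run).bind()
        return True
    except TypeError:
        return False
```

Under that assumption, giving the `run` parameters a default of `None` lets the call bind with no arguments, while the decorator still substitutes the values set in `__init__`. Calling `run()` directly bypasses the bind check, which is why it can succeed while the call inside the flow fails.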

Farid

02/10/2022, 12:55 AM
I tried passing the args to the `dq_task` and received the same `Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')` error

Kevin Kho

02/10/2022, 2:49 AM
Ah ok, I have an idea of what is going on. Prefect is loading `predicthq/data-engineering/prefect-flows` on Gitlab and either not seeing any Flow inside or not seeing a flow with the same name. What is the content of this Flow, and does a `Flow` object exist?
f

Farid

02/10/2022, 3:37 AM
@Kevin Kho I overcame that issue by removing the part that retrieves the password from Prefect secrets and committing the password to source control:
from prefect.tasks.secrets import PrefectSecret

self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
However, now I receive another error (related to soda) when I run the flow on the Kubernetes agent, while the same flow runs successfully when I try it locally with `task.run()`.
Task 'SodaSQLScan': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "<string>", line 72, in run
  File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 161, in build
    self._build_warehouse_yml()
  File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 190, in _build_warehouse_yml
    self.parse_warehouse_yml(warehouse_yml_parser)
  File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 195, in parse_warehouse_yml
    self.warehouse_yml = warehouse_parser.warehouse_yml
AttributeError: 'WarehouseYmlParser' object has no attribute 'warehouse_yml'

Kevin Kho

02/10/2022, 3:40 AM
This makes me believe there is a version mismatch between the execution environment and the registration environment

Farid

02/10/2022, 3:43 AM
On soda or prefect?

Kevin Kho

02/10/2022, 3:45 AM
Soda version, because the `WarehouseYmlParser` object is lacking a `warehouse_yml` attribute. That is a Soda class, right?
Not super sure, as I haven't used Soda myself and the example is a bit too involved for me to see clearly at the moment

Farid

02/10/2022, 4:04 AM
I think this issue is related to soda not being able to read the `warehouse.yml` file. Would the supplementary files needed for the flow's execution be accessible if they are committed to the same git repo, with their relative paths specified in the flow? I feel like I should resort to using docker images to store the flows rather than Git

Kevin Kho

02/10/2022, 4:11 AM
See this for that
🙌 1

Farid

02/10/2022, 6:02 AM
This was pretty helpful @Kevin Kho, thank you. However, I had to use `Git` instead of `GitLab` for storage. I think there is a lot of difference in the underlying code. I was getting
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
when using `GitLab` and retrieving the file path. The other thing to mention is that I could not retrieve the additional files inside the tasks; they had to be retrieved outside of them and then passed to the tasks.
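To illustrate where a `NameError` on `__file__` can come from: the registration traceback earlier in the thread shows the flow file being run through `exec(contents, exec_vals)` with frames reported as `<string>`. Source executed that way only sees `__file__` if the loader puts it into the namespace. The sketch below reproduces that mechanism with the standard library; `load_from_string`, `try_load`, and the paths are hypothetical, and whether `GitLab` storage loads flows exactly like this at run time is an assumption.

```python
import os

# A flow file that locates its own directory via __file__ (hypothetical content).
FLOW_SOURCE = "import os\nflow_dir = os.path.dirname(os.path.abspath(__file__))\n"


def load_from_string(source, extra_globals=None):
    """Execute source text in a fresh namespace, as a string-based loader might."""
    exec_vals = {"__name__": "flow_module"}
    exec_vals.update(extra_globals or {})
    exec(source, exec_vals)
    return exec_vals


def try_load(source, extra_globals=None):
    """Return (namespace, None) on success or (None, message) on NameError."""
    try:
        return load_from_string(source, extra_globals), None
    except NameError as exc:
        return None, str(exc)


# Without __file__ in the namespace, the lookup inside the flow file fails.
_, error = try_load(FLOW_SOURCE)

# With __file__ supplied by the loader, the same source resolves its directory.
fake_path = os.path.join(os.sep, "tmp", "repo", "flows", "flow_soda_scan.py")
namespace, _ = try_load(FLOW_SOURCE, {"__file__": fake_path})
```

A storage class that works from a file on disk can supply a real path, while one that only has the file's text cannot, which would be consistent with the different behaviour described above.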
k

Kevin Kho

02/10/2022, 1:59 PM
The repository is loaded at the start of the Flow and then deleted before the tasks are executed. So yes, the additional files need to be loaded at the start.
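A minimal sketch of that pattern, using only the standard library: the supporting files are read into memory while the checkout still exists, and the task receives their contents instead of their paths. The helper names, file contents, and the temporary directory standing in for the checkout are all hypothetical; in the flow from this thread, the parsed YAML would presumably be passed as the dictionary form of `scan_def` and `warehouse_def`.

```python
import tempfile
from pathlib import Path


def read_supporting_files(base_dir, names):
    """Read each file's text into memory so nothing depends on the checkout later."""
    return {name: (Path(base_dir) / name).read_text() for name in names}


def scan_task(scan_text, warehouse_text):
    """Hypothetical task body: it receives file contents, not file paths."""
    return {"scan_chars": len(scan_text), "warehouse_chars": len(warehouse_text)}


# Simulate a temporary checkout that disappears once the flow has been loaded.
with tempfile.TemporaryDirectory() as checkout:
    Path(checkout, "event.yml").write_text("table_name: event\n")
    Path(checkout, "warehouse.yml").write_text("name: snowflake\n")
    loaded = read_supporting_files(checkout, ["event.yml", "warehouse.yml"])

# The directory is gone here, but the task still has everything it needs.
result = scan_task(loaded["event.yml"], loaded["warehouse.yml"])
```

Reading a path inside the task would instead run after the directory has been removed, which matches the `FileNotFoundError` seen earlier in the thread.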
👍 1