Farid
02/09/2022, 8:58 PM
I'm using GITLAB as storage and Kubernetes as the agent to run Prefect flows. I noticed that I get dependency errors when a custom pip package is used inside the flow, in this case Soda:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sodasql'")
Is there a way to address these dependency issues without using Docker images to store the flows?
Kevin Kho
You can use the EXTRA_PIP_PACKAGES environment variable, as shown here.
Sam Werbalowsky
02/09/2022, 9:07 PM
We use GIT storage (with GitLab) and Kubernetes. Our solution was to push an image containing the packages as part of CI and flow registration, then pull that image for the jobs.
Farid
02/09/2022, 9:49 PM
with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    flow.add_task(dq_task)
The error I get:
Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
Any ideas?
Kevin Kho
Farid
02/09/2022, 10:14 PM
from typing import Dict, Union

from prefect import Flow, Task
from prefect.run_configs import KubernetesRun
from prefect.storage import GitLab
from prefect.utilities.tasks import defaults_from_attrs
from sodasql.scan.scan_builder import ScanBuilder


class SodaSQLScan(Task):
    """
    SodaSQLScan
    """

    def __init__(self, scan_def: Union[Dict, str] = None, warehouse_def: Union[Dict, str] = None, **kwargs):
        """
        Args:
            scan_def: Soda SQL scan file path or dictionary
            warehouse_def: Soda SQL warehouse file path or dictionary
            **kwargs:
        """
        self.scan_builder = ScanBuilder()
        self._set_scan_def(scan_def=scan_def)
        self._set_warehouse_def(warehouse_def=warehouse_def)
        super().__init__(**kwargs)

    def _set_scan_def(self, scan_def: Union[Dict, str]) -> None:
        """
        Args:
            scan_def: Soda SQL scan file path or dictionary
        Returns:
        """
        self.scan_def = scan_def
        if isinstance(scan_def, str):
            self.scan_builder.scan_yml_file = scan_def
        elif isinstance(scan_def, dict):
            self.scan_builder.scan_yml_dict = scan_def

    def _set_warehouse_def(self, warehouse_def: Union[Dict, str]) -> None:
        """
        Args:
            warehouse_def: Soda SQL warehouse file path or dictionary
        Returns:
        """
        if isinstance(warehouse_def, str):
            from yaml import safe_load
            self.warehouse_def = safe_load(open(warehouse_def))
            if "password" not in self.warehouse_def.get("connection", {}):
                from prefect.tasks.secrets import PrefectSecret
                self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
        self.scan_builder.warehouse_yml_dict = warehouse_def

    @defaults_from_attrs("scan_def", "warehouse_def")
    def run(self, scan_def: Union[Dict, str], warehouse_def: Union[Dict, str]) -> Dict:
        """
        Args:
            scan_def: Soda SQL scan file path or dictionary
            warehouse_def: Soda SQL warehouse file path or dictionary
        Returns: Soda SQL scan results as JSON object
        """
        if scan_def is None:
            raise ValueError(
                "Scan definition cannot be None. \
                Please provide either a path to a scan definition file or a scan definition dictionary"
            )
        if warehouse_def is None:
            raise ValueError(
                "Warehouse definition cannot be None. \
                Please provide either a path to a warehouse definition file or a warehouse definition dictionary"
            )
        scan = self.scan_builder.build()
        return scan.execute().to_json()


dq_task = SodaSQLScan(scan_def="flows/soda-poc-1/event.yml", warehouse_def="flows/soda-poc-1/warehouse.yml")

with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    flow.add_task(dq_task)
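One plausible source of the FileNotFoundError, and of the NameError about __file__ reported later, is the way flow files are loaded. The tracebacks in this thread show the flow file being run through exec() (frames named File "<string>"), both at registration and on the Kubernetes agent. Under that loading model, any file opened while the flow is being defined is resolved against the process's current directory. Here that means open(warehouse_def) in SodaSQLScan.__init__. The assumption is that the agent only fetched the flow file itself from GitLab, not the YAML files next to it. In addition, the executed source has no __file__. The standalone sketch below reproduces both failure modes without Prefect. run_flow_source and load_failure are hypothetical helpers, not Prefect's loader.

```python
def run_flow_source(source: str) -> dict:
    """Execute flow source the way a string-based loader does: exec() with no __file__."""
    namespace: dict = {"__name__": "flow_from_string"}  # hypothetical module name
    exec(source, namespace)
    return namespace


def load_failure(source: str):
    """Return the name of the exception raised while loading `source`, or None."""
    try:
        run_flow_source(source)
    except Exception as exc:  # report any load-time failure by its type
        return type(exc).__name__
    return None


# Opens a data file while the flow is being defined, as SodaSQLScan.__init__ does
# with open(warehouse_def); the relative path is resolved against the current directory.
OPENS_RELATIVE_FILE = 'text = open("flows/soda-poc-1/warehouse.yml").read()'

# Locates files next to the flow file, which needs the module-level __file__.
USES_DUNDER_FILE = "import os\nbase_dir = os.path.dirname(__file__)"

for label, source in [("open()", OPENS_RELATIVE_FILE), ("__file__", USES_DUNDER_FILE)]:
    print(label, "->", load_failure(source))
```

Run from a directory that has no flows/soda-poc-1/ folder, the first case reports FileNotFoundError and the second reports NameError. The sketch only illustrates the loading behavior; it is not Prefect's actual storage code.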
Kevin Kho
with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    dq_task()
Farid
02/09/2022, 10:46 PM
import os

from prefect import Client
from prefect.utilities.storage import extract_flow_from_file

client = Client()


def iterate_flows_in_dir(flows_dir="flows"):
    for path, _, files in os.walk(flows_dir):
        for name in files:
            if not name.endswith(".py") or not name.startswith("flow_"):
                continue
            project_name = os.path.basename(path)
            flow_file_path = os.path.join(path, name)
            yield project_name, flow_file_path


for project_name, flow_file_path in iterate_flows_in_dir():
    flow = extract_flow_from_file(flow_file_path)
    client.create_project(project_name=project_name)
    flow.register(
        project_name=project_name,
        idempotency_key=flow.serialized_hash(),  # Generates a hash to prevent unnecessary flow version bump
    )
Registration fails when I just call the task, as you suggested:
$ python register_flows.py
Traceback (most recent call last):
  File "register_flows.py", line 23, in <module>
    flow = extract_flow_from_file(flow_file_path)
  File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/utilities/storage.py", line 88, in extract_flow_from_file
    exec(contents, exec_vals)
  File "<string>", line 91, in <module>
  File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 634, in __call__
    *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments) # type: Dict
  File "/usr/local/lib/python3.7/inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "/usr/local/lib/python3.7/inspect.py", line 2930, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'scan_def'
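The TypeError is raised while the flow is being built, not while it runs. The traceback shows Task.__call__ binding the call's arguments against the signature of run() (signature.bind(...)), and SodaSQLScan.run declares scan_def and warehouse_def without defaults. The standalone sketch below mimics that check without Prefect. defaults_from_attrs_sketch is a hypothetical stand-in. It assumes Prefect's decorator wraps run() with functools.wraps, which the wrapper frame named `method` in the later traceback suggests. The sketch shows why calling run() directly works even though binding fails, and that a None default on the parameter lets the call bind.

```python
import functools
import inspect


def defaults_from_attrs_sketch(*attr_names):
    """Hypothetical stand-in for Prefect's defaults_from_attrs: any listed keyword
    argument that is missing or None is filled from the attribute of the same name.
    (Keyword arguments only, to keep the sketch short.)"""
    def decorator(run_method):
        @functools.wraps(run_method)  # copies metadata and sets __wrapped__
        def method(self, *args, **kwargs):
            for name in attr_names:
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run_method(self, *args, **kwargs)
        return method
    return decorator


class RequiredParamTask:
    scan_def = "flows/soda-poc-1/event.yml"

    @defaults_from_attrs_sketch("scan_def")
    def run(self, scan_def):  # no default, like SodaSQLScan.run
        return scan_def


class OptionalParamTask:
    scan_def = "flows/soda-poc-1/event.yml"

    @defaults_from_attrs_sketch("scan_def")
    def run(self, scan_def=None):  # None default, filled in by the decorator
        return scan_def


def bind_like_task_call(task, *args, **kwargs):
    """Mimic the traceback's Task.bind step: check arguments against run()'s signature.
    inspect.signature follows __wrapped__, so it sees the undecorated parameters."""
    try:
        inspect.signature(task.run).bind(*args, **kwargs)
    except TypeError as exc:
        return str(exc)
    return "ok"


print(bind_like_task_call(RequiredParamTask()))  # missing a required argument: 'scan_def'
print(bind_like_task_call(OptionalParamTask()))  # ok
print(RequiredParamTask().run())                 # flows/soda-poc-1/event.yml
```

Under this reading, either passing the arguments explicitly in the call or declaring the run() parameters as `scan_def: Union[Dict, str] = None` should let the call bind. The second option is an assumption and was not tested in the thread.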
Kevin Kho
I think you need to pass scan_def to your dq_task.
Farid
02/10/2022, 12:55 AM
I passed the arguments to dq_task and received the same error:
Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
Kevin Kho
It looks like Prefect is going to "predicthq/data-engineering/prefect-flows" on GitLab and not finding any Flow inside, or not one with the same name. What is the content of this Flow file, and does a Flow object exist in it?
Farid
02/10/2022, 3:37 AM
from prefect.tasks.secrets import PrefectSecret
self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
However, I now receive another error (related to Soda) when I run the flow on the Kubernetes agent, although running the same flow locally with task.run() succeeds:
Task 'SodaSQLScan': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "<string>", line 72, in run
  File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 161, in build
    self._build_warehouse_yml()
  File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 190, in _build_warehouse_yml
    self.parse_warehouse_yml(warehouse_yml_parser)
  File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 195, in parse_warehouse_yml
    self.warehouse_yml = warehouse_parser.warehouse_yml
AttributeError: 'WarehouseYmlParser' object has no attribute 'warehouse_yml'
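The thread does not confirm the cause, but one plausible explanation is in _set_warehouse_def. That method parses the YAML into self.warehouse_def but then assigns the original warehouse_def argument, which is the path string, to scan_builder.warehouse_yml_dict. If Soda's parser only sets warehouse_yml when it receives an actual dict, that attribute would never exist. A related point: PrefectSecret("MYSECRET") constructs a Prefect task object rather than returning the secret's value, so the password written in __init__ is not a string. The sketch below shows one way to always hand over a plain dict. to_warehouse_dict and its load_yaml_file parameter are hypothetical. load_yaml_file would typically be a small function that opens the file and calls yaml.safe_load.

```python
from typing import Any, Callable, Dict, Optional, Union

WarehouseDef = Union[Dict[str, Any], str]


def to_warehouse_dict(
    warehouse_def: WarehouseDef,
    load_yaml_file: Callable[[str], Dict[str, Any]],
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: return the warehouse definition as a new dict.

    A str is treated as a file path and parsed with load_yaml_file; a dict is
    copied. If password is given and the connection section has none, it is
    filled in, without mutating the caller's data.
    """
    if isinstance(warehouse_def, str):
        result = dict(load_yaml_file(warehouse_def))
    elif isinstance(warehouse_def, dict):
        result = dict(warehouse_def)
    else:
        raise TypeError(f"expected a path or a dict, got {type(warehouse_def).__name__}")
    if password is not None:
        # Copy the nested section before editing so the input stays untouched.
        connection = dict(result.get("connection") or {})
        connection.setdefault("password", password)
        result["connection"] = connection
    return result


# Usage with an in-memory stand-in for the YAML file (hypothetical contents):
fake_files = {"flows/soda-poc-1/warehouse.yml": {"name": "snowflake_wh", "connection": {"type": "snowflake"}}}
warehouse = to_warehouse_dict("flows/soda-poc-1/warehouse.yml", fake_files.__getitem__, password="s3cret")
print(warehouse["connection"])  # {'type': 'snowflake', 'password': 's3cret'}
```

In the task, the returned dict would be assigned to self.scan_builder.warehouse_yml_dict in place of the raw argument. The password would be resolved at run time, inside run(), rather than in __init__. This assumes warehouse_yml_dict expects a plain dict, which its name suggests but the thread does not confirm.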
Kevin Kho
Farid
02/10/2022, 3:43 AM
Kevin Kho
Regarding warehouse_yml: that is a Soda class, right?
Farid
02/10/2022, 4:04 AM
Kevin Kho
Farid
02/10/2022, 6:02 AM
I used GIT storage instead of GITLAB; I think there is a lot of difference in the underlying code. When using GITLAB and retrieving the file path, I was getting:
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
The other thing worth mentioning is that I could not retrieve the additional files inside the task; they had to be retrieved outside the tasks and then passed to them.
Kevin Kho