Farid
02/09/2022, 8:58 PM
We use GITLAB as storage and Kubernetes as the agent to run Prefect flows. I noticed I get dependency errors when a custom pip package is used inside the flow, in this case soda:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sodasql'")
Is there a way to address these dependency issues without using Docker images to store the flows?
Kevin Kho
You can use EXTRA_PIP_PACKAGES, see here
Kevin Kho
Sam Werbalowsky
02/09/2022, 9:07 PM
We use GIT storage (with Gitlab) and Kubernetes. Our solution was to push an image with the packages as part of CI and flow registration, then pull that image for the jobs.
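A minimal sketch of that pattern, assuming Prefect 1.x; the registry path and tag are illustrative, and the image would be built and pushed by the same CI job that registers the flows:
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import GitLab

# Illustrative: CI builds and pushes an image that already contains
# soda-sql-snowflake and any other flow dependencies, for example
#   docker build -t registry.gitlab.com/predicthq/prefect-flows:latest .
#   docker push registry.gitlab.com/predicthq/prefect-flows:latest
with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    # pull the pre-built image instead of installing packages at runtime
    run_config=KubernetesRun(image="registry.gitlab.com/predicthq/prefect-flows:latest"),
) as flow:
    pass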
Farid
02/09/2022, 9:49 PM
with Flow(
name="Soda SQL Scan POC 1",
storage=GitLab(
repo="predicthq/data-engineering/prefect-flows",
path="flows/soda-poc-1/flow_soda_scan.py",
access_token_secret="GITLAB_TOKEN",
),
run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
flow.add_task(dq_task)
The error I get:
Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
Any ideas?
Kevin Kho
Kevin Kho
Farid
02/09/2022, 10:14 PM
from typing import Dict, Union
from prefect import Flow, Task
from prefect.run_configs import KubernetesRun
from prefect.storage import GitLab
from prefect.utilities.tasks import defaults_from_attrs
from sodasql.scan.scan_builder import ScanBuilder
class SodaSQLScan(Task):
"""
SodaSQLScan
"""
def __init__(self, scan_def: Union[Dict, str] = None, warehouse_def: Union[Dict, str] = None, **kwargs):
"""
Args:
scan_def: Soda SQL scan file path or dictionary
warehouse_def: Soda SQL warehouse file path or dictionary
**kwargs:
"""
self.scan_builder = ScanBuilder()
self._set_scan_def(scan_def=scan_def)
self._set_warehouse_def(warehouse_def=warehouse_def)
super().__init__(**kwargs)
def _set_scan_def(self, scan_def: Union[Dict, str]) -> None:
"""
Args:
scan_def: Soda SQL scan file path or dictionary
Returns:
"""
self.scan_def = scan_def
if isinstance(scan_def, str):
self.scan_builder.scan_yml_file = scan_def
elif isinstance(scan_def, dict):
self.scan_builder.scan_yml_dict = scan_def
def _set_warehouse_def(self, warehouse_def: Union[Dict, str]) -> None:
"""
Args:
warehouse_def: Soda SQL warehouse file path or dictionary
Returns:
"""
if isinstance(warehouse_def, str):
from yaml import safe_load
self.warehouse_def = safe_load(open(warehouse_def))
if "password" not in self.warehouse_def.get("connection", {}):
from prefect.tasks.secrets import PrefectSecret
self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
self.scan_builder.warehouse_yml_dict = warehouse_def
@defaults_from_attrs("scan_def", "warehouse_def")
def run(self, scan_def: Union[Dict, str], warehouse_def: Union[Dict, str]) -> Dict:
"""
Args:
scan_def: Soda SQL scan file path or dictionary
warehouse_def: Soda SQL warehouse file path or dictionary
Returns: Soda SQL scan results as JSON object
"""
if scan_def is None:
raise ValueError(
"Scan definition cannot be None. \
Please provide either a path to a scan definition file or a scan definition dictionary"
)
if warehouse_def is None:
raise ValueError(
"Warehouse definition cannot be None. \
Please provide either a path to a warehouse definition file or a warehouse definition dictionary"
)
scan = self.scan_builder.build()
return scan.execute().to_json()
dq_task = SodaSQLScan(scan_def="flows/soda-poc-1/event.yml", warehouse_def="flows/soda-poc-1/warehouse.yml")
with Flow(
name="Soda SQL Scan POC 1",
storage=GitLab(
repo="predicthq/data-engineering/prefect-flows",
path="flows/soda-poc-1/flow_soda_scan.py",
access_token_secret="GITLAB_TOKEN",
),
run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
flow.add_task(dq_task)
Kevin Kho
with Flow(
name="Soda SQL Scan POC 1",
storage=GitLab(
repo="predicthq/data-engineering/prefect-flows",
path="flows/soda-poc-1/flow_soda_scan.py",
access_token_secret="GITLAB_TOKEN",
),
run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
dq_task()
Farid
02/09/2022, 10:46 PM
import os
from prefect import Client
from prefect.utilities.storage import extract_flow_from_file
client = Client()
def iterate_flows_in_dir(flows_dir="flows"):
for path, _, files in os.walk(flows_dir):
for name in files:
if not name.endswith(".py") or not name.startswith("flow_"):
continue
project_name = os.path.basename(path)
flow_file_path = os.path.join(path, name)
yield project_name, flow_file_path
for project_name, flow_file_path in iterate_flows_in_dir():
flow = extract_flow_from_file(flow_file_path)
client.create_project(project_name=project_name)
flow.register(
project_name=project_name,
idempotency_key=flow.serialized_hash(), # Generates a hash to prevent unnecessary flow version bump
)
which fails when I just call the task as you mentioned:
$ python register_flows.py
Traceback (most recent call last):
File "register_flows.py", line 23, in <module>
flow = extract_flow_from_file(flow_file_path)
File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/utilities/storage.py", line 88, in extract_flow_from_file
exec(contents, exec_vals)
File "<string>", line 91, in <module>
File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 634, in __call__
*args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
File "/builds/predicthq/data-engineering/prefect-flows/venv/lib/python3.7/site-packages/prefect/core/task.py", line 674, in bind
callargs = dict(signature.bind(*args, **kwargs).arguments) # type: Dict
File "/usr/local/lib/python3.7/inspect.py", line 3015, in bind
return args[0]._bind(args[1:], kwargs)
File "/usr/local/lib/python3.7/inspect.py", line 2930, in _bind
raise TypeError(msg) from None
TypeError: missing a required argument: 'scan_def'
Kevin Kho
You may need to pass scan_def to your dq_task, I think
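A sketch of what passing the arguments could look like, reusing the YAML paths from the dq_task constructor earlier in the thread:
with Flow(
    name="Soda SQL Scan POC 1",
    storage=GitLab(
        repo="predicthq/data-engineering/prefect-flows",
        path="flows/soda-poc-1/flow_soda_scan.py",
        access_token_secret="GITLAB_TOKEN",
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    # pass the definitions explicitly so run()'s required arguments are bound
    dq_task(
        scan_def="flows/soda-poc-1/event.yml",
        warehouse_def="flows/soda-poc-1/warehouse.yml",
    )
Alternatively, giving run()'s parameters a default of None lets defaults_from_attrs fall back to the values set in the constructor, which is the usual pattern for Prefect 1.x task classes; without defaults, signature binding fails before the decorator can step in, which is what the TypeError above shows.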
Kevin Kho
Farid
02/10/2022, 12:55 AM
I passed them to dq_task and received the same Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory') error.
Kevin Kho
I am looking at predicthq/data-engineering/prefect-flows on Gitlab and not seeing any Flow inside, or at least not any flow with the same name. What is the content of this Flow, and does a Flow object exist?
Farid
02/10/2022, 3:37 AM
from prefect.tasks.secrets import PrefectSecret
self.warehouse_def["connection"].update(password=PrefectSecret("MYSECRET"))
However, now I receive another error (related to soda) when I run the flow on the Kubernetes agent, although trying the same flow locally with task.run() runs successfully.
Task 'SodaSQLScan': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "<string>", line 72, in run
File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 161, in build
self._build_warehouse_yml()
File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 190, in _build_warehouse_yml
self.parse_warehouse_yml(warehouse_yml_parser)
File "/usr/local/lib/python3.9/site-packages/sodasql/scan/scan_builder.py", line 195, in parse_warehouse_yml
self.warehouse_yml = warehouse_parser.warehouse_yml
AttributeError: 'WarehouseYmlParser' object has no attribute 'warehouse_yml'
Kevin Kho
Farid
02/10/2022, 3:43 AM
Kevin Kho
warehouse_yml. That is a Soda class, right?
Kevin Kho
Farid
02/10/2022, 4:04 AM
Kevin Kho
Farid
02/10/2022, 6:02 AM
I ended up using GIT instead of GITLAB for storage. I think there is a lot of difference in the underlying code. I was getting
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
when using GITLAB and retrieving the file path.
The other thing to mention was that I could not retrieve the additional files inside the tasks; they had to be retrieved outside of them and then passed to the tasks.
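A minimal sketch of what the working setup could look like with Git storage, which clones the repository when the flow runs; the repo_host and git_token_secret_name parameter names are recalled from the Prefect 1.x Git storage docs and should be treated as assumptions, and the YAML paths are resolved outside the task and passed in, per the note above:
import os
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import Git

# Git storage clones the whole repo at run time, so sibling files such as the
# Soda scan and warehouse YAMLs exist on disk and __file__ resolves; the paths
# are built outside the task and then passed in.
base_dir = os.path.dirname(os.path.abspath(__file__))

with Flow(
    name="Soda SQL Scan POC 1",
    storage=Git(
        repo="predicthq/data-engineering/prefect-flows",
        flow_path="flows/soda-poc-1/flow_soda_scan.py",
        repo_host="gitlab.com",                # assumption: GitLab-hosted repo
        git_token_secret_name="GITLAB_TOKEN",  # assumption: same token secret as before
    ),
    run_config=KubernetesRun(env={"EXTRA_PIP_PACKAGES": "soda-sql-snowflake"}, image="prefecthq/prefect:latest"),
) as flow:
    # dq_task is the SodaSQLScan instance defined earlier in the thread
    dq_task(
        scan_def=os.path.join(base_dir, "event.yml"),
        warehouse_def=os.path.join(base_dir, "warehouse.yml"),
    )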
Kevin Kho