# marvin-in-the-wild
m
Hello, I'm thinking about switching my company's orchestrator to Prefect, and I'm having difficulties. This is the scenario:
• Self-hosted, on-premises Kubernetes cluster.
• I need flows that run at least every 3 seconds:
1. Read a Kafka topic
2. Call an API
3. Send the result back to Kafka
• Self-hosted Gitea repository.
When I try to run this code:
```python
from prefect import flow

# Source for the code to deploy (here, a self-hosted Gitea repo)
SOURCE_REPO = "http://localhost:57095/aignosi/demos.git"

if __name__ == "__main__":
    flow.from_source(
        source=SOURCE_REPO,
        entrypoint="my_gh_workflow.py:repo_info",  # Specific flow to run
    ).deploy(
        name="my-first-deployment",
        work_pool_name="worker-test",  # Work pool target
        cron="0 1 * * *",  # Cron schedule (1am every day)
    )
```
The worker failed with this error:
prefect.deployments.steps.core.StepExecutionError: Encountered error while running prefect.deployments.steps.git_clone
When I run the code with `SOURCE_REPO="http://gitea-http.gitea.svc.cluster.local:3000/aignosi/demos.git"`, I get this on my local machine:
RuntimeError: Failed to clone repository 'http://gitea-http.gitea.svc.cluster.local:3000/aignosi/demos.git' with exit code 128.
Question 1: Is Prefect the right tool for me?
Question 2: How do I fix this?
n
> Question 1: Is Prefect the right tool for me?

I think you can definitely accomplish this pattern with Prefect:
1. Read a Kafka topic
2. Call an API
3. Send back to Kafka
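One note on the 3-second requirement: a standard five-field cron expression, like the `0 1 * * *` in the deployment above, has one-minute granularity, so a 3-second cadence means either a sub-minute interval schedule or a single long-running process that loops. A minimal stdlib sketch of the loop shape for the three steps, assuming hypothetical `read_batch` / `call_api` / `write_batch` callables that you would back with a Kafka consumer, an HTTP client, and a Kafka producer (none of those clients are shown here):

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")  # type of a record read from the input topic
R = TypeVar("R")  # type of an API response sent back


def run_pipeline(
    read_batch: Callable[[], List[T]],
    call_api: Callable[[T], R],
    write_batch: Callable[[List[R]], None],
    period: float = 3.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Repeat read -> call API -> write every `period` seconds; return records processed."""
    processed = 0
    iterations = 0
    next_tick = time.monotonic()
    while max_iterations is None or iterations < max_iterations:
        batch = read_batch()  # 1. read a batch from the input topic (hypothetical consumer)
        results = [call_api(record) for record in batch]  # 2. call the API once per record
        if results:
            write_batch(results)  # 3. send the results back (hypothetical producer)
        processed += len(results)
        iterations += 1
        # Sleep only for what is left of this period, so slow batches don't shift the cadence
        next_tick += period
        time.sleep(max(0.0, next_tick - time.monotonic()))
    return processed
```

Keeping the loop in one process presumably also avoids paying per-run overhead (such as the `git_clone` pull step) every 3 seconds; whether that process is a long-lived flow run or a plain service next to Prefect is a separate design choice.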
But whether a given orchestrator is right for you depends on a lot of things! As for your cloning error, do you have more logs above the error you shared? I'd expect a more informative error from the git command itself somewhere in that stack trace.
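Prefect's `RuntimeError` only reports git's exit code (128, git's generic fatal-error code); git's own stderr, which usually says *why* (name resolution, connection refused, authentication, ...), isn't shown. A minimal sketch for that sanity check, assuming `git` is on the `PATH` wherever you run it (ideally inside the worker pod): it reruns the same shallow clone and returns git's stderr. `clone_with_stderr` is a hypothetical helper, not part of Prefect.

```python
import os
import subprocess
import tempfile
from typing import Optional, Tuple


def clone_with_stderr(url: str, dest: Optional[str] = None, timeout: float = 60.0) -> Tuple[int, str]:
    """Run a shallow `git clone` and return (exit code, git's stderr)."""
    if dest is None:
        # Clone into a throwaway directory that is deleted afterwards
        with tempfile.TemporaryDirectory() as tmp:
            return clone_with_stderr(url, os.path.join(tmp, "repo"), timeout)
    # Same argument order as the failing command: git clone <url> --depth 1 <dest>
    cmd = ["git", "clone", url, "--depth", "1", dest]
    # Make git fail instead of hanging on a username/password prompt
    env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=timeout)
    except FileNotFoundError:
        return 127, "git executable not found on PATH"  # shell's "command not found" code
    except subprocess.TimeoutExpired:
        return 124, f"git clone timed out after {timeout} seconds"  # GNU timeout's code
    return proc.returncode, proc.stderr.strip()
```

For example, `print(clone_with_stderr("http://localhost:57095/aignosi/demos.git"))` from a shell in the worker pod should print the underlying `fatal: ...` line.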
m
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/runner/storage.py", line 257, in _clone_repo
await run_process(cmd)
File "/usr/local/lib/python3.11/site-packages/anyio/_core/_subprocesses.py", line 89, in run_process
raise CalledProcessError(cast(int, process.returncode), command, output, errors)
subprocess.CalledProcessError: Command '['git', 'clone', 'http://localhost:57095/aignosi/demos.git', '--depth', '1', '/opt/prefect/demos']' returned non-zero exit status 128.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 154, in run_steps
step_output = await run_step(step, upstream_outputs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 125, in run_step
result = await from_async.call_soon_in_new_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 309, in coroutine_wrapper
return call()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
return self.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/pull.py", line 123, in git_clone
await storage.pull_code()
File "/usr/local/lib/python3.11/site-packages/prefect/runner/storage.py", line 233, in pull_code
await self._clone_repo()
File "/usr/local/lib/python3.11/site-packages/prefect/runner/storage.py", line 261, in _clone_repo
raise RuntimeError(
RuntimeError: Failed to clone repository 'http://localhost:57095/aignosi/demos.git' with exit code 128.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 426, in retrieve_flow_then_begin_flow_run
else await load_flow_from_flow_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 307, in load_flow_from_flow_run
output = await run_steps(deployment.pull_steps)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 182, in run_steps
raise StepExecutionError(f"Encountered error while running {fqn}") from exc
prefect.deployments.steps.core.StepExecutionError: Encountered error while running prefect.deployments.steps.git_clone
This error is from the worker pool, when I run with `SOURCE_REPO="http://gitea:57095/aignosi/demos.git"` and a port-forward to my local machine.
Traceback (most recent call last):
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\runner\storage.py", line 257, in _clone_repo
await run_process(cmd)
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\anyio\_core\_subprocesses.py", line 89, in run_process
raise CalledProcessError(cast(int, process.returncode), command, output, errors)
subprocess.CalledProcessError: Command '['git', 'clone', 'http://gitea-http.gitea.svc.cluster.local:3000/aignosi/demos.git', '--depth', '1', 'C:\\Users\\mathe\\AppData\\Local\\Temp\\tmpedmdij9a\\demos']' returned non-zero exit status 128.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "c:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\create_deployment.py", line 7, in <module>
flow.from_source(
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 309, in coroutine_wrapper
return call()
^^^^^^
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 432, in __call__
return self.result()
^^^^^^^^^^^^^
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\mathe\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\flows.py", line 931, in from_source
await storage.pull_code()
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\runner\storage.py", line 233, in pull_code
await self._clone_repo()
File "C:\Users\mathe\OneDrive\Documentos\Aignosi\airflow\venv\Lib\site-packages\prefect\runner\storage.py", line 261, in _clone_repo
raise RuntimeError(
RuntimeError: Failed to clone repository 'http://gitea-http.gitea.svc.cluster.local:3000/aignosi/demos.git' with exit code 128.
This one is from my local machine, when I run with `SOURCE_REPO="http://gitea-http.gitea.svc.cluster.local:3000/aignosi/demos.git"`.
n
as a sanity check, can you clone the same repo with your git CLI directly?
m
n
Are you sure that wherever the worker is running has network access to the repo?
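One likely cause, given the two tracebacks: inside a pod, `localhost` is the pod itself, so a port-forward running on your local machine isn't reachable from the worker; conversely, an in-cluster name like `gitea-http.gitea.svc.cluster.local` normally only resolves inside the cluster. Since `from_source` clones once where you run the deploy script and the worker clones again at run time, the URL likely has to work from both places. A minimal stdlib sketch to check that, assuming plain TCP reachability is the question (it does not test credentials); `can_reach` is a hypothetical helper:

```python
import socket
from urllib.parse import urlsplit

# Fallback ports when the URL doesn't specify one
DEFAULT_PORTS = {"http": 80, "https": 443, "ssh": 22, "git": 9418}


def can_reach(repo_url: str, timeout: float = 3.0) -> bool:
    """Return True if the URL's host resolves and its port accepts a TCP connection."""
    parts = urlsplit(repo_url)
    host = parts.hostname
    if host is None:
        raise ValueError(f"no host in {repo_url!r}")
    port = parts.port or DEFAULT_PORTS.get(parts.scheme, 80)
    try:
        # Resolves the name (cluster DNS when run inside a pod) and opens a TCP connection
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # DNS failures, refused connections, and timeouts are all OSError subclasses
        return False
```

Running `can_reach(SOURCE_REPO)` both on your machine and inside the worker pod shows which side can't see the server.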
m
Yes, I gave it all permissions to access other pods/services.
n
I'm not so familiar with Gitea. Does it "just work" with the git CLI? That's what that step uses under the hood.
m
It works the same as a GitHub repository; the difference is that I'm hosting it.
It works when I run `prefect deploy`,
but how do I do it via the API?
n
Sorry, do what via the API?
m
I mean, how do I perform these same steps via the Python API instead of the terminal?
n
> how to perform these same steps

Do you mean defining a deployment? You can do it via:
• YAML (`prefect.yaml`) to define all those things, and then `prefect deploy -n foo-deployment`, etc.
• the Pythonic wrapper on that: `from_source(...).deploy(...)`
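Both routes end up storing a list of pull steps on the deployment; the worker traceback shows `run_steps(deployment.pull_steps)` failing inside `prefect.deployments.steps.git_clone`, with the same URL that was passed to `from_source`. A sketch of what one such step looks like as plain data, assuming the `prefect.yaml` `pull:` format; `git_clone_pull_step` is a hypothetical helper for illustration, not a Prefect API:

```python
from typing import Any, Dict, Optional


def git_clone_pull_step(repository: str, branch: Optional[str] = None) -> Dict[str, Any]:
    """Build a git_clone pull step shaped like an entry in prefect.yaml's `pull:` list."""
    params: Dict[str, Any] = {"repository": repository}
    if branch is not None:
        params["branch"] = branch
    # One pull step is a single-key dict: fully qualified step function -> its keyword arguments
    return {"prefect.deployments.steps.git_clone": params}
```

For instance, `git_clone_pull_step("http://gitea-http.gitea.svc.cluster.local:3000/aignosi/demos.git", branch="main")` mirrors what the worker would run; the repository URL in it must resolve from the worker pod, not just from your machine.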
m
I mean defining pull steps with the Pythonic wrapper.
Thanks!
I'll read up on it.