# prefect-server
e
Hey, I recently ran into this issue:
Failed to load and execute flow run: RuntimeError("can't start new thread")
Restarting the VM resolved the issue, but until the restart no flows could run at all; every run failed with the same or a similar error. Is there any way to fix this without restarting the VM?
a
It looks like you have already started more threads than your system (or Python process) can handle. Did you manually configure the number of threads to use in your flow configuration? Can you share your Flow executor definition?
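To see whether a process is actually close to a thread limit, a quick check along these lines may help. This is only a sketch using the Python standard library: the function name `thread_report` is made up for illustration, the `/proc` paths assume a Linux VM, and it reports on the process that runs it rather than on the Prefect agent itself.

```python
import os
import threading

try:
    import resource  # only available on Unix-like systems
except ImportError:
    resource = None


def thread_report() -> dict:
    """Collect a few numbers that help explain "can't start new thread"."""
    # Threads that the Python interpreter knows about in this process.
    report = {"python_threads": threading.active_count()}

    # Per-user cap on processes/threads (Unix only); -1 means unlimited.
    if resource is not None and hasattr(resource, "RLIMIT_NPROC"):
        soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
        report["nproc_soft_limit"] = soft
        report["nproc_hard_limit"] = hard

    # System-wide thread ceiling (assumes Linux).
    threads_max = "/proc/sys/kernel/threads-max"
    if os.path.exists(threads_max):
        with open(threads_max) as fh:
            report["kernel_threads_max"] = int(fh.read().strip())

    # OS-level threads in this process (assumes Linux).
    task_dir = f"/proc/{os.getpid()}/task"
    if os.path.isdir(task_dir):
        report["os_threads_in_process"] = len(os.listdir(task_dir))

    return report


if __name__ == "__main__":
    for key, value in thread_report().items():
        print(f"{key}: {value}")
```

If the thread count of a long-running process keeps growing between flow runs and approaches one of these limits, that would be consistent with the error above.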
e
I deployed Prefect with its default settings, so I'm not sure where I can see the flow configuration. I didn't run into this issue previously because I restarted the VM once a day; I removed that restart because I now have flows running overnight. Isn't it suboptimal that the system doesn't clean up unused threads after a flow run? I don't actually run that many flows: mostly one at a time, and at most 10 a day.
a
I meant: can you share your flow code?
e
Ah yes, sorry! All of my flows failed regardless of which one was running, but here is one example.
from prefect import Flow, task, context
from prefect.tasks.shell import ShellTask
from prefect.triggers import all_finished
from prefect.tasks.dbt.dbt import DbtShellTask

# ----------- Config --------------
target = "vm"
# ---------------------------------

dbt = DbtShellTask(
    return_all=True,
    profile_name="x",
    environment="y",
    profiles_dir=".",
    log_stdout=True,
    helper_script='****************',
    log_stderr=True,
    stream_output=True)

with Flow("dbt Data Tests") as flow:

    pull_repo = ShellTask(
        helper_script='******',
        command='*********',
        name="Clone DBT",
        log_stdout=True,
        log_stderr=True,
        return_all=True,
        stream_output=True)

    test = dbt(
        command=*****,
        task_args={"name": "*****"},
        upstream_tasks=[pull_repo])

flow.register(project_name='dbt')
a
There is an easier way to pull your dbt repo when you leverage pygit2 instead of ShellTask - this post shows a full example, but here is the gist of it:
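import pygit2
from prefect import task

DBT_PROJECT = "dbt_project"  # assumed local path for the clone; adjust as needed

@task(name="Clone DBT repo")
def pull_dbt_repo(repo_url: str, branch: str = None):
    pygit2.clone_repository(url=repo_url, path=DBT_PROJECT, checkout_branch=branch)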
But if you want to use the ShellTask, you need to either define it at module scope and then call it in your flow, as you did with the DbtShellTask, or add another pair of parentheses:
with Flow("dbt Data Tests") as flow:

    pull_repo = ShellTask(
        helper_script='******',
        command='*********',
        name="Clone DBT",
        log_stdout=True,
        log_stderr=True,
        return_all=True,
        stream_output=True)()
e
Ah okay, do you think this is going to solve the issue? I ended up using the shell task because it was easier for me to authenticate within the VM itself instead of dealing with hiding the auth data in the code (security is a very high priority). I'm not quite sure I understand why another () is needed at the end. And what would I need to define at the module scope if I don't add it?
a
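To explain why you need another ():
• ShellTask() - the first () instantiates the class
• ShellTask(init_args)(run_args) - the next () calls the task instance; inside a with Flow(...) block this adds the task to the flow, and its .run() method is then executed with run_args when the flow runs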
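The difference between the two pairs of parentheses can be shown with plain Python. This is a minimal sketch: `FakeTask` is a hypothetical stand-in and not Prefect's implementation. As far as I know, a Prefect 1.x task called inside a flow block is added to the flow and run later rather than executed on the spot, whereas the stand-in below runs immediately to keep the example short.

```python
class FakeTask:
    """Hypothetical stand-in for a task class; not Prefect's implementation."""

    def __init__(self, name: str):
        # First (): runs once, when the object is created.
        self.name = name
        self.calls = []

    def run(self, command: str) -> str:
        # The actual work of the task.
        return f"{self.name} ran {command!r}"

    def __call__(self, command: str) -> str:
        # Second (): runs when the instance itself is called.
        self.calls.append(command)
        return self.run(command)


# Only instantiated: the object is configured, but nothing has run yet.
configured = FakeTask(name="Clone DBT")

# Instantiated and then called: init args first, run args second.
result = FakeTask(name="Clone DBT")("echo hello")
```

Here `configured.calls` stays empty, while `result` holds the string returned by `run()`.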
e
Got it! Makes sense! So I guess you are saying it doesn't successfully close the thread because the run() method doesn't run? Weird that it worked without it in the first place then! Thank you!
a
yup exactly!