#prefect-server

Edvard Kristiansen

05/05/2022, 8:41 AM
Hey, I recently ran into this issue:
Failed to load and execute flow run: RuntimeError("can't start new thread")
Restarting the VM resolved the issue, but until it was restarted no flows were able to run at all; they all failed with the same or similar errors. Is there any way to fix this without restarting the VM?

Anna Geller

05/05/2022, 10:31 AM
It looks like you have already started more threads than your system (or Python process) can handle. Did you manually configure the number of threads to use in your flow configuration? Can you share your Flow executor definition?
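A generic way to check whether threads are piling up is to count the live threads in a Python process with the standard threading module. This is a minimal sketch, not a Prefect API: the helper names report_threads and start_workers are hypothetical, and the blocked workers only stand in for threads that never finish.

```python
import threading
from typing import List


def report_threads(label: str) -> int:
    """Print and return the number of live threads in the current process."""
    count = threading.active_count()
    names = sorted(t.name for t in threading.enumerate())
    print(f"{label}: {count} live threads {names}")
    return count


def start_workers(n: int, stop: threading.Event) -> List[threading.Thread]:
    """Start n hypothetical worker threads that block until `stop` is set."""
    workers = [
        threading.Thread(target=stop.wait, name=f"worker-{i}", daemon=True)
        for i in range(n)
    ]
    for worker in workers:
        worker.start()
    return workers


if __name__ == "__main__":
    stop = threading.Event()
    baseline = report_threads("baseline")

    workers = start_workers(5, stop)
    # Threads that never finish stay alive and keep holding OS resources.
    assert report_threads("while blocked") == baseline + 5

    # Once the work completes and the threads are joined, the count drops back.
    stop.set()
    for worker in workers:
        worker.join()
    assert report_threads("after join") == baseline
```

If a count like this kept climbing from one flow run to the next, that would suggest something is starting threads without letting them finish.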

Edvard Kristiansen

05/05/2022, 11:06 AM
I deployed Prefect with its default settings; I'm not sure where I can see the flow configuration. I didn't run into this issue previously because I restarted the VM once a day, which I stopped doing because I had overnight flows running. Isn't it suboptimal that the system doesn't get rid of unused threads after a flow run? I don't actually run that many flows; it's mostly one at a time and at most 10 a day.

Anna Geller

05/05/2022, 12:21 PM
I meant: can you share your flow code?

Edvard Kristiansen

05/05/2022, 3:46 PM
Ah yes, sorry! All of my flows failed regardless of which one was running, but here is an example:
from prefect import Flow, task, context
from prefect.tasks.shell import ShellTask
from prefect.triggers import all_finished
from prefect.tasks.dbt.dbt import DbtShellTask

# ----------- Config --------------
target = "vm"
# ---------------------------------

dbt = DbtShellTask(
    return_all=True,
    profile_name="x",
    environment="y",
    profiles_dir=".",
    log_stdout=True,
    helper_script='****************',
    log_stderr=True,
    stream_output=True)

with Flow("dbt Data Tests") as flow:

    pull_repo = ShellTask(
            helper_script= '******',
            command= '*********',
            name= "Clone DBT",
            log_stdout=True,
            log_stderr=True,
            return_all=True,
            stream_output=True)

    test = dbt(
        command="*****",
        task_args={"name": "*****"},
        upstream_tasks=[pull_repo])

flow.register(project_name='dbt')

Anna Geller

05/05/2022, 5:02 PM
There is an easier way to pull your dbt repo: use pygit2 instead of ShellTask. This post shows a full example, but here is the gist of it:
import pygit2
from prefect import task

@task(name="Clone DBT repo")
def pull_dbt_repo(repo_url: str, branch: str = None):  # DBT_PROJECT (local clone path) is defined elsewhere
    pygit2.clone_repository(url=repo_url, path=DBT_PROJECT, checkout_branch=branch)
But if you want to use the ShellTask, you need to either define it at module scope and then call it in your flow (as you did with the DbtShellTask), or add another pair of parentheses:
with Flow("dbt Data Tests") as flow:

    pull_repo = ShellTask(
            helper_script= '******',
            command= '*********',
            name= "Clone DBT",
            log_stdout=True,
            log_stderr=True,
            return_all=True,
            stream_output=True)()

Edvard Kristiansen

05/06/2022, 1:13 PM
Ah okay, do you think this is going to solve the issue? I ended up using the ShellTask because it was easier for me to authenticate within the VM itself instead of dealing with hiding the auth data in the code (security is a very high priority). I'm not quite sure I understand why the extra () is needed at the end. And what would I need to define at module scope if I don't add it?

Anna Geller

05/06/2022, 1:29 PM
To explain why you need another ():
• ShellTask(): the first () instantiates the class
• ShellTask(init_args)(run_args): the second () calls the task with the arguments for its .run() method
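To make the two pairs of parentheses concrete, here is a toy model of the pattern. ToyFlow and ToyTask are hypothetical classes, not Prefect's implementation: in this sketch the first () only stores configuration, while the second () copies the task, binds the arguments for run(), and registers the copy with the active flow so it runs when the flow runs. It also shows the module-scope option mentioned earlier in the thread.

```python
import copy
from typing import Any, Dict, List, Optional


class ToyFlow:
    """Hypothetical flow: collects the tasks that are *called* inside its `with` block."""

    current: Optional["ToyFlow"] = None  # the flow whose `with` block is active

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["ToyTask"] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow.current = self
        return self

    def __exit__(self, *exc_info: Any) -> None:
        ToyFlow.current = None

    def run(self) -> List[str]:
        # Each registered copy runs with the arguments bound when it was called.
        return [t.run(**t.bound_kwargs) for t in self.tasks]


class ToyTask:
    """Hypothetical task: __init__ stores configuration, __call__ binds run() arguments."""

    def __init__(self, name: str) -> None:
        # First (): create and configure the task object; nothing is scheduled yet.
        self.name = name
        self.bound_kwargs: Dict[str, Any] = {}

    def __call__(self, **run_kwargs: Any) -> "ToyTask":
        # Second (): copy the task, bind run() arguments, register the copy with the flow.
        bound = copy.copy(self)
        bound.bound_kwargs = dict(run_kwargs)
        if ToyFlow.current is not None:
            ToyFlow.current.tasks.append(bound)
        return bound

    def run(self, **kwargs: Any) -> str:
        return f"{self.name} ran with {kwargs}"


# Option 1: instantiate at module scope, then call it inside the flow.
clone = ToyTask(name="Clone DBT")

with ToyFlow("dbt Data Tests") as flow:
    never_called = ToyTask(name="never called")  # instantiated only: not registered
    cloned = clone(command="echo cloning")  # registered by __call__
    inline = ToyTask(name="inline")(command="ls")  # Option 2: ()() in one expression

for line in flow.run():
    print(line)
```

In this toy, the task that is only instantiated inside the with block never appears in flow.tasks, so its run() is never called.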

Edvard Kristiansen

05/06/2022, 1:43 PM
Got it! Makes sense! So I guess you are saying it doesn't successfully close the thread because the run() method doesn't run? Weird that it worked without it in the first place then! Thank you!

Anna Geller

05/06/2022, 1:57 PM
yup exactly!