Edvard Kristiansen
05/05/2022, 8:41 AMFailed to load and execute flow run: RuntimeError("can't start new thread")
Restarting the VM resolved the issue but no flows were able to run at all until it was restarted, all with the same or similar errors. Is there any way to fix this without restarting the VM?Anna Geller
Edvard Kristiansen
05/05/2022, 11:06 AMAnna Geller
Edvard Kristiansen
05/05/2022, 3:46 PMfrom prefect import Flow, task, context
from prefect.tasks.shell import ShellTask
from prefect.triggers import all_finished
from prefect.tasks.dbt.dbt import DbtShellTask
# ----------- Config --------------
target = "vm"
# ---------------------------------
dbt = DbtShellTask(
return_all=True,
profile_name="x",
environment="y",
profiles_dir=".",
log_stdout=True,
helper_script='****************',
log_stderr=True,
stream_output=True)
with Flow("dbt Data Tests") as flow:
pull_repo = ShellTask(
helper_script= '******',
command= '*********',
name= "Clone DBT",
log_stdout=True,
log_stderr=True,
return_all=True,
stream_output=True)
test = dbt(
command=*****,
task_args={"name": "*****"},
upstream_tasks=[pull_repo])
flow.register(project_name='dbt')
Anna Geller
pygit2
instead of ShellTask - this post shows a full example but here is the gist of it:
@task(name="Clone DBT repo")
def pull_dbt_repo(repo_url: str, branch: str = None):
pygit2.clone_repository(url=repo_url, path=DBT_PROJECT, checkout_branch=branch)
But if you want to use the ShellTask you would need to either define it at the module scope and then call it in your flow, as you did with the DbtShellTask, or add another pair of brackets:
with Flow("dbt Data Tests") as flow:
pull_repo = ShellTask(
helper_script= '******',
command= '*********',
name= "Clone DBT",
log_stdout=True,
log_stderr=True,
return_all=True,
stream_output=True)()
Edvard Kristiansen
05/06/2022, 1:13 PMAnna Geller
ShellTask()
- the first () instantiates the class
• ShellTask(init_args)(run_args)
- the next () calls the Task's .run() methodEdvard Kristiansen
05/06/2022, 1:43 PMAnna Geller