Thomas Mignon
01/18/2022, 1:42 PM(base) thomas@Ifremer ๎ฐ ~ ๎ฐ docker -v
Docker version 20.10.8, build 3967b7d
(base) thomas@Ifremer ๎ฐ ~ ๎ฐ docker-compose --version
docker-compose version 1.25.0, build unknown
I'm trying to start prefect with the command prefect server start
but i obtain this error :
ERROR: Couldn't connect to Docker daemon at <http+docker://localhost> - is it running?
If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Exception caught; killing services (press ctrl-C to force)
ERROR: Couldn't connect to Docker daemon at <http+docker://localhost> - is it running?
If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Traceback (most recent call last):
File "/home/thomas/miniconda3/lib/python3.8/site-packages/prefect/cli/server.py", line 623, in start
subprocess.check_call(
File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 364, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/thomas/miniconda3/bin/prefect", line 10, in <module>
sys.exit(cli())
File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/thomas/miniconda3/lib/python3.8/site-packages/prefect/cli/server.py", line 664, in start
subprocess.check_output(
File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 411, in check_output
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 512, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
Anna Geller
01/18/2022, 1:45 PMThomas Mignon
01/18/2022, 1:47 PMPierre Monico
01/18/2022, 1:55 PMThomas Mignon
01/18/2022, 1:57 PMKevin Kho
01/18/2022, 2:29 PMThomas Mignon
01/18/2022, 2:30 PMworkspace, command, semaphore_audience_version, command_parameter
then retrieving it in the code when i'm launching the following prefect code with this command python myflow.py
:
import time
import prefect
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, task, Parameter
@task
def template_runner(workspace, command, semaphore_audience_version, command_parameter):
workspaceVar: Path = Path(
os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}'))
template_runner.TemplateRunner(
command=command,
version=semaphore_audience_version,
command_parameter=command_parameter,
workspace=workspaceVar,
skip_signal=True
).run(exit=False)
with Flow("template_runner_flow") as flow:
worskpace = Parameter("worskpace", default="")
command = Parameter("command", default="")
semaphore_audience_version = Parameter("people", default="")
command_parameter = Parameter("people", default="")
# Map `say_hello` across the list of names
template_runner.map(worskpace, command, semaphore_audience_version, command_parameter)
if __name__ == "__main__":
flow.run(executor=DaskExecutor(
cluster_class=PBSCluster,
cluster_kwargs={
"cores": 1,
"memory": '24GB',
"queue": 'sequentiel',
"walltime": '00:05:00',
"interface": 'ib0',
"n_workers": 1
}
))
Kevin Kho
01/18/2022, 2:42 PMprefect server start
shows how to connect. You need to edit the config.toml
on the machine you are connecting from.
To open outside connections, you need to start it with prefect server start --expose
unmapped
.Thomas Mignon
01/18/2022, 2:48 PMunmapped
to pass multiple arg to the mapping ?Kevin Kho
01/18/2022, 2:50 PMThomas Mignon
01/18/2022, 2:50 PMpython myflow.py
? I was thinking of a syntax with something like python myflow.py workspace="myworkspace" command="mycommand"
Kevin Kho
01/18/2022, 2:51 PMprefect run --param workspace=myworkspace
I thinkThomas Mignon
01/18/2022, 2:51 PMpython /home1/datawork/semexp/workspace/common/bin/prefectflow.py --param workspace=/home1/datawork/semexp/workspace/dagit/running/ --param command=purify_raw_logs --param command_parameter=/home/datawork-semaphore-exp/spool/input/ftp/ftp2/2021 ftp2 -f access.log.20211201.gz --param version=3.4.9
When i'm printing the parameter workspace in my flow for example :
with Flow("template_runner_flow") as flow:
workspace = Parameter("workspace")
print("workspace :", workspace.__dict__)
I obtain this :
workspace : {'required': True, 'default': None, 'name': 'workspace', 'slug': 'workspace', 'logger': <Logger prefect.workspace (INFO)>, 'tags': set(), 'max_retries': 0, 'retry_delay': None, 'timeout': None, 'trigger': <function all_successful at 0x7fffeaefde50>, 'skip_on_upstream_skip': True, 'cache_for': None, 'cache_key': None, 'cache_validator': <function never_use at 0x7fffeb3eeaf0>, 'checkpoint': True, 'result': <PrefectResult: None>, 'target': None, 'task_run_name': None, 'state_handlers': [], 'auto_generated': False, 'log_stdout': False, 'nout': None}
So the param doesn't seems to be passed correctlyAnna Geller
01/20/2022, 11:00 AMprefect run --name yourflow --param name=Marvin --watch
Also, check out this recent blog post about parameters.Thomas Mignon
01/21/2022, 8:59 AMimport time
import prefect
import os
from pathlib import Path
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, task, Parameter, unmapped
@task(log_stdout=True)
def template_runner(workspace, command, semaphore_audience_version, command_parameter):
print(workspace, command)
workspaceVar: Path = Path(
os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
)
template_runner.TemplateRunner(
command=command,
version=semaphore_audience_version,
command_parameter=command_parameter,
workspace=workspaceVar,
skip_signal=True
).run(exit=False)
with Flow("template_runner_flow") as flow:
workspace = Parameter("workspace")
command = Parameter("command")
semaphore_audience_version = Parameter("semaphore_audience_version")
command_parameter = Parameter("command_parameter")
template_runner(
workspace=workspace,
command=command,
semaphore_audience_version=semaphore_audience_version,
command_parameter=command_parameter
)
if __name__ == "__main__":
flow.run(
workspace="test",
executor=DaskExecutor(
cluster_class=PBSCluster,
cluster_kwargs={
"cores": 1,
"memory": '24GB',
"queue": 'sequentiel',
"walltime": '00:05:00',
"interface": 'ib0',
"n_workers": 1
}
))
The comportment i should have is that the daskExecutor should have submit to my PBSCluster a qsub with the cluster_kwargs isn't it ?
I got this error because no qsub was done so no PBS_JOBID is set as env var :
โโโ 09:06:20 | ERROR | Task 'template_runner': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home1/datawork/semexp/workspace/common/bin/prefectflow.py", line 16, in template_runner
os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/os.py", line 679, in __getitem__
raise KeyError(key) from None
KeyError: 'PBS_JOBID'
prefect run --path /home1/datawork/semexp/workspace/common/bin/prefectflow.py --param workspace="/home1/datawork/semexp/workspace/dagit/running/" --param command="purify_raw_logs" --param command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp2/2021 ftp2 -f access.log.20211201.gz" --param semaphore_audience_version="3.4.9" --watch
Anna Geller
01/21/2022, 10:38 AMfrom prefect import task, Flow
from prefect.executors import DaskExecutor
from dask_jobqueue import PBSCluster
@task
def generate_random_numbers():
return list(range(1, 200))
@task
def add_one(x):
return x + 1
@task(log_stdout=True)
def print_results(res):
print(res)
with Flow(
"mapping",
executor=DaskExecutor(
cluster_class=PBSCluster,
cluster_kwargs={
"cores": 1,
"memory": "24GB",
"queue": "sequentiel",
"walltime": "00:05:00",
"interface": "ib0",
"n_workers": 1,
},
),
) as flow:
numbers = generate_random_numbers()
result = add_one.map(numbers)
print_results(result)
Thomas Mignon
01/21/2022, 1:57 PMAnna Geller
01/21/2022, 2:05 PMThomas Mignon
01/21/2022, 2:12 PMimport time
import prefect
import os
from pathlib import Path
from semaphore_scripts import template_runner
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, task, Parameter, unmapped
@task(log_stdout=True)
def template_runner_task(workspace, command, semaphore_audience_version, command_parameter):
workspaceVar: Path = Path(
#os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
os.path.join(workspace, f'{command}')
)
template_runner.TemplateRunner(
command=command,
version=semaphore_audience_version,
command_parameter=command_parameter,
workspace=workspaceVar,
skip_signal=True
).run(exit=False)
with Flow(
"template_runner_flow",
executor=DaskExecutor(
cluster_class='dask_jobqueue.PBSCluster',
cluster_kwargs={
"cores": 1,
"memory": '24GB',
"queue": 'sequentiel',
"walltime": '00:05:00',
"interface": 'ib0',
"n_workers": 1
}
)
) as flow:
workspace = Parameter("workspace")
command = Parameter("command")
semaphore_audience_version = Parameter("semaphore_audience_version")
command_parameter = Parameter("command_parameter")
template_runner_task(
workspace=workspace,
command=command,
semaphore_audience_version=semaphore_audience_version,
command_parameter=command_parameter
)
So my flow when looks like that, start a qsubKevin Kho
01/21/2022, 2:40 PMAn Hoang
02/03/2022, 2:56 PM