# prefect-server

Thomas Mignon

01/18/2022, 1:42 PM
Hi guys, I've installed docker and docker-compose:
Copy code
(base) thomas@Ifremer ~ $ docker -v
Docker version 20.10.8, build 3967b7d
(base) thomas@Ifremer ~ $ docker-compose --version
docker-compose version 1.25.0, build unknown
I'm trying to start Prefect with the command `prefect server start`, but I get this error:
Copy code
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?

If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Exception caught; killing services (press ctrl-C to force)
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?

If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Traceback (most recent call last):
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/prefect/cli/server.py", line 623, in start
    subprocess.check_call(
  File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 364, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/thomas/miniconda3/bin/prefect", line 10, in <module>
    sys.exit(cli())
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/thomas/miniconda3/lib/python3.8/site-packages/prefect/cli/server.py", line 664, in start
    subprocess.check_output(
  File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 411, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/home/thomas/miniconda3/lib/python3.8/subprocess.py", line 512, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.

Anna Geller

01/18/2022, 1:45 PM
Looks like you need to start your Docker daemon. Btw, can you move the code blocks into the main thread to keep the channel cleaner?
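For reference, a minimal sketch of one way to confirm the Docker daemon is reachable before retrying `prefect server start`; this assumes the `docker` Python SDK (docker-py) is installed, which is not mentioned in the thread:

import docker  # docker-py, assumed to be installed

try:
    client = docker.from_env()  # honours DOCKER_HOST if it is set
    client.ping()               # raises if the daemon is not running
    print("Docker daemon is reachable")
except Exception as exc:
    print(f"Docker daemon is not reachable: {exc}")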

Thomas Mignon

01/18/2022, 1:47 PM
Thanks for the fast reply! Is there something particular I have to do in order to start it? Aren't they already in the main thread?

Thomas Mignon

01/18/2022, 1:57 PM
Hi Pierre, thanks a lot, it seems to work! 🙂
I have another question: is it possible to have the interface on my PC and then access the HPC over SSH from the UI? :)

Kevin Kho

01/18/2022, 2:29 PM
If you mean connecting to the server from a remote machine through the UI, yes you can. You just need to point your UI to the GraphQL API

Thomas Mignon

01/18/2022, 2:30 PM
Do you have a link to the docs that covers this? 🙂
I have one more question 🙈 If I want to launch a flow without the UI (without filling in the input forms), how can I pass the parameters workspace, command, semaphore_audience_version, command_parameter and then retrieve them in the code when launching the following Prefect flow with `python myflow.py`:
Copy code
import os
import time
from pathlib import Path

import prefect

from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor

from prefect import Flow, task, Parameter


@task
def template_runner(workspace, command, semaphore_audience_version, command_parameter):
    workspaceVar: Path = Path(
        os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
    )

    template_runner.TemplateRunner(
        command=command,
        version=semaphore_audience_version,
        command_parameter=command_parameter,
        workspace=workspaceVar,
        skip_signal=True
    ).run(exit=False)



with Flow("template_runner_flow") as flow:
    workspace = Parameter("workspace", default="")
    command = Parameter("command", default="")
    semaphore_audience_version = Parameter("semaphore_audience_version", default="")
    command_parameter = Parameter("command_parameter", default="")

    # Map `template_runner` across the parameters
    template_runner.map(workspace, command, semaphore_audience_version, command_parameter)

if __name__ == "__main__":
  flow.run(executor=DaskExecutor(
    cluster_class=PBSCluster,
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
      }
  ))

Kevin Kho

01/18/2022, 2:42 PM
For the connection, check this blog. The last section after `prefect server start` shows how to connect. You need to edit the `config.toml` on the machine you are connecting from. To open outside connections, you need to start it with `prefect server start --expose`.
If you are mapping and something stays the same throughout the mapped calls, you need to wrap it with `unmapped`.
As long as downstream tasks use the Parameter, populating them in the UI will populate them during runtime.
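For reference, a rough sketch of the `config.toml` entries involved; the exact keys can differ between Prefect Server versions, and `10.0.0.5` stands in for the hypothetical address of the machine running `prefect server start --expose`:

# ~/.prefect/config.toml on the machine you connect from (sketch; keys may vary by version)
[server]
endpoint = "http://10.0.0.5:4200"

  [server.ui]
  apollo_url = "http://10.0.0.5:4200/graphql"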

Thomas Mignon

01/18/2022, 2:48 PM
Thanks a lot for the doc about the connection, I will check it out just after I've resolved my parameters issue 🙂 If I'm understanding correctly, I have to use `unmapped` to pass multiple args to the mapping?

Kevin Kho

01/18/2022, 2:50 PM
Check this for unmapped
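For reference, a minimal `unmapped` sketch (assuming Prefect 1.x): the mapped argument varies per call, while the `unmapped` one is passed unchanged to every call. The task and parameter names here are illustrative only:

from prefect import Flow, Parameter, task, unmapped


@task(log_stdout=True)
def run_command(workspace, command):
    # Each mapped call receives one command and the same workspace
    print(workspace, command)


with Flow("unmapped_example") as flow:
    workspace = Parameter("workspace", default="/tmp/ws")
    commands = Parameter("commands", default=["cmd_a", "cmd_b"])
    run_command.map(command=commands, workspace=unmapped(workspace))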

Thomas Mignon

01/18/2022, 2:50 PM
Then how can I pass the args at launch time via `python myflow.py`? I was thinking of a syntax like `python myflow.py workspace="myworkspace" command="mycommand"`

Kevin Kho

01/18/2022, 2:51 PM
`prefect run --param workspace=myworkspace`, I think.
More info here.

Thomas Mignon

01/18/2022, 2:51 PM
Ok, thank you a lot for all the info, I will try that 😛
Hi @Kevin Kho, I'm struggling to retrieve the parameters passed on the start command, which is:
python /home1/datawork/semexp/workspace/common/bin/prefectflow.py --param workspace=/home1/datawork/semexp/workspace/dagit/running/ --param command=purify_raw_logs --param command_parameter=/home/datawork-semaphore-exp/spool/input/ftp/ftp2/2021 ftp2 -f access.log.20211201.gz --param version=3.4.9
When I print the parameter workspace in my flow, for example:
Copy code
with Flow("template_runner_flow") as flow:
    workspace = Parameter("workspace")
    print("workspace :", workspace.__dict__)
I get this:
Copy code
workspace : {'required': True, 'default': None, 'name': 'workspace', 'slug': 'workspace', 'logger': <Logger prefect.workspace (INFO)>, 'tags': set(), 'max_retries': 0, 'retry_delay': None, 'timeout': None, 'trigger': <function all_successful at 0x7fffeaefde50>, 'skip_on_upstream_skip': True, 'cache_for': None, 'cache_key': None, 'cache_validator': <function never_use at 0x7fffeb3eeaf0>, 'checkpoint': True, 'result': <PrefectResult: None>, 'target': None, 'task_run_name': None, 'state_handlers': [], 'auto_generated': False, 'log_stdout': False, 'nout': None}
So the param doesn't seem to be passed correctly.

Anna Geller

01/20/2022, 11:00 AM
You don't need to build your own CLI arg parser to do that, because the Prefect CLI already provides that functionality. You can use:
Copy code
prefect run --name yourflow --param name=Marvin --watch
Also, check out this recent blog post about parameters.
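For reference, a minimal sketch (assuming Prefect 1.x) of why printing the Parameter inside the `with Flow(...)` block shows no value: a Parameter is only resolved at run time, inside a task, and it can be filled either by the CLI `--param` flags or by `flow.run(parameters=...)`:

from prefect import Flow, Parameter, task


@task(log_stdout=True)
def show_workspace(workspace):
    # At run time the Parameter has been resolved to its actual value
    print("workspace:", workspace)


with Flow("param_demo") as flow:
    workspace = Parameter("workspace", default="/tmp/ws")
    show_workspace(workspace)


if __name__ == "__main__":
    # Roughly equivalent to: prefect run ... --param workspace=/home1/datawork
    flow.run(parameters={"workspace": "/home1/datawork"})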

Thomas Mignon

01/21/2022, 8:59 AM
Thanks a lot @Anna Geller for the blog post, that was exactly what I was looking for!
๐Ÿ‘ 1
Do you know the DaskExecutor a little? Here's what my flow looks like now:
Copy code
import time
import prefect
import os
from pathlib import Path

from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor

from prefect import Flow, task, Parameter, unmapped


@task(log_stdout=True)
def template_runner(workspace, command, semaphore_audience_version, command_parameter):
    print(workspace, command)
    workspaceVar: Path = Path(
        os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
    )

    template_runner.TemplateRunner(
        command=command,
        version=semaphore_audience_version,
        command_parameter=command_parameter,
        workspace=workspaceVar,
        skip_signal=True
    ).run(exit=False)



with Flow("template_runner_flow") as flow:
    workspace = Parameter("workspace")
    command = Parameter("command")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    command_parameter = Parameter("command_parameter")

    template_runner(
        workspace=workspace,
        command=command,
        semaphore_audience_version=semaphore_audience_version,
        command_parameter=command_parameter
    )

if __name__ == "__main__":
  flow.run(
    workspace="test",
    executor=DaskExecutor(
    cluster_class=PBSCluster,
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
      }
  ))
The behaviour I should see is that the DaskExecutor submits a qsub to my PBS cluster with the cluster_kwargs, isn't it? I got this error because no qsub was done, so no PBS_JOBID is set as an env var:
Copy code
└── 09:06:20 | ERROR   | Task 'template_runner': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home1/datawork/semexp/workspace/common/bin/prefectflow.py", line 16, in template_runner
    os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/os.py", line 679, in __getitem__
    raise KeyError(key) from None
KeyError: 'PBS_JOBID'
I launch my flow with the Prefect CLI with:
Copy code
prefect run --path /home1/datawork/semexp/workspace/common/bin/prefectflow.py --param workspace="/home1/datawork/semexp/workspace/dagit/running/" --param command="purify_raw_logs" --param command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp2/2021 ftp2 -f access.log.20211201.gz" --param semaphore_audience_version="3.4.9" --watch
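For reference, one way to keep debugging while no qsub is happening is to make the `PBS_JOBID` lookup tolerant of running outside a PBS allocation; this is only a sketch, and `build_workspace` and the `no-pbs-job` fallback are hypothetical:

import os
from pathlib import Path


def build_workspace(workspace: str, command: str) -> Path:
    # PBS only sets PBS_JOBID inside a qsub allocation; fall back to a
    # placeholder so the task can also run outside the cluster while debugging.
    job_id = os.environ.get("PBS_JOBID", "no-pbs-job")
    return Path(workspace) / f"{command}-{job_id}"


print(build_workspace("/tmp/ws", "purify_raw_logs"))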

Anna Geller

01/21/2022, 10:38 AM
@Thomas Mignon your flow definition looks fine. I usually see people using DaskExecutor when they leverage mapping, i.e. when they parallelize the execution of a function over a list of various inputs. But even without mapping the executor should submit the task to the cluster. It's hard to give any suggestions since I can't reproduce this myself locally. This thread shares how one user has done it with an HPC cluster - perhaps it gives you some info? A good way to debug this would be to start with something simpler. Can you try running this simple flow on your PBS cluster? Does this work?
Copy code
from prefect import task, Flow
from prefect.executors import DaskExecutor
from dask_jobqueue import PBSCluster


@task
def generate_random_numbers():
    return list(range(1, 200))


@task
def add_one(x):
    return x + 1


@task(log_stdout=True)
def print_results(res):
    print(res)


with Flow(
    "mapping",
    executor=DaskExecutor(
        cluster_class=PBSCluster,
        cluster_kwargs={
            "cores": 1,
            "memory": "24GB",
            "queue": "sequentiel",
            "walltime": "00:05:00",
            "interface": "ib0",
            "n_workers": 1,
        },
    ),
) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)

Thomas Mignon

01/21/2022, 1:57 PM
Hi @Anna Geller, the little code you gave me worked, a qsub was created on my PBS cluster 🙂

Anna Geller

01/21/2022, 2:05 PM
that's amazing! I guess this might then be an issue with the template runner class. Perhaps some statefulness issue bound to using classes in parallel processing.

Thomas Mignon

01/21/2022, 2:12 PM
I've commented out the line with template_runner and the problem remains the same.
But if I write the flow the way you did, it's good 🙂
Copy code
import time
import prefect
import os
from pathlib import Path
from semaphore_scripts import template_runner

from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor

from prefect import Flow, task, Parameter, unmapped


@task(log_stdout=True)
def template_runner_task(workspace, command, semaphore_audience_version, command_parameter):
    workspaceVar: Path = Path(
        #os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
        os.path.join(workspace, f'{command}')

    )

    template_runner.TemplateRunner(
        command=command,
        version=semaphore_audience_version,
        command_parameter=command_parameter,
        workspace=workspaceVar,
        skip_signal=True
    ).run(exit=False)



with Flow(
    "template_runner_flow",
        executor=DaskExecutor(
            cluster_class='dask_jobqueue.PBSCluster',
            cluster_kwargs={
                "cores": 1,
                "memory": '24GB',
                "queue": 'sequentiel',
                "walltime": '00:05:00',
                "interface": 'ib0',
                "n_workers": 1
                }
            )
) as flow:
    workspace = Parameter("workspace")
    command = Parameter("command")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    command_parameter = Parameter("command_parameter")

    template_runner_task(
        workspace=workspace,
        command=command,
        semaphore_audience_version=semaphore_audience_version,
        command_parameter=command_parameter
    )
So when my flow looks like that, with the executor passed as an argument to the Flow, it starts a qsub.
So this is great news, it works 😛
Thank you a lot @Anna Geller!

Kevin Kho

01/21/2022, 2:40 PM
@An Hoang, you were asking who else is on HPC
๐Ÿ‘ 1

An Hoang

02/03/2022, 2:56 PM
@Thomas Mignon I'm making a list of people who are on HPC to eventually write a blogpost about HPC + Prefect. It seems like you are on PBS. I'm on LSF transitioning to SLURM. Can I DM you?