# prefect-server
a
Another question, but a big issue for us here. Sorry folks. We have deployed Prefect Core on an on-premise virtual machine with 4 CPUs and 16 GB of RAM. Some flows regularly fail and are retried. Right now we have 10 flows retrying, and they are overloading the CPUs... Those retrying flows slow down the subsequent ones, which get slower and slower, until no flow can run anymore and the whole VM is virtually stopped. The tasks executed by the flows are not memory-intensive and, when no error is raised, they complete in a few seconds. But here we are facing some critical problems... One solution would be to reboot the whole application regularly, but that was the standard advice for Airflow and its scheduler issues, and one of the reasons we switched from Airflow to Prefect. I have also tuned the number of retries and the retry_timedelta, but that is not an optimal solution. You can find attached a screenshot of my htop output. Any thoughts on how to solve the issue? Btw, we are using Prefect to run fully automated renewable electricity trading and, except for these bugs that I'm sure we can solve, we are very happy with the solution and would happily collaborate on blog posts showcasing our use case, or anything else! Best, and thanks in advance!
k
I am confused how the Flow is retrying. Is it Lazarus trying to spin it up again?
And of course, always happy to coordinate content
1. How did you add a retry time_delta on a Flow?
a
Thanks for your answer, Kevin. Actually, let me correct myself. The flows are not retried per se; rather, some task instances of some flows are retried because they have failed. More precisely, they are retried with these kwargs:
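Roughly, something like this stdlib-only sketch of what `max_retries` and `retry_delay` amount to conceptually (illustrative values and a hypothetical flaky task, not our real kwargs or Prefect internals):

```python
import time
from datetime import timedelta

def run_with_retries(task_fn, max_retries=3, retry_delay=timedelta(seconds=0)):
    # Re-run task_fn up to max_retries extra times, sleeping retry_delay
    # between attempts -- a conceptual stand-in for Prefect 1.x task retries.
    attempts = 0
    while True:
        try:
            return task_fn()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise
            time.sleep(retry_delay.total_seconds())

# Hypothetical flaky task: fails twice, then succeeds on the third attempt.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(run_with_retries(flaky, max_retries=3, retry_delay=timedelta(seconds=0)))  # → ok
```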
Is this clearer?
k
Ah ok I think I understand. What executor are you using?
a
We use a local agent launched with supervisord. I think I have read it may not be well suited for production, so maybe another agent would be better?
k
I am a bit confused because you seem to be saying that the task retries are spinning up new processes on the machine? Does your task logic itself do that? Because a task retry on LocalAgent should not cause that
a
I am not yet very experienced with Prefect, but I see many processes associated with Prefect in my htop output. Some of these processes are very CPU-demanding, as we can see in my previous screenshot. Currently I have 6 processes running, and my CPU is 100% used. When I spin up a run of one of these flows, it executes very rapidly (a few seconds), so I don't get why it is so CPU-intensive right now. I have not tweaked my tasks in any way: these flows are mostly extract -> transform -> test in one task and a load operation in another task. Both of these tasks should be retried on failure.
I have something like 40 processes associated with a "prefect execute flow" command, a hundred (!) processes with a "/usr/bin/containerd-shim-runc-v2 -namespace moby -id ..." command, and many docker-proxy and graphql-engine processes (20 to 30 each)... And as screenshotted earlier, some of the "prefect execute flow" processes are very CPU-intensive, slowing down all operations on the machine.
And btw, I also have 560 upcoming runs, so maybe that's why...
thanks a lot!
To complete this description: after I restart my application (a custom script that stops the server, kills residual Python processes, and so on), I only have a few processes associated with "prefect execute flow" commands, so there seems to be some kind of process accumulation.
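A quick way to quantify that accumulation over time is a stdlib-only, Linux-only sketch like this (hypothetical diagnostic helper, not part of the app):

```python
import os

def count_matching_processes(substring):
    # Count running processes whose command line contains `substring`.
    # Linux-only sketch: reads /proc/<pid>/cmdline; returns 0 elsewhere.
    if not os.path.isdir("/proc"):
        return 0
    count = 0
    for pid in os.listdir("/proc"):
        if not pid.isdigit():
            continue
        try:
            with open(f"/proc/{pid}/cmdline", "rb") as f:
                cmdline = f.read().replace(b"\0", b" ").decode(errors="replace")
        except OSError:
            continue  # process exited between listdir() and open()
        if substring in cmdline:
            count += 1
    return count

# Run periodically on the VM to see whether the count keeps growing.
print(count_matching_processes("prefect execute flow"))
```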
k
The upcoming runs shouldn’t affect anything. We don’t load-balance or monitor hardware, so all runs that are scheduled will just be run. Are you saying you have more processes than Flows running when you go to the UI?
a
Yes definitely, many more
I'd say approximately 10 times more processes than Flows
Could that be several agents running concurrently?
k
That could be, but it’s not useful to have more than 1 LocalAgent. Do you have multiple agents running?
a
I don't think so, but I am not used to supervisord and was only just able to make it work. The Prefect dashboard mentions only one Local Agent, but could there be several under the hood? I had a day off on Friday and let the infrastructure run for a few days without redeploying the app, and the VM CPUs are overloaded. Many Python processes are running, several with lengthy durations (more than 6 hours) and using a lot of CPU power (htop output attached). However, my Prefect dashboard says a single Flow is currently running, and has been for 15 minutes. If you had any insights about this, I would be really glad! Thanks
k
There is nothing like an agent under the hood, and this is not normal. Are you prevented from trying Prefect Cloud?
a
Unfortunately yes. If the decision was in my hand, I would definitely go for it given the complexity and costs of maintaining the app on our on-premise infra, but it's not the case. Our IT and security department is against the use of cloud technologies, so I'm trying to make the best of what we have!
k
Ah I see. I really don’t know what is causing these processes and it’s not normal. You just do
prefect server start
to deploy the server?
a
A little bit more complex than that for the server. Here is my entire deploy script:
Copy code
# To launch script printing logs to the console and saving them to a deploy_app_logs.txt file as well:
# source deploy_app.sh 2>&1 | ts | tee -a logs/deploy_app.$(date +'%Y-%m-%dT%H:%M:%S').log

# Automatic deployment

# Ignore logging of sudo commands
export HISTIGNORE='*sudo -S*'

### BACKEND ###
conda activate blx-vpp_backend
# Stop prefect then kill processes
prefect server stop
echo "y" | docker system prune

# Kill potential processes in the background
conda deactivate
echo <password> | sudo -S pkill python
sudo pkill -9 supervisord
sudo kill -9 $(pgrep geckodriver)
sudo kill -9 $(pgrep python)
sudo kill -9 $(pgrep prefect)
sudo killall -9 firefox-esr
sudo unlink /tmp/supervisor.sock

conda deactivate
# Delete old github repo
echo <password> | sudo -S rm -r ~/blx-vpp_backend

# Clone git repo after ssh key creation
cd ~
git clone --branch master <git-link>

# Reinstall venv
conda activate blx-vpp_backend
~/miniconda3/envs/blx-vpp_backend/bin/pip3 install -r blx-vpp_backend/requirements.txt --use-deprecated=legacy-resolver
conda deactivate

# Install geckodriver
cd ~/blx-vpp_backend/drivers
wget https://github.com/mozilla/geckodriver/releases/download/v0.29.0/geckodriver-v0.29.0-linux64.tar.gz
tar -xvzf geckodriver-v0.29.0-linux64.tar.gz
sudo chmod +x geckodriver
rm geckodriver-v0.29.0-linux64.tar.gz
sudo chmod -R 777 ~/blx-vpp_backend/drivers

# Launch backend
cd ~
conda activate blx-vpp_backend
prefect server start --expose --use-volume >> logs/prefect_server.$(date +'%Y-%m-%dT%H:%M:%S').log 2>&1 &
sleep 20 # To let the server start
cd blx-vpp_backend
supervisord
python main.py &
conda deactivate
k
Ok, I’m not super sure, but it might be
supervisord
creating these multiple processes? What is in the
main.py
?
a
My
supervisord
is super basic, I almost did not modify the basic one generated by the Prefect command:
Copy code
; Supervisor config file for running Prefect Agents.
;
; To run for the first time:
; Save this file with the name `supervisord.conf` and run:
; supervisord # will start all programs
; supervisorctl # will open an interactive prompt for inspection
;
; If you need to edit this file and restart:
; supervisorctl reload # will reload config and restart programs
;
; For more information on configuring supervisor, please see:
; http://supervisord.org/configuration.html
;
; To configure supervisor logging, please see:
; http://supervisord.org/logging.html
;
; To inspect the status of a given program,
; use the interactive supervisorctl command:
; supervisorctl
; fg prefect-agent
;
; To stop all running programs, run:
; supervisorctl stop all
;
[unix_http_server]
file=/tmp/supervisor.sock   ; the path to the socket file

[supervisord]
loglevel=debug               ; log level; default info; others: debug,warn,trace

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket

[program:prefect-agent]
command=prefect agent local start
redirect_stderr=true
stdout_logfile=%(ENV_HOME)s/logs/supervisord.log

; you can add as many programs as you'd like
; including multiple Prefect Agents
; [program:foo-agent]
; command=prefect agent start local -l foo
k
Really hard to say what is going on. I would honestly try decoupling the agent and server to see if the issue persists with that setup
Will be slow to respond today as it’s a US holiday but will check this throughout the day
a
My main.py imports several modules like this:
from flows import etl_inputs
etl_inputs
looks like this:
Copy code
with Flow('da_prices', schedule=schedules_prefect['da_prices']) as da_prices:
    sql_da_prices = workflows_wrappers.da_prices_(
        ftp_1_credentials, ftp_2_credentials)
    workflows_load.load_batch(
        dict_tn['table_name_da_prices'], sql_da_prices, 'Day-Ahead prices') 


with Flow('id_prices', schedule=schedules_prefect['id_prices']) as id_prices:
    sql_id_prices = workflows_wrappers.id_prices_(
        ftp_1_credentials, ftp_2_credentials)
    workflows_load.load_batch(
        dict_tn['table_name_id_prices'], sql_id_prices, 'Intraday prices') 


flows_dict = {'da_prices': da_prices, 'id_prices': id_prices}

utilities.register_flows(flows_dict, 'etl_inputs')
And
utilities.register_flows
(which may be the issue) looks like this:
Copy code
from prefect import Client

def register_flows(flows_dict, project_name):
    """
    Registers each flow in flows_dict (key: name of the flow, value:
    prefect.Flow) in a Prefect project project_name.

    Parameters
    ----------
    flows_dict : dict
        dict of Prefect Flows.
    project_name : str
        Name of the project to register Flows into.

    Returns
    -------
    None.

    """
    client = Client()
    client.create_project(project_name=project_name)
    for k, flow in flows_dict.items():
        flow.register(project_name=project_name)
So you mean deploying the server in a first VM and the agent in another one? Please do enjoy your holiday, I'm based in France so it's almost the end of the working day here!
k
Server with just
prefect server start --expose
and the agent as a separate process, or yes, on another VM. And actually that would let us determine which one is causing the extra processes, right?
a
Also @Alexis Lucido, instead of this custom register_flows module, do you know that you can actually just use the CLI to bulk-register multiple flows to the same project? It may be easier and less code to maintain:
Copy code
prefect register --project XXX -p path/to/flows/
You can also try using docker-compose as shown here:
Copy code
prefect server config --expose > docker-compose.yml
docker-compose up -d
You can customize the docker-compose file as you wish to have everything in a single script. You can also check out this user contributed docker-compose setup that may make it easier to run your Server on a VM and have it all run as a script: https://github.com/flavienbwk/prefect-docker-compose
a
Thanks Kevin, thanks Anna. Many things on my plate today but I am going to check that out and come back to you!
Hey Kevin, hey Anna. Currently dockerizing our app, hoping it will help with those processes, but I am fairly new to Docker so I need a bit of time to make the appropriate changes... As I am going with Docker, do you think using a Docker agent rather than a Local one would be better suited for a production environment? Thanks a lot, as always!
a
There is no right or wrong approach; it totally depends on your use case, and Docker can sometimes overcomplicate things. On the one hand, you can start a local agent e.g. in a Conda virtual environment to separate Python dependencies, which makes things easy to get started and may work well, especially if your various projects use the same packages. Also, Orion (Prefect 2.0) supports running flows in separate virtual environments, so you may even have a separate virtual environment for each flow. On the other hand, Prefect makes it really easy to dockerize your flows using Docker storage: basically, you don't have to build the Dockerfile yourself; Prefect will do that for you and will automatically build and optionally push the images to a Docker registry. If you want to see some examples, check out this repo for flows that have "docker" in their names. And here are the docs for Docker storage: https://docs.prefect.io/api/latest/storage.html#docker
a
Thanks a lot Anna, all my flows use the same conda env so far, but further thinking is definitely needed. I'll get back to you soon!
Hello @Kevin Kho and @Anna Geller. I have been able to use docker-compose to run the server, agent, and client. I am just one step away from making it work: my Flows are registered but not executed. It is a label issue. I am able to configure the agent label, but I am unable to set a custom Flow label or to prevent a default label from being added on registration. Here is the function I am (still) using for registering the Flows:
def register_flows(flows_dict, project_name):
    """
    Registers each flow in flows_dict (key: name of the flow, value:
    prefect.Flow) in a Prefect project project_name.

    Parameters
    ----------
    flows_dict : dict
        dict of Prefect Flows.
    project_name : str
        Name of the project to register Flows into.

    Returns
    -------
    None.
    """
    client = Client()
    try:
        client.create_project(project_name=project_name)
    except Exception:
        logger.warning(
            "Error during project creation. It may already have been created.")
    for k, flow in flows_dict.items():
        flow.register(project_name=project_name, add_default_labels=False)
So I believe the flows should not be associated with any label. However, it seems a random label is given to the Flow at registration (see the attached picture). Here is how I define my flow before registering it:
Copy code
with Flow('id_trades_live', schedule=schedules_prefect[
        'id_trades_live']) as id_trades_live:
    pass
Here is the docker-compose for launching the agent:
Copy code
version: "3.7"

services:

    agent:
        build: 
            dockerfile: docker/agent/Dockerfile
            context: ./
        command: bash -c "prefect agent local start --name $$(uuid) --no-hostname-label --label app"
        volumes:
            - /srv/docker/prefect/flows:/root/.prefect/flows
            - type: bind
              source: ./docker/agent/config.toml
              target: /root/.prefect/config.toml
              read_only: true
So there is a label for the agent, and I understand a Flow with no label can only be run by an agent with no label. But the problem lies in the default label attributed to the Flow. To recap: I have the server in one container, the agent in another container, and a client registering the flows in a final container. Any idea how to solve this? Thank you very much!
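The matching rule, as I understand it from this thread, boils down to a simple set check (conceptual sketch only, not Prefect's actual code):

```python
def agent_can_run(flow_labels, agent_labels):
    # Conceptual sketch of Prefect 1.x label matching as described above:
    # every flow label must be among the agent's labels, and a label-less
    # flow is only picked up by a label-less agent.
    flow_labels, agent_labels = set(flow_labels), set(agent_labels)
    if not flow_labels:
        return not agent_labels
    return flow_labels <= agent_labels

# The situation in this thread: the agent has the "app" label, but the
# Flow accidentally got a random default label at registration.
print(agent_can_run({"a1b2c3-default"}, {"app"}))  # → False
print(agent_can_run(set(), {"app"}))               # → False
print(agent_can_run({"app"}, {"app"}))             # → True
```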
k
You can do
flow.storage.add_default_labels=False
or
Flow(…, storage=SomeStorage(…, add_default_labels=False))
a
Yep I have been trying those two methods as well and it did not work
The Flows were still registered with a default label
k
Can I see the Flow definition? This is the only place to turn it off and this should work
a
For instance:
with Flow('ms_det_fc', schedule=schedules_prefect['ms_fc']) as ms_det_fc:
    ms_det_fc.storage = Local(add_default_labels=False)
    ftd_ms_det_fc = workflows_wrappers.ms_fc(
        'foo', False, ms_ftp_credentials, credentials)
    workflows_load.load_batch(dict_tn['table_name_ms'], ftd_ms_det_fc,
                              'Deterministic forecasts')
Then the flow is registered
k
I think you need it outside the Flow block:
Copy code
with Flow(...) as ms_det_fc:
    ...

ms_det_fc.storage = Local(add_default_labels=False)
a
Nothing changed, a default label is still used. Btw, my client and agent run in separate containers, so could I still use Local storage?
k
Probably not, unless you use a volume mount that gives it access. But the default is Local storage, so if you don’t define any, it will just use Local. Also, moving away from Local to something like GitHub/S3 storage doesn’t attach default labels either, so you kind of solve that too.
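For concreteness, that volume-mount idea could look like this in the docker-compose file shown earlier (the client service name and its Dockerfile path are assumptions):

```yaml
# Hypothetical sketch: mount the same host directory into both the registering
# client and the agent, so Local storage paths (~/.prefect/flows by default)
# resolve identically in both containers.
services:
    client:
        build:
            dockerfile: docker/client/Dockerfile   # assumed path
            context: ./
        volumes:
            - /srv/docker/prefect/flows:/root/.prefect/flows
    agent:
        volumes:
            - /srv/docker/prefect/flows:/root/.prefect/flows
```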
a
Sorry for my late answer. What I've done is use the same Dockerfile, with the same dependencies and code, both for registering the Flows from the client in a first container and for executing them through a local agent launched with supervisord in another one. My Flows use Local storage. I've finally managed to get all the services (server, agent, client) launched through a single docker-compose file and running on my WSL, with a relatively small image for both the client and the agent (1.5 GB, down from more than 6 GB at the beginning). It has been quite some work given I was a complete newbie to Docker, but at least my environments and install/deploy scripts are much, much cleaner and more robust. I felt the GitHub repo (https://github.com/flavienbwk/prefect-docker-compose) for packaging the app was somewhat outdated, as I had to make several changes, but as I am still new to these tools I would advise taking my impressions with a pinch of salt. Finally, I am soon going to deploy the Docker app to a test environment (not my dev one) and hope that will circumvent these retrying Flows overloading the CPUs... In any case, I am sure this work will prove useful in the future and for easing CI/CD operations in our company. I will report back on this thread, in case it could help someone in the community. Thanks @Kevin Kho and @Anna Geller!
k
Sounds good! A couple of people use that repo; I would even encourage a PR into it if you had to make changes for it to work.