#prefect-community

Rajan Subramanian

03/22/2022, 5:55 PM
Hello, is there any possibility of adding a -r or --run flag to
prefect deployment create deployment_name
that runs the deployment if an agent is already listening and, if not, schedules it to run once the work queue is created?

Anna Geller

03/22/2022, 7:35 PM
hmm so currently if you do:
prefect deployment run flow_name/deployment_name
it will trigger a run on your agent
we also have a feature that allows triggering a flow run from an already existing (i.e. already created) deployment via an API call. is that what you are trying to do? if so, you can do this via OrionClient.create_flow_run_from_deployment or via a direct API call. If not, can you describe the problem you are trying to solve a bit more? What would be needed for your use case: triggering a flow run that gets executed on the agent you spun up on your EC2 instance? if so, then the CLI command above should have the same effect, as it will create a flow run scheduled for now. If you attach a schedule to your DeploymentSpec, this will also trigger a flow run on your agent, provided that the DeploymentSpec definition matches the work queue filter criteria
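A minimal sketch of the client route mentioned above, assuming the Prefect 2.0 beta client exposes `read_deployment_by_name` and `create_flow_run_from_deployment` as async methods and that `get_client` is importable from `prefect.client`. The helper name `trigger_deployment` is hypothetical, and the deployment name is the placeholder from the CLI example.

```python
import asyncio
from typing import Any, Dict, Optional


async def trigger_deployment(
    client: Any, full_name: str, parameters: Optional[Dict[str, Any]] = None
) -> Any:
    """Create a flow run from an existing deployment named "flow_name/deployment_name"."""
    deployment = await client.read_deployment_by_name(full_name)
    # Only pass parameters when overriding the deployment's defaults.
    kwargs = {"parameters": parameters} if parameters else {}
    return await client.create_flow_run_from_deployment(deployment.id, **kwargs)


async def main() -> None:
    # Assumption: this import path matches the installed Prefect 2.0 beta.
    from prefect.client import get_client

    async with get_client() as client:
        flow_run = await trigger_deployment(client, "flow_name/deployment_name")
        print(flow_run.id)
```

Running `asyncio.run(main())` from a script would then have roughly the same effect as the `prefect deployment run` command. Keeping the client as an argument makes the helper easy to exercise with a stand-in object before pointing it at a real API.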

Rajan Subramanian

03/25/2022, 3:58 PM
@Anna Geller, thanks for your response. since the process i am running is running indefinitely, what type of schedule should i add? should it be rescheduled every 1000 days or something? and what happens when a run is scheduled, does that override the previous run?
also, one more thing i am having an issue with: my ssh tunnel gets disconnected and any active processes i have running in the background get stopped. i have 2 ssh tunnels running, one where the prefect agent is listening for active deployments, and another one that has a redis-server running. my problem is that after i log into the cloud UI and the ssh tunnel gets disconnected, i notice an http error on the prefect side, a 403 HTTP response. not sure whats happening here
@Anna Geller, if i have a file called deployments.py that contains all the deployments, how do i call OrionClient.create_flow_run_from_deployment? right now, i am creating a hacky way of doing this with another python file called execute_deployments that runs prefect shell commands to create the deployments and run them.

Anna Geller

03/25/2022, 5:50 PM
> since the process i am running is running indefinitely, what type of schedule should i add?
Is this still about the streaming use case we discussed? I still believe (as I explained in this SO answer where we discussed this originally) that you need some sort of service, e.g. a python script with a while loop that indefinitely creates new flow runs. This means that this wouldn't run on a schedule at all. But if you wanted to, you could also attach e.g. a minutely schedule and have this sort of near real-time pipeline. No need to reschedule every 1000 days (where did you even get that number from 😄 )
flow runs are independent of each other, so a new run does not override the previous one
> not sure whats happening here
me neither. hard to debug this remotely. Can you try Cloud 2.0 maybe https://beta.prefect.io/? this would give you a UI available from the public Internet from anywhere. once you attach the schedule, there is no need to invoke your flow run from a client, Prefect will automatically invoke the flow run based on your schedule.
...or with the while loop approach you wouldn't invoke any deployment, you would just do:
python your_never_ending_script_creating_millions_of_flow_runs.py
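The while-loop service described above could look roughly like this. It is a sketch only: `run_forever` and its parameters are hypothetical names, and `create_run` stands in for whatever creates one flow run (for example, calling a flow function). The `max_iterations` argument exists purely so the loop can be tried out without running forever.

```python
import time
from typing import Callable, Optional


def run_forever(
    create_run: Callable[[], None],
    interval_seconds: float = 1.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Call create_run repeatedly, pausing between calls; return the number of calls."""
    count = 0
    while max_iterations is None or count < max_iterations:
        try:
            create_run()
        except Exception as exc:
            # One failed run should not take the whole service down.
            print(f"iteration {count} failed: {exc!r}")
        count += 1
        time.sleep(interval_seconds)
    return count
```

With `max_iterations=None` the loop never returns, which is the behaviour a long-lived streaming service would want. Catching exceptions per iteration is a design choice here, so that a single bad run does not stop the loop.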

Rajan Subramanian

03/26/2022, 5:47 PM
@Anna Geller, i am using beta.prefect.io for cloud 2.0. one thing is, i tested this deployment. I streamed 2 coins for two different types of data at two different exchanges. thats 8 streams of data. I deployed this using prefect, but when i check the memory utilization, im surprised prefect is spinning up 108 python processes to run all this. Also, 29gb of ram is being used just to run these 8 streams. is that normal?

Anna Geller

03/26/2022, 8:23 PM
Hard to tell. How did you start those processes? You could write down all steps you took to spin up all services and deployments in a structured way to make it easier to follow and troubleshoot, even for yourself 🙂 this would not only help with troubleshooting: if something goes wrong with your EC2 instance, it would also make it easier to recover the instance and all processes. Regarding memory consumption, maybe you can start by adding more logs to understand what part of your pipeline causes memory spikes? You can do something similar to this. And btw, it looks like you've made really good progress! 👏
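One way to add that kind of memory logging with only the standard library is `tracemalloc`. This is a sketch, not the example originally linked: `log_peak_memory` is a hypothetical helper, and it only sees Python-level allocations in the current process, so it would need to live inside the streaming scripts themselves rather than around a `subprocess.run` call.

```python
import logging
import tracemalloc
from contextlib import contextmanager

logger = logging.getLogger("memory")


@contextmanager
def log_peak_memory(label: str):
    """Log the peak Python-level memory allocated inside the with-block."""
    already_tracing = tracemalloc.is_tracing()
    if not already_tracing:
        tracemalloc.start()
    if hasattr(tracemalloc, "reset_peak"):  # available from Python 3.9
        tracemalloc.reset_peak()
    stats = {}
    try:
        yield stats
    finally:
        current, peak = tracemalloc.get_traced_memory()
        stats["current_bytes"] = current
        stats["peak_bytes"] = peak
        logger.info("%s: current=%d bytes, peak=%d bytes", label, current, peak)
        if not already_tracing:
            tracemalloc.stop()


# Usage: wrap the part of the pipeline that is suspected of spiking.
with log_peak_memory("build payload") as stats:
    payload = [bytes(1000) for _ in range(1000)]
```

Wrapping each stage (read from exchange, write to Redis, flush to Postgres) separately would show which one grows. For whole-process numbers including child processes, an OS-level tool would be needed instead.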

Rajan Subramanian

03/28/2022, 3:47 PM
@Anna Geller thanks for your message, yes i have made a lot of progress. But having issues with the code freezing. i have one file called pipeline, as such:
import subprocess

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

from pipelines.prefecttoolz import build_command


@task(
    name="execute shell command",
    description="runs a shelltask to read from redis and push to postgres and clean redis",
    retries=3,
    retry_delay_seconds=10,
)
def run_in_shell(command: str):
    # command is the shell string returned by build_command, not a Task object
    subprocess.run(command, shell=True)


@flow(name="redis_to_postgres_pipeline", task_runner=DaskTaskRunner())
def run_flow(shell_task: str):
    cmd = build_command(shell_task)
    run_in_shell(command=cmd)
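One detail worth checking in the task above: `subprocess.run` does not raise when the command exits with a non-zero status unless `check=True` is passed. Since retries are normally triggered by the task raising an exception, a failing script would likely look like a success to Prefect and never be retried. The snippet below only demonstrates the `subprocess` behaviour, with a stand-in command that always fails.

```python
import subprocess
import sys

# A command that always exits with status 3, standing in for a failing script.
failing = [sys.executable, "-c", "import sys; sys.exit(3)"]

# Default behaviour: no exception, the failure is only visible in returncode.
result = subprocess.run(failing)
print(result.returncode)

# With check=True a non-zero exit raises, which a retrying caller can react to.
try:
    subprocess.run(failing, check=True)
except subprocess.CalledProcessError as exc:
    print(f"command failed with exit code {exc.returncode}")
```

Adding `check=True` to the call inside the task would be the smallest change that lets `retries=3` take effect, assuming the underlying script signals failure through its exit code.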
Below file is the pipeline from exchanges to redis...
import subprocess

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

from pipelines.prefecttoolz import build_command

# shell task to run python programs


@task(
    name="execute shell command",
    description="runs each of the exchange.py file as a separate shell task",
    retries=3,
    retry_delay_seconds=10,
)
def run_streamer_in_shell(command: str):
    # command is the shell string returned by build_command, not a Task object
    subprocess.run(command, shell=True)


@flow(
    name="live_feeds_to_redis_pipeline",
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "threads_per_worker": 1}
    ),
)
def run_flow(shell_task: str, symbol: str, stream_name: str):
    cmd = build_command(shell_task, symbol=symbol, stream_name=stream_name)
    run_streamer_in_shell(command=cmd)
I have one deployment file, as such:
from os.path import join

from exchange_feeds.constants import EXCHANGEPATH, PHOBOSPATH
from prefect.deployments import DeploymentSpec

path_to_pipeline = join(PHOBOSPATH, "pipelines", "feed_to_redis_pipeline.py")
path_to_file = join(EXCHANGEPATH, "feed_ingestor_to_redis.py")


DeploymentSpec(
    name="ftx_L1_btc-perp",
    flow_location=path_to_pipeline,
    tags=["ftx-L1", "btc-perp"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "BTC-PERP",
        "stream_name": "ftx-orderbook",
    },
)

DeploymentSpec(
    name="ftx_L1_btcusd",
    flow_location=path_to_pipeline,
    tags=["ftx-L1", "btcusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "BTC/USD",
        "stream_name": "ftx-orderbook",
    },
)
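The two specs above differ only in the coin and its symbol, so the repeated blocks could be generated from a small table. This is a sketch under assumptions: `build_spec_kwargs` is a hypothetical helper, the file paths are placeholders for `path_to_pipeline` and `path_to_file`, only the two symbols shown above are filled in, and it assumes specs created in a loop are picked up by `prefect deployment create` the same way as literal ones.

```python
from typing import Dict, List

# Coin name -> exchange symbol; only the two pairs shown above are known.
SYMBOLS: Dict[str, str] = {"btc-perp": "BTC-PERP", "btcusd": "BTC/USD"}


def build_spec_kwargs(
    exchange: str,
    level: str,
    stream_name: str,
    flow_location: str,
    shell_task: str,
) -> List[dict]:
    """Return one dict of DeploymentSpec keyword arguments per coin."""
    return [
        {
            "name": f"{exchange}_{level}_{coin}",
            "flow_location": flow_location,
            "tags": [f"{exchange}-{level}", coin],
            "parameters": {
                "shell_task": shell_task,
                "symbol": symbol,
                "stream_name": stream_name,
            },
        }
        for coin, symbol in SYMBOLS.items()
    ]


specs = build_spec_kwargs(
    "ftx",
    "L1",
    "ftx-orderbook",
    "pipelines/feed_to_redis_pipeline.py",  # placeholder for path_to_pipeline
    "feed_ingestor_to_redis.py",  # placeholder for path_to_file
)
for kwargs in specs:
    print(kwargs["name"])
    # In the deployment file this would become: DeploymentSpec(**kwargs)
```

Building plain dicts first keeps the naming logic checkable without Prefect installed.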
The deployment file has about 10 deployment specs, representing 10 coins. to automate this from a shell call, i created another file called stream_deployments.py with contents as such:
import os
import subprocess
from time import sleep
from typing import List

from exchange_feeds.constants import ACCOUNT, FLOW_NAME, WORKSPACE_NAME

ID = os.path.join(ACCOUNT, WORKSPACE_NAME)


def cloud_login():
    key = os.environ["PREFECTCLOUDKEY"]
    cmd = f"prefect cloud login --key {key} -w {ID}"
    subprocess.run(cmd, shell=True)


def create_deployments(deployment_name):
    cmd = f"prefect deployment create {deployment_name}"
    subprocess.run(cmd, shell=True)


def run_unscheduled_deployments(deployment_names: List[str]):
    for d in deployment_names:
        cmd = f"prefect deployment run {d}"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        print(result.stdout)


def start_prefect_agent(agent_name):
    cmd = f"prefect work-queue create {agent_name}"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    # stdout is expected to hold the new work queue ID wrapped in UUID(...); strip the wrapper
    work_queue_id = result.stdout.strip("UUID()").replace("(", "").replace(")", "")
    cmd = f"prefect agent start {work_queue_id}"
    subprocess.run(cmd, shell=True)


coins = (
    "avaxusd",
    "bnbusd",
    "btc-perp",
    "btcusd",
    "dogeusd",
    "dotusd",
    "ethusd",
    "lunausd",
    "solusd",
    "xrpusd",
)
ftx_L1_deployment = [f"ftx_L1_{coin}" for coin in coins]
ftx_blotter_deployment = [f"ftx_blotter_{coin}" for coin in coins]


def sign_into_cloud_and_create_deployments():
    print("logging into prefect cloud")
    cloud_login()
    print("Successfully logged into prefect cloud")
    print("Creating redis to postgres deployment")
    create_deployments("redis_to_postgres_deployment.py")
    sleep(5)
    print("Creating FTX deployments")
    create_deployments("ftx/orderbook_deployment.py")
    sleep(5)
    create_deployments("ftx/blotter_deployment.py")
    sleep(5)
    create_deployments("kraken/orderbook_deployment.py")
    sleep(5)
    create_deployments("kraken/blotter_deployment.py")
    sleep(5)


if __name__ == "__main__":
    sign_into_cloud_and_create_deployments()
    activate_coin = ["ethusd", "btcusd"]
    inactivate_flows = [f"{FLOW_NAME}/ftx_L1_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    inactivate_flows = [f"{FLOW_NAME}/ftx_blotter_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    inactivate_flows = [f"{FLOW_NAME}/kraken_L1_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    inactivate_flows = [f"{FLOW_NAME}/kraken_blotter_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    start_prefect_agent("feed_to_redis_agent")
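The strip/replace chain in `start_prefect_agent` depends on the exact shape of the CLI output. A more tolerant alternative is to search the output for anything that looks like a UUID. This is a sketch: `extract_uuid` is a hypothetical helper, and the sample output format `UUID('...')` is only inferred from the stripping code above, not taken from Prefect's documentation.

```python
import re
import uuid
from typing import Optional

UUID_PATTERN = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)


def extract_uuid(cli_output: str) -> Optional[str]:
    """Return the first UUID found in cli_output, or None if there is none."""
    match = UUID_PATTERN.search(cli_output)
    if match is None:
        return None
    # Round-trip through uuid.UUID to validate and normalise to lowercase.
    return str(uuid.UUID(match.group(0)))


sample = "UUID('123e4567-e89b-12d3-a456-426614174000')\n"  # assumed output shape
print(extract_uuid(sample))
```

Returning `None` when nothing matches lets the caller stop with a clear message instead of starting an agent with an empty or malformed ID.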
It mainly freezes due to memory issues. in the above file, i am testing
activate_coin = ["ethusd", "btcusd"]
these two coins. and hence noticed all the memory usage. I am using prefect work-queue agent to deploy my tasks so thats the only service im using i believe.

Anna Geller

03/28/2022, 6:29 PM
I think the issue you see is because you are using a DaskTaskRunner together with a shell task. Dask workers use a different working directory.
and btw, thanks so much for sharing your code! 🙏 This helps troubleshooting more than any explanation 🙂 given that your tasks are mainly doing IO operations (grabbing some data from the exchange, writing this to Redis, waiting in between API responses), I think that the default ConcurrentTaskRunner is a better choice for your use case than a DaskTaskRunner. I would try switching that first before troubleshooting any further. You can benchmark the performance - I think in your use case, Dask adds more overhead than it helps, but I could be wrong
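The reasoning about IO-bound work can be illustrated with plain threads. This is not Prefect's implementation of ConcurrentTaskRunner, only a stand-in showing that tasks which mostly wait can overlap inside one process without spawning workers. `fake_stream` and all stream names except `ftx-orderbook` are made up for the example.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def fake_stream(name: str) -> Tuple[str, int]:
    time.sleep(0.2)  # stands in for waiting on an exchange API or Redis
    return name, os.getpid()


streams = ["ftx-orderbook", "ftx-blotter", "kraken-orderbook", "kraken-blotter"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(streams)) as pool:
    results = list(pool.map(fake_stream, streams))
elapsed = time.perf_counter() - start

# Every task reports the same PID: the waits overlapped in a single process.
pids = {pid for _, pid in results}
print(f"{len(results)} streams ran in {len(pids)} process(es), {elapsed:.2f}s elapsed")
```

A process-based runner would instead pay for interpreter startup and memory per worker, which only pays off when the tasks are CPU-bound.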

Rajan Subramanian

03/28/2022, 6:43 PM
@Anna Geller thanks for that recommendation, will definitely try that and circle back
@Anna Geller, thanks for that rec. I tried it and now the number of processes, if i deploy 10 python processes, theres only 10 other prefect agents, so much better. before it was 2 and it went to 120.
also adding the default concurrent task runner has sped up the performance dramatically, thanks

Anna Geller

03/31/2022, 9:07 PM
fantastic! thanks for reporting back and great progress! 🙌