# prefect-community

Rajan Subramanian

04/04/2022, 2:08 PM
Hello all, I had my Prefect Orion cloud running for the last 4 days with no hiccups. Today I logged in and all the deployments have disappeared from the cloud. Is there a time limit for these processes? @Anna Geller

Kevin Kho

04/04/2022, 2:12 PM
There is no time limit. Could you share your deployment spec?

Rajan Subramanian

04/04/2022, 2:13 PM
sure,
```python
import os
from os.path import join

from exchange_feeds.constants import EXCHANGEPATH, PHOBOSPATH
from prefect.deployments import DeploymentSpec

path_to_pipeline = join(PHOBOSPATH, "pipelines", "feed_to_redis_pipeline.py")
path_to_file = os.path.join(EXCHANGEPATH, "feed_ingestor_to_redis.py")


DeploymentSpec(
    name="kraken_blotter_btcusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "btcusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "BTC/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_ethusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "ethusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "ETH/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_bnbusd",
    flow_location=path_to_pipeline,
    tags=["kraken_blotter", "bnbusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "BNB/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_xrpusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "xrpusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "XRP/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_lunausd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "lunausd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "LUNA/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_solusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "solusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "SOL/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_avaxusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "avaxusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "AVAX/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_dotusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "dotusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "DOT/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_dogeusd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "dogeusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "DOGE/USD",
        "stream_name": "kraken-blotter",
    },
)

DeploymentSpec(
    name="kraken_blotter_adausd",
    flow_location=path_to_pipeline,
    tags=["kraken-blotter", "adausd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "ADA/USD",
        "stream_name": "kraken-blotter",
    },
)
```
that's one such deployment
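Since the ten specs differ only in the symbol, they could equally be generated in a loop; a minimal sketch reusing the same DeploymentSpec API and the variables defined above:

```python
# Sketch: generate the ten near-identical DeploymentSpecs in a loop.
symbols = [
    "BTC/USD", "ETH/USD", "BNB/USD", "XRP/USD", "LUNA/USD",
    "SOL/USD", "AVAX/USD", "DOT/USD", "DOGE/USD", "ADA/USD",
]

for symbol in symbols:
    suffix = symbol.replace("/", "").lower()  # e.g. "btcusd"
    DeploymentSpec(
        name=f"kraken_blotter_{suffix}",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", suffix],
        parameters={
            "shell_task": path_to_file,
            "symbol": symbol,
            "stream_name": "kraken-blotter",
        },
    )
```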
here's my pipeline,
```python
import subprocess

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

from pipelines.prefecttoolz import build_command

# shell task to run python programs


@task(
    name="execute shell command",
    description="runs each of the exchange.py file as a separate shell task",
    retries=3,
    retry_delay_seconds=10,
)
def run_streamer_in_shell(command: str):
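    # Note: subprocess.run without check=True does not raise on a nonzero
    # exit code, so a failing command will not trigger the retries configured
    # above; passing check=True (or inspecting returncode) would surface that.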
    subprocess.run(command, shell=True)


@flow(name="live_feeds_to_redis_pipeline", task_runner=ConcurrentTaskRunner())
def run_flow(shell_task: str, symbol: str, stream_name: str):
    cmd = build_command(shell_task, symbol=symbol, stream_name=stream_name)
    run_streamer_in_shell(command=cmd)
```
```python
@task
def build_command(path_to_file: str, **kwargs) -> str:
    if "symbol" in kwargs and "stream_name" in kwargs:
        symbol = kwargs["symbol"]
        stream_name = kwargs["stream_name"]
        return f"""python3 {path_to_file} -t '{symbol}' -n '{stream_name}' -s"""
    return f"python3 {path_to_file}"
how I stream data...
```python
# -- To run the file, go to shell and type:
# -- python3 feed_ingestor_to_redis.py -t 'ethusdt' -n 'binance-blotter'
# -- To save the stream, add the -s flag:
# -- python3 feed_ingestor_to_redis.py -t 'ETH/USDT' -n 'kraken-L1' -s

import argparse
import asyncio
from datetime import datetime

import uvloop
from websockets import connect

from binance import BinanceBlotter, BinanceOrderBook
from constants import STREAM_NAMES, StreamName
from ftx import FTXBlotter, FTXOrderBook
from kraken import KrakenBlotter, KrakenOrderBook

# event policy needs to be set at top of file
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

factory = {}
factory[StreamName.BINANCE_BLOTTER.value] = BinanceBlotter
factory[StreamName.BINANCE_L1.value] = BinanceOrderBook
factory[StreamName.FTX_BLOTTER.value] = FTXBlotter
factory[StreamName.FTX_ORDERBOOK.value] = FTXOrderBook
factory[StreamName.KRAKEN_BLOTTER.value] = KrakenBlotter
factory[StreamName.KRAKEN_L1.value] = KrakenOrderBook
factory[StreamName.FTXUS_BLOTTER.value] = FTXBlotter
factory[StreamName.FTXUS_ORDERBOOK.value] = FTXOrderBook


async def stream_from_exchange():
    parser = argparse.ArgumentParser(
        prog="feed_ingestor_to_redis",
        usage="%(prog)s --symbol [options] --stream_name [options]",
        description="Streams Raw Feeds from Exchange",
        epilog="sit back and drink coffee - long running program",
    )
    parser.add_argument(
        "--ticker",
        "-t",
        action="store",
        required=True,
        type=str,
        help="ticker symbol from exchange",
    )
    parser.add_argument(
        "--stream_name",
        "-n",
        action="store",
        required=True,
        type=str,
        help="stream name in redis to store data to",
        choices=STREAM_NAMES,
    )
    parser.add_argument(
        "--save_stream",
        "-s",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="to save to redis",
    )
    args = parser.parse_args()

    ticker, stream_name, save_stream = args.ticker, args.stream_name, args.save_stream
    if stream_name not in factory:
        raise KeyError("stream name not found")

    save = save_stream  # --save_stream is already a bool (BooleanOptionalAction)

    exchange = factory[stream_name]
    msg = "entered logger"
    exchg = exchange(ticker, stream_name)
    print(
        f"Connecting to {exchg.exchange}. If an error occurs, press Ctrl+C to stop this long-running process."
    )
    print("errors are logged at exchange_feeds/exchangelogs.log")
    async for socket in connect(exchg.url, ping_interval=None):
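        # connect() used as an async iterator yields a fresh connection
        # whenever the previous one closes (websockets >= 10), so this loop
        # reconnects automatically after the except branch backs off.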
        try:
            exchg.logger.info(msg)
            exchg.websocket = socket
            await exchg.stream(save=save)
        except Exception as e:
            print(e)
            exchg.logger.info(e)
            msg = f"""Socket Closed: at {datetime.now()};
                close reason: {exchg.websocket.close_reason},
                close code: {exchg.websocket.close_code}"""
            exchg.logger.info(msg)
            exchg.logger.info(f"error occurred in main body {stream_name}-{ticker}")
            if exchg.websocket.close_code == 1013:
                # 1013 = "try again later"; back off for an hour
                await asyncio.sleep(3600)
            else:
                await asyncio.sleep(15)
            await exchg.websocket.close()
            continue


if __name__ == "__main__":
    asyncio.run(stream_from_exchange())
```

Kevin Kho

04/04/2022, 2:17 PM
Will ask some team members what could cause this

Rajan Subramanian

04/04/2022, 2:18 PM
this is my screenshot
when I go to Flows, I see it there, which is weird given that it disappeared from my main page
```bash
ps aux | grep python | wc -l
```
gives me 118 Python processes running
but when I check on the backend, nothing is being streamed at all
oh, my Prefect agent stopped running. wth. why would the agent just randomly stop running?

Kevin Kho

04/04/2022, 2:25 PM
It shouldn’t, but it could die sometimes. You could use something like supervisor to run it as a long-running process that will start a new one if it fails.

Anna Geller

04/04/2022, 2:30 PM
@Rajan Subramanian I run my agent in a conda environment and with supervisor. You can create a file called supervisord.conf with this content:
```ini
[unix_http_server]
file=/tmp/supervisor.sock   

[supervisord]
loglevel=info          

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket

[program:prefect-agent]
command=/Users/anna/opt/anaconda3/envs/orion39/bin/prefect agent start --api 'https://api-beta.prefect.io/api/accounts/accountId/workspaces/workspaceId' WORK_QUEUE_ID
```
and then to start it, run:
```bash
supervisord -c ./supervisord.conf
```
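Once it is running, supervisor's standard CLI can check on the agent or restart it by hand; a usage sketch, assuming the supervisord.conf above:

```bash
# query the prefect-agent program's status, or bounce it manually
supervisorctl -c ./supervisord.conf status prefect-agent
supervisorctl -c ./supervisord.conf restart prefect-agent
```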

Kevin Kho

04/04/2022, 2:33 PM
The filters in your dashboard screenshot are only looking one day back and one day forward. Could you try removing those filters?

Rajan Subramanian

04/04/2022, 2:40 PM
yep I did, I redeployed the tasks. I am going to follow @Anna Geller's and @Kevin Kho's advice and add supervisor

Kevin Kho

04/04/2022, 2:43 PM
Ah ok so are you good now?

Rajan Subramanian

04/04/2022, 2:49 PM
yep, will circle back if I have any issues