Rajan Subramanian
03/22/2022, 5:55 PM
prefect deployment create deployment_name
Does that run a deployment if a worker agent is already listening? And if not, is it scheduled to run once upon creation of the work queue?
Anna Geller
03/22/2022, 7:35 PM
prefect deployment run flow_name/deployment_name
will trigger a run on your agent. You can also do it from Python with OrionClient.create_flow_run_from_deployment, or via a direct API call.
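For illustration, a minimal sketch of the client route, assuming the beta-era OrionClient API (the placeholder names follow the CLI example above):

import asyncio

from prefect.client import OrionClient


async def trigger_run(name: str):
    async with OrionClient() as client:
        # look up the deployment by its "flow_name/deployment_name" identifier
        deployment = await client.read_deployment_by_name(name)
        # create a flow run scheduled for now; an agent polling a matching
        # work queue will pick it up and execute it
        flow_run = await client.create_flow_run_from_deployment(deployment.id)
        print(f"created flow run {flow_run.id}")


asyncio.run(trigger_run("flow_name/deployment_name"))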
If that doesn't cover it, can you describe the problem you're trying to solve a bit more? What would be needed for your use case: triggering a flow run that gets executed on the agent you spun up on your EC2 instance? If so, the CLI command above should have the same effect, as it will create a flow run scheduled for now.
If you attach a schedule to your DeploymentSpec, this will also trigger flow runs on your agent, provided that the DeploymentSpec definition matches the work queue filter criteria.
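For example, a hypothetical scheduled variant, assuming the beta-era IntervalSchedule API (the names and the minutely interval are illustrative):

from datetime import timedelta

from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule

DeploymentSpec(
    name="my_minutely_deployment",  # illustrative name
    flow_location="./my_flow.py",
    tags=["my-queue-tag"],  # must match the work queue's filter criteria
    schedule=IntervalSchedule(interval=timedelta(minutes=1)),
)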
Rajan Subramanian
03/25/2022, 3:58 PM
Is there an example of using OrionClient.create_flow_run_from_deployment?
Right now I have a hacky way of doing this: I created another Python file called execute_deployments that runs Prefect shell commands to create the deployments and run them.
Anna Geller
03/25/2022, 5:50 PM
"since the process i am running is running indefinitely, what type of schedule should i add?"
Is this still about the streaming use case we discussed? I still believe (as I explained in this SO answer where we discussed this originally) that you need some sort of service, e.g. a Python script with a while loop that indefinitely creates new flow runs. This means it wouldn't run on a schedule at all. But if you wanted to, you could also attach e.g. a minutely schedule and have a sort of near real-time pipeline. No need to reschedule every 1000 days (where did you even get that number from 😄).
"not sure whats happening here"
Me neither; it's hard to debug this remotely. Can you try Cloud 2.0, maybe https://beta.prefect.io/? This would give you a UI available from the public Internet, from anywhere. Once you attach the schedule there is no need to invoke your flow run from a client; Prefect will automatically invoke the flow run based on your schedule.
python your_never_ending_script_creating_millions_of_flow_runs.py
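Such a never-ending script could be as simple as this sketch (the deployment name and the 60-second pause are assumptions):

import subprocess
import time

# Hypothetical service loop: trigger a new flow run from an existing
# deployment via the CLI, indefinitely.
while True:
    subprocess.run(
        "prefect deployment run flow_name/deployment_name",
        shell=True,
    )
    time.sleep(60)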
Rajan Subramanian
03/26/2022, 5:47 PM
Anna Geller
03/26/2022, 8:23 PM
Rajan Subramanian
03/28/2022, 3:47 PM
import subprocess

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

from pipelines.prefecttoolz import build_command


@task(
    name="execute shell command",
    description="runs a shell task to read from redis, push to postgres, and clean redis",
    retries=3,
    retry_delay_seconds=10,
)
def run_in_shell(command: str):
    subprocess.run(command, shell=True)


@flow(name="redis_to_postgres_pipeline", task_runner=DaskTaskRunner())
def run_flow(shell_task: str):
    cmd = build_command(shell_task)
    run_in_shell(command=cmd)
The file below is the pipeline from the exchanges to redis:
import subprocess

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

from pipelines.prefecttoolz import build_command


# shell task to run python programs
@task(
    name="execute shell command",
    description="runs each of the exchange.py files as a separate shell task",
    retries=3,
    retry_delay_seconds=10,
)
def run_streamer_in_shell(command: str):
    subprocess.run(command, shell=True)


@flow(
    name="live_feeds_to_redis_pipeline",
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "threads_per_worker": 1}
    ),
)
def run_flow(shell_task: str, symbol: str, stream_name: str):
    cmd = build_command(shell_task, symbol=symbol, stream_name=stream_name)
    run_streamer_in_shell(command=cmd)
I have one deployment file, like so:
import os
from os.path import join

from exchange_feeds.constants import EXCHANGEPATH, PHOBOSPATH
from prefect.deployments import DeploymentSpec

path_to_pipeline = join(PHOBOSPATH, "pipelines", "feed_to_redis_pipeline.py")
path_to_file = os.path.join(EXCHANGEPATH, "feed_ingestor_to_redis.py")

DeploymentSpec(
    name="ftx_L1_btc-perp",
    flow_location=path_to_pipeline,
    tags=["ftx-L1", "btc-perp"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "BTC-PERP",
        "stream_name": "ftx-orderbook",
    },
)

DeploymentSpec(
    name="ftx_L1_btcusd",
    flow_location=path_to_pipeline,
    tags=["ftx-L1", "btcusd"],
    parameters={
        "shell_task": path_to_file,
        "symbol": "BTC/USD",
        "stream_name": "ftx-orderbook",
    },
)
And here is the execute_deployments file:
import os
import subprocess
from time import sleep
from typing import List

from exchange_feeds.constants import ACCOUNT, FLOW_NAME, WORKSPACE_NAME

ID = os.path.join(ACCOUNT, WORKSPACE_NAME)


def cloud_login():
    key = os.environ["PREFECTCLOUDKEY"]
    cmd = f"prefect cloud login --key {key} -w {ID}"
    subprocess.run(cmd, shell=True)


def create_deployments(deployment_name):
    cmd = f"prefect deployment create {deployment_name}"
    subprocess.run(cmd, shell=True)


def run_unscheduled_deployments(deployment_names: List[str]):
    for d in deployment_names:
        cmd = f"prefect deployment run {d}"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        print(result.stdout)


def start_prefect_agent(agent_name):
    cmd = f"prefect work-queue create {agent_name}"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    # extract the work queue UUID from CLI output that looks like UUID(...)
    uuid = result.stdout.strip().strip("UUID()")
    cmd = f"prefect agent start {uuid}"
    subprocess.run(cmd, shell=True)
coins = (
    "avaxusd",
    "bnbusd",
    "btc-perp",
    "btcusd",
    "dogeusd",
    "dotusd",
    "ethusd",
    "lunausd",
    "solusd",
    "xrpusd",
)

ftx_L1_deployment = [f"ftx_L1_{coin}" for coin in coins]
ftx_blotter_deployment = [f"ftx_blotter_{coin}" for coin in coins]


def sign_into_cloud_and_create_deployments():
    print("logging into prefect cloud")
    cloud_login()
    print("Successfully logged into prefect cloud")
    print("Creating redis to postgres deployment")
    create_deployments("redis_to_postgres_deployment.py")
    sleep(5)
    print("Creating FTX deployments")
    create_deployments("ftx/orderbook_deployment.py")
    sleep(5)
    create_deployments("ftx/blotter_deployment.py")
    sleep(5)
    create_deployments("kraken/orderbook_deployment.py")
    sleep(5)
    create_deployments("kraken/blotter_deployment.py")
    sleep(5)


if __name__ == "__main__":
    sign_into_cloud_and_create_deployments()
    activate_coin = ["ethusd", "btcusd"]
    inactivate_flows = [f"{FLOW_NAME}/ftx_L1_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    inactivate_flows = [f"{FLOW_NAME}/ftx_blotter_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    inactivate_flows = [f"{FLOW_NAME}/kraken_L1_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    inactivate_flows = [f"{FLOW_NAME}/kraken_blotter_{coin}" for coin in activate_coin]
    run_unscheduled_deployments(inactivate_flows)
    start_prefect_agent("feed_to_redis_agent")
activate_coin = ["ethusd", "btcusd"]
I only activated these two coins, and hence noticed all the memory usage.
I am using the prefect work-queue agent to deploy my tasks, so that's the only service I'm using, I believe.
Anna Geller
03/28/2022, 6:29 PM
This may be caused by using the DaskTaskRunner with a shell task. Dask workers use a different working directory.
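One possible mitigation, sketched here as an assumption rather than anything from the thread, is to pin the working directory explicitly when invoking the shell command:

import subprocess
from pathlib import Path

# Resolve paths relative to this file so the command does not depend on
# whichever working directory the Dask worker happens to start in.
PROJECT_ROOT = Path(__file__).parent.resolve()


def run_in_shell(command: str):
    subprocess.run(command, shell=True, cwd=PROJECT_ROOT, check=True)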
Rajan Subramanian
03/28/2022, 6:43 PM
Anna Geller
03/31/2022, 9:07 PM