Igor
09/27/2024, 6:27 PMMarvin
09/27/2024, 6:28 PMFlowconcurrencypython
from prefect import flow, task, concurrency
@task
def my_task():
    pass
@flow
def my_flow():
    with concurrency("my_flow_limit", 1):
        my_task()
if __name__ == "__main__":
    my_flow()my_flow"my_flow_limit"python
from prefect import flow, task
from prefect.concurrency import concurrency
@task
def my_task():
    pass
@flow
def my_flow():
    with concurrency("my_flow_limit", 1):
        my_task()
if __name__ == "__main__":
    my_flow()my_flow"my_flow_limit"Igor
09/27/2024, 6:30 PMMarvin
09/27/2024, 6:31 PMprefect concurrency-limit create my_flow_limit 1python
   from prefect import flow, task, concurrency
   @task
   def my_task():
       pass
   @flow
   def my_flow():
       with concurrency("my_flow_limit", 1):
           my_task()
   if __name__ == "__main__":
       my_flow()my_flow"my_flow_limit"prefect concurrency-limit create my_flow_limit 1python
   from prefect import flow, task
   from prefect.concurrency import concurrency
   @task
   def my_task():
       pass
   @flow
   def my_flow():
       with concurrency("my_flow_limit", 1):
           my_task()
   if __name__ == "__main__":
       my_flow()my_flowIgor
09/27/2024, 6:39 PMMarvin
09/27/2024, 6:39 PMClientprefectpython
from prefect import Client
client = Client()
flow_runs = client.graphql(
    """
    query {
        flow_run(where: {flow_id: {_eq: "YOUR_FLOW_ID"}, state: {_eq: "Scheduled"}}) {
            id
            expected_start_time
        }
    }
    """
)
if flow_runs["data"]["flow_run"]:
    print("Flow is scheduled for execution:")
    for flow_run in flow_runs["data"]["flow_run"]:
        print(f"Flow Run ID: {flow_run['id']}, Scheduled for: {flow_run['expected_start_time']}")
else:
    print("No scheduled flow runs found.")"YOUR_FLOW_ID"get_client().read_flow_runspython
import asyncio
from prefect.client import get_client
from prefect.filters import FlowRunFilter
from datetime import datetime, timezone
async def check_scheduled_flows():
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=["Scheduled"]))
            )
        )
        if flow_runs:
            print("Flow is scheduled for execution:")
            for flow_run in flow_runs:
                print(f"Flow Run ID: {flow_run.id}, Scheduled for: {flow_run.expected_start_time}")
        else:
            print("No scheduled flow runs found.")
if __name__ == "__main__":
    asyncio.run(check_scheduled_flows())Igor
09/27/2024, 6:48 PMMarvin
09/27/2024, 6:49 PMrun_deploymentrun_deploymentrun_deploymentpython
from prefect.deployments import run_deployment
def main():
    flow_run = run_deployment(name="my_flow_name/my_deployment_name")
    print(f"Flow run created: {flow_run}")
if __name__ == "__main__":
    main()"my_flow_name/my_deployment_name"run_deploymenttimeout=0python
from prefect.deployments import run_deployment
def main():
    flow_run = run_deployment(name="my_flow_name/my_deployment_name")
    print(f"Flow run created: {flow_run}")
if __name__ == "__main__":
    main()"my_flow_name/my_deployment_name"as_subflowpython
from prefect.deployments import run_deployment
@flow
def my_flow():
    flow_run = run_deployment(name="my_flow_name/my_deployment_name", as_subflow=False)
    print(f"Flow run created: {flow_run}")
if __name__ == "__main__":
    my_flow()as_subflow=Falserun_deploymentIgor
09/27/2024, 7:51 PMMarvin
09/27/2024, 7:52 PMservepython
import time
from prefect import flow, serve
@flow
def my_flow(sleep: int = 60):
    """Example flow that sleeps for the provided amount of time (in seconds)."""
    time.sleep(sleep)
if __name__ == "__main__":
    deployment = my_flow.to_deployment(name="my_flow_deployment", interval=60)
    serve(deployment)run_deploymentpython
from prefect.deployments import run_deployment
def main():
    flow_run = run_deployment(name="my_flow_deployment")
    print(f"Flow run created: {flow_run}")
if __name__ == "__main__":
    main()servepython
import time
from prefect import flow, serve
@flow
def my_flow(sleep: int = 60):
    """Example flow that sleeps for the provided amount of time (in seconds)."""
    time.sleep(sleep)
if __name__ == "__main__":
    deployment = my_flow.to_deployment(name="my_flow_deployment", interval=60)
    serve(deployment)run_deploymentpython
from prefect.deployments import run_deployment
def main():
    flow_run = run_deployment(name="my_flow_deployment")
    print(f"Flow run created: {flow_run}")
if __name__ == "__main__":
    main()servesystemdbash
sudo vim /etc/systemd/system/my-prefect-service.service[Unit]
Description=Prefect serve
[Service]
User=prefect
WorkingDirectory=/home/prefect_user
ExecStart=/usr/bin/python3 /path/to/my_serve_script.py
Restart=always
[Install]
WantedBy=multi-user.targetbash
sudo systemctl daemon-reload
sudo systemctl enable my-prefect-service
sudo systemctl start my-prefect-serviceIgor
09/27/2024, 8:18 PMMarvin
09/27/2024, 8:19 PMrun_deploymentservepython
    import time
    from prefect import flow, serve
    @flow
    def my_flow(sleep: int = 60):
        """Example flow that sleeps for the provided amount of time (in seconds)."""
        time.sleep(sleep)
    if __name__ == "__main__":
        deployment = my_flow.to_deployment(name="my_flow_deployment", interval=60)
        serve(deployment)run_deploymentpython
    from prefect.deployments import run_deployment
    def trigger_flow():
        flow_run = run_deployment(name="my_flow_deployment")
        print(f"Flow run created: {flow_run.id}")
    if __name__ == "__main__":
        trigger_flow()servepython
    from prefect import get_client
    async def check_flow_run_status(flow_run_id):
        async with get_client() as client:
            flow_run = await client.read_flow_run(flow_run_id)
            print(f"Flow run status: {flow_run.state}")
    if __name__ == "__main__":
        import asyncio
        flow_run_id = "your_flow_run_id"  # Replace with the actual flow run ID
        asyncio.run(check_flow_run_status(flow_run_id))Igor
09/27/2024, 8:38 PMMarvin
09/27/2024, 8:40 PM.serve.serve.servepython
   import time
   from prefect import flow, serve
   @flow
   def my_flow(sleep: int = 60):
       """Example flow that sleeps for the provided amount of time (in seconds)."""
       time.sleep(sleep)
   if __name__ == "__main__":
       deployment = my_flow.to_deployment(name="my_flow_deployment", interval=60)
       serve(deployment)python
   from prefect.deployments import run_deployment
   def trigger_flow():
       flow_run = run_deployment(name="my_flow_deployment")
       print(f"Flow run created: {flow_run.id}")
   if __name__ == "__main__":
       trigger_flow()python
   import time
   from prefect import flow, serve
   @flow
   def my_flow(sleep: int = 60):
       """Example flow that sleeps for the provided amount of time (in seconds)."""
       time.sleep(sleep)
   if __name__ == "__main__":
       deployment = my_flow.to_deployment(name="my_flow_deployment", interval=60)
       serve(deployment)python
   from prefect.deployments import run_deployment
   def trigger_flow():
       flow_run = run_deployment(name="my_flow_deployment")
       print(f"Flow run created: {flow_run.id}")
   if __name__ == "__main__":
       trigger_flow().serve.serveIgor
09/29/2024, 2:45 PMMarvin
09/29/2024, 2:45 PMrun_deploymentrun_deploymentFlowRunidpython
   from prefect.deployments import run_deployment
   from prefect.client import get_client
   async def trigger_and_get_result():
       flow_run = await run_deployment(name="my_flow_deployment")
       print(f"Flow run created: {flow_run.id}")
       return flow_run.id
   if __name__ == "__main__":
       import asyncio
       flow_run_id = asyncio.run(trigger_and_get_result())python
   from prefect.client import get_client
   from prefect.states import StateType
   async def get_flow_run_result(flow_run_id):
       async with get_client() as client:
           flow_run = await client.read_flow_run(flow_run_id)
           flow_state = flow_run.state
           
           # Check the state type and retrieve the result if completed
           if flow_state.type == StateType.COMPLETED:
               result = flow_state.result()
               print(f"Flow run completed with result: {result}")
           else:
               print(f"Flow run is in state: {flow_state.type}")
   if __name__ == "__main__":
       import asyncio
       flow_run_id = "your_flow_run_id"  # Replace with the actual flow run ID
       asyncio.run(get_flow_run_result(flow_run_id))python
   from prefect.deployments import run_deployment
   from prefect.client import get_client
   async def trigger_and_get_result():
       flow_run = await run_deployment(name="my_flow_deployment")
       print(f"Flow run created: {flow_run.id}")
       return flow_run.id
   if __name__ == "__main__":
       import asyncio
       flow_run_id = asyncio.run(trigger_and_get_result())python
   from prefect.client import get_client
   from prefect.states import StateType
   async def get_flow_run_result(flow_run_id):
       async with get_client() as client:
           flow_run = await client.read_flow_run(flow_run_id)
           flow_state = flow_run.state
           
           # Check the state type and retrieve the result if completed
           if flow_state.type == StateType.COMPLETED:
               result = flow_state.result()
               print(f"Flow run completed with result: {result}")
           else:
               print(f"Flow run is in state: {flow_state.type}")
   if __name__ == "__main__":
       import asyncio
       flow_run_id = "your_flow_run_id"  # Replace with the actual flow run ID
       asyncio.run(get_flow_run_result(flow_run_id))run_deploymentFlowRunididrun_deployment