oleander-whirlwind
10/06/2025, 9:31 AMMarvin
10/06/2025, 9:31 AMMarvin
10/06/2025, 9:34 AM cache_result_in_memory=True persist_result=False persist_result=True
from prefect import flow, task
@task
def predict_batch(batch):
    """Run the model on one batch and return the predictions as a NumPy array.

    The model is obtained inside the task body, so no model object has to be
    passed between tasks.
    """
    import torch

    net = ...  # placeholder: build/load the model here
    net.eval()  # inference mode
    # Gradients are not needed for prediction.
    with torch.no_grad():
        outputs = net(batch)
        return outputs.cpu().numpy()
@flow
def main_flow(batches):
    """Submit one ``predict_batch`` task run per element of ``batches``.

    Returns:
        The list of objects returned by ``predict_batch.submit``, in the same
        order as ``batches``.
    """
    return [predict_batch.submit(b) for b in batches]
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task(persist_result=False)
def load_model():
    """Create the model, switch it to eval mode and hand it back.

    The task is declared with ``persist_result=False``, so the returned model
    is meant to live in memory only.
    """
    import torch

    net = ...  # placeholder: build/load the model here
    net.eval()
    return net
@task
def predict(model, batch):
    """Apply ``model`` to ``batch`` and return the result as a NumPy array."""
    import torch

    # Gradients are not needed for prediction.
    with torch.no_grad():
        outputs = model(batch)
        return outputs.cpu().numpy()
@flow(task_runner=ThreadPoolTaskRunner())
def main_flow(batch):
    """Load the model once, then predict on ``batch`` with it.

    The flow uses ``ThreadPoolTaskRunner``; the model returned by
    ``load_model`` is passed directly to ``predict``.

    Returns:
        Whatever ``predict(model, batch)`` returns.
    """
    model = load_model()
    preds = predict(model, batch)
    return preds
from functools import lru_cache
from prefect import flow, task
@lru_cache(maxsize=1)
def get_model():
    """Return the model, building it on the first call only.

    ``lru_cache(maxsize=1)`` memoizes the result, so later calls in the same
    process get the object created by the first successful call.
    """
    import torch

    net = ...  # placeholder: build/load the model here
    net.eval()
    return net
@task
def predict(batch):
    """Predict on ``batch`` using the model cached by ``get_model``."""
    import torch

    net = get_model()
    # Gradients are not needed for prediction.
    with torch.no_grad():
        outputs = net(batch)
        return outputs.cpu().numpy()
@flow
def main_flow(batches):
    return [predict.submit(b) for b in batches]
torch.load map_location='cpu' persist_result=True False Marvin
10/06/2025, 9:34 AMoleander-whirlwind
10/06/2025, 9:40 AMoleander-whirlwind
10/06/2025, 9:41 AMMarvin
10/06/2025, 9:41 AMMarvin
10/06/2025, 9:45 AM
from pathlib import Path
from functools import lru_cache
from prefect import flow, task
from prefect.context import get_run_context
from prefect.task_runners import ProcessPoolTaskRunner
# Minimal torch model used by the demo
def build_model():
    """Return a small ``nn.Sequential`` (Linear 4->8, ReLU, Linear 8->1) in eval mode."""
    import torch
    import torch.nn as nn

    layers = [nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1)]
    net = nn.Sequential(*layers)
    net.eval()
    return net
@task
def load_model_to_disk() -> str:
    """Build the demo model, save it under a run-scoped directory, return its path.

    The directory is ``/tmp/prefect-model-<run id>`` so concurrent runs do not
    overwrite each other's file.

    Returns:
        The path of the saved ``model.pt`` file, as a string.
    """
    import torch

    ctx = get_run_context()
    # Inside a task the run context is a task-run context, which exposes the
    # parent flow run through ``task_run.flow_run_id``. Fall back to
    # ``flow_run.id`` if a flow-run context is returned instead.
    task_run = getattr(ctx, "task_run", None)
    if task_run is not None:
        run_id = task_run.flow_run_id or task_run.id
    else:
        run_id = ctx.flow_run.id

    # Use a run-scoped path so concurrent runs don't collide
    out_dir = Path("/tmp") / f"prefect-model-{run_id}"
    out_dir.mkdir(parents=True, exist_ok=True)
    # Plain file name: the chat export had wrapped it in link markup whose
    # slashes turned it into a nested, non-existent directory path.
    path = out_dir / "model.pt"

    model = build_model()
    # Pickles the full model object, so the code defining the model must be
    # importable wherever the file is loaded. A more portable alternative is
    # to trace/script the model with torch.jit and save the TorchScript file.
    torch.save(model, path)
    return str(path)
@task
def predict_from_disk(model_path: str, batch: list[float]) -> float:
    """Load the pickled model from ``model_path`` and predict on one sample.

    Args:
        model_path: Path of a file written with ``torch.save(model, ...)``.
        batch: Feature values of a single sample; reshaped to ``(1, -1)``.

    Returns:
        The model output as a Python float (the output must hold exactly one
        element, since ``.item()`` is used).
    """
    import torch

    # The file holds a fully pickled module, not just a state dict. PyTorch
    # 2.6+ defaults to weights_only=True, which refuses such files, so opt out
    # explicitly.
    # SECURITY: weights_only=False unpickles arbitrary objects. Only load
    # files produced by this flow, never untrusted input.
    # For a TorchScript file use torch.jit.load(model_path, map_location="cpu").
    model = torch.load(model_path, map_location="cpu", weights_only=False)

    x = torch.tensor(batch, dtype=torch.float32).view(1, -1)
    with torch.no_grad():
        return model(x).item()
@flow(task_runner=ProcessPoolTaskRunner())
def multiprocess_flow(batch: list[float]) -> float:
    """Save the model to disk in one task, then predict from that file in another.

    Only the file path is handed from ``load_model_to_disk`` to
    ``predict_from_disk``; the model itself is never passed between tasks.
    """
    saved_model_path = load_model_to_disk()
    prediction = predict_from_disk(saved_model_path, batch)
    return prediction
if __name__ == "__main__":
    print(multiprocess_flow([1.0, 2.0, 3.0, 4.0]))
s3://... prefect[dask] boto3 Marvin
10/06/2025, 9:45 AMoleander-whirlwind
10/06/2025, 9:49 AMoleander-whirlwind
10/06/2025, 9:52 AMoleander-whirlwind
10/06/2025, 9:57 AMMarvin
10/06/2025, 9:57 AMMarvin
10/06/2025, 9:58 AMaddress="tcp://<scheduler-host>:8786"prefect[dask]boto3S3_BUCKETS3_PREFIXprefect[dask]Marvin
10/06/2025, 9:58 AMoleander-whirlwind
10/06/2025, 10:09 AMMarvin
10/06/2025, 10:09 AMMarvin
10/06/2025, 10:10 AMMarvin
10/06/2025, 10:10 AMoleander-whirlwind
10/06/2025, 10:45 AMMarvin
10/06/2025, 10:45 AMMarvin
10/06/2025, 10:46 AMoleander-whirlwind
10/06/2025, 11:10 AMMarvin
10/06/2025, 11:10 AMMarvin
10/06/2025, 11:11 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task
def run_container(image: str, command: list[str]) -> dict:
    """Run ``command`` in a container of ``image`` and collect its outcome.

    The container is started detached, waited on, and always removed
    afterwards (best effort). The Docker client created for this call is
    closed before returning.

    Args:
        image: Image reference to run.
        command: Command and arguments executed inside the container.

    Returns:
        A dict with keys ``"id"`` (container id), ``"status"`` (exit status
        code reported by ``wait()``) and ``"logs"`` (decoded container logs).
    """
    import logging

    import docker

    client = docker.from_env()  # one client per call, not shared across threads
    try:
        container = client.containers.run(image, command, detach=True)
        try:
            result = container.wait()            # {'StatusCode': int}
            # Container output is arbitrary bytes; never fail on bad UTF-8.
            logs = container.logs().decode("utf-8", errors="replace")
            return {"id": container.id, "status": result["StatusCode"], "logs": logs}
        finally:
            # Cleanup stays best effort, but a failure is logged rather than
            # silently discarded so leaked containers can be noticed.
            try:
                container.remove(force=True)
            except Exception:
                logging.getLogger(__name__).warning(
                    "Failed to remove container %s", container.id, exc_info=True
                )
    finally:
        # Release the client's underlying HTTP session.
        client.close()
@task
def aggregate(results: list[dict]) -> dict:
    """Summarise a list of container result dicts.

    Returns:
        A dict with ``"count"`` (number of results), ``"exit_codes"`` (each
        result's ``"status"``, in order) and ``"first_log"`` (the first 200
        characters of the first result's ``"logs"``, or ``""`` when
        ``results`` is empty).
    """
    # The results are plain Python objects living in the same process.
    total = len(results)
    exit_codes = [entry["status"] for entry in results]
    if results:
        first_log = results[0]["logs"][:200]
    else:
        first_log = ""
    return {"count": total, "exit_codes": exit_codes, "first_log": first_log}
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def docker_threaded_flow():
    """Run four short alpine containers via ``run_container`` and aggregate them.

    All four task runs are submitted first, then their results are collected
    in submission order and passed to ``aggregate``.
    """
    image = "alpine:3.19"
    shell_scripts = (
        "echo hi && sleep 1 && echo bye",
        "echo task2 && sleep 2",
        "echo task3 && sleep 3",
        "echo task4 && sleep 1",
    )
    # Submit everything before waiting on any result.
    pending = [
        run_container.submit(image, ["sh", "-c", script])
        for script in shell_scripts
    ]
    outcomes = [future.result() for future in pending]
    return aggregate(outcomes)
if __name__ == "__main__":
    # Script entry point: run the flow and show its aggregated summary.
    summary = docker_threaded_flow()
    print(summary)