datamongus
08/28/2025, 5:53 PM
Marvin
08/28/2025, 5:56 PM
- If you mean Marvin: you can plug in a custom MemoryProvider, but there’s no official BigQuery implementation yet.
- If you mean Prefect + BigQuery: there’s no “memory provider” concept in Prefect, but Prefect 3.x has a BigQuery integration via prefect-gcp for queries, table creation, and loads. See Prefect GCP integration docs.
If you can share which of those you’re after, I can point you to the exact setup or outline how to build a custom provider.
datamongus
08/28/2025, 5:57 PM
Marvin
08/28/2025, 6:00 PM
Memory expects an object with add(content) -> str, search(query, n=20) -> dict[id, content], and delete(memory_id) -> None.
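In other words, any object with those three methods will do. As a rough structural sketch (the Protocol name below is just illustrative, not something Marvin exports):
```
from typing import Dict, Protocol

class MemoryProviderLike(Protocol):
    """Illustrative sketch of the surface Marvin's Memory expects from a provider."""
    def add(self, content: str) -> str: ...                        # store content, return its id
    def search(self, query: str, n: int = 20) -> Dict[str, str]: ...  # map of id -> content
    def delete(self, memory_id: str) -> None: ...                  # remove one memory by id
```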
Below is a minimal BigQuery-backed provider that:
- stores messages in a BigQuery table (id, key, content, created_at)
- performs simple substring search (LIKE) ordered by recency
- deletes by id
It also auto-creates the dataset/table if they don’t exist.
```
import uuid
import datetime as dt
from typing import Dict, Optional

from google.cloud import bigquery
from google.cloud.exceptions import NotFound


# A simple BigQuery-backed MemoryProvider
class BigQueryMemory:
    def __init__(
        self,
        project: str,
        dataset: str,
        table: str = "marvin_memory",
        *,
        key: Optional[str] = None,
        location: Optional[str] = None,
        client: Optional[bigquery.Client] = None,
    ):
        """
        project: GCP project id
        dataset: BigQuery dataset name
        table: table name (default: marvin_memory)
        key: logical partition for this memory (e.g., agent or thread key)
        location: BigQuery location for dataset (e.g., "US")
        client: optional pre-configured bigquery.Client
        """
        self.project = project
        self.dataset = dataset
        self.table = table
        self.key = key or "default"
        self.location = location
        self.client = client or bigquery.Client(project=project, location=location)
        self.table_ref = f"{self.project}.{self.dataset}.{self.table}"
        self._ensure_dataset_and_table()

    def _ensure_dataset_and_table(self):
        # Ensure dataset exists
        ds_id = f"{self.project}.{self.dataset}"
        try:
            self.client.get_dataset(ds_id)
        except NotFound:
            ds = bigquery.Dataset(ds_id)
            if self.location:
                ds.location = self.location
            self.client.create_dataset(ds)
        # Ensure table exists
        try:
            self.client.get_table(self.table_ref)
        except NotFound:
            schema = [
                bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("key", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("content", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
            ]
            table = bigquery.Table(self.table_ref, schema=schema)
            self.client.create_table(table)

    def add(self, content: str) -> str:
        memory_id = str(uuid.uuid4())
        rows = [{
            "id": memory_id,
            "key": self.key,
            "content": content,
            "created_at": dt.datetime.now(dt.timezone.utc).isoformat(),
        }]
        errors = self.client.insert_rows_json(self.table_ref, rows)
        if errors:
            raise RuntimeError(f"BigQuery insert failed: {errors}")
        return memory_id

    def search(self, query: str, n: int = 20) -> Dict[str, str]:
        # Simple substring search; for vector search, see notes below.
        sql = f"""
            SELECT id, content
            FROM `{self.table_ref}`
            WHERE key = @key AND content LIKE @pattern
            ORDER BY created_at DESC
            LIMIT {int(max(1, n))}
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("key", "STRING", self.key),
                bigquery.ScalarQueryParameter("pattern", "STRING", f"%{query}%"),
            ]
        )
        results = self.client.query(sql, job_config=job_config).result()
        return {row["id"]: row["content"] for row in results}

    def delete(self, memory_id: str) -> None:
        sql = f"""
            DELETE FROM `{self.table_ref}`
            WHERE key = @key AND id = @id
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("key", "STRING", self.key),
                bigquery.ScalarQueryParameter("id", "STRING", memory_id),
            ]
        )
        self.client.query(sql, job_config=job_config).result()
```
Marvin
08/28/2025, 6:00 PM
Usage with Marvin:
```
from marvin.memory import Memory
provider = BigQueryMemory(
project="my-gcp-project",
dataset="marvin",
table="memory",
key="agent:alice",
location="US",
)
mem = Memory(key="agent:alice", provider=provider)
mid = mem.add("Alice likes graphs.")
print("added:", mid)
print(mem.search("graphs", n=5))
mem.delete(mid)
```
Notes and options:
- Authentication: the BigQuery client uses Application Default Credentials (ADC). Set GOOGLE_APPLICATION_CREDENTIALS or use workload identity/impersonation as appropriate (see the credentials sketch after this list).
- Partitioning/clustering: for scale, consider clustering on key, created_at and partitioning by created_at (e.g., daily); see the table-creation sketch after this list.
- Vector search: if you want semantic search, add an embedding column and score by cosine similarity. BigQuery now supports vector search; see BigQuery vector search docs. You’d compute embeddings at add() time (e.g., via Vertex AI or OpenAI) and then implement search() using a vector-distance ORDER BY (see the sketch after this list). For example:
  - Schema: add an embedding ARRAY<FLOAT64> column sized to your model (e.g., 1536 dimensions)
  - Create a vector index if you want to use BigQuery’s native vector indexes
  - In search(), bind the query embedding and order by its cosine distance to embedding, ascending, LIMIT n
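For the authentication note: a minimal sketch of constructing the client with explicit service-account credentials instead of ADC (the key path and project id are illustrative), then handing it to BigQueryMemory via its client argument:
```
from google.cloud import bigquery
from google.oauth2 import service_account

# Illustrative: explicit service-account credentials instead of ADC.
creds = service_account.Credentials.from_service_account_file("/path/to/sa-key.json")
client = bigquery.Client(project="my-gcp-project", credentials=creds, location="US")

provider = BigQueryMemory(
    project="my-gcp-project",
    dataset="marvin",
    client=client,  # reuse the pre-configured client
)
```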
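For the partitioning/clustering note: the table creation inside _ensure_dataset_and_table could set those options before create_table. A rough sketch (daily partitions on created_at, clustered on key; tune to your query patterns):
```
from google.cloud import bigquery

schema = [
    bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("key", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("content", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
]
table = bigquery.Table("my-gcp-project.marvin.memory", schema=schema)  # illustrative ids
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,  # one partition per day
    field="created_at",
)
table.clustering_fields = ["key"]  # add more columns (e.g., created_at) if it suits your queries
bigquery.Client().create_table(table)
```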
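And for the vector-search note: a rough sketch of what an embedding-based search() could look like on the class above. It assumes an embedding ARRAY<FLOAT64> column populated at add() time, uses BigQuery ML’s ML.DISTANCE for cosine scoring (the newer VECTOR_SEARCH/vector-index path is covered in the BigQuery vector search docs), and embed() is a placeholder for whatever embedding model you call:
```
    def search(self, query: str, n: int = 20) -> Dict[str, str]:
        query_embedding = embed(query)  # placeholder: returns list[float] from your embedding model
        sql = f"""
            SELECT id, content
            FROM `{self.table_ref}`
            WHERE key = @key
            ORDER BY ML.DISTANCE(embedding, @query_embedding, 'COSINE') ASC
            LIMIT {int(max(1, n))}
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("key", "STRING", self.key),
                bigquery.ArrayQueryParameter("query_embedding", "FLOAT64", query_embedding),
            ]
        )
        results = self.client.query(sql, job_config=job_config).result()
        return {row["id"]: row["content"] for row in results}
```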
References:
- Marvin memory API overview: marvin.memory.Memory docs
- Example provider (Postgres): Postgres provider docs
- BigQuery Python client: google-cloud-bigquery client reference
- BigQuery vector search: BigQuery vector search docs
If you’d like, I can extend this example with a concrete vector-search implementation (choose your embedding model and desired index type), or add dataset/table partitioning/clustering options.