<@ULVA73B9P> is there a memory provider for BigQue...
# ask-marvin
d
@Marvin is there a memory provider for BigQuery?
m
thought for 154.2 seconds
Do you mean Marvin’s agent memory providers, or interacting with BigQuery from a Prefect flow?

- If you mean Marvin memory: there isn’t a built-in BigQuery memory provider. Today’s providers include Postgres (SQL), Qdrant, Chroma, and Lance. You could implement a custom provider by subclassing Marvin’s `MemoryProvider`, but there’s no official BigQuery implementation yet.
- If you mean Prefect + BigQuery: there’s no “memory provider” concept in Prefect, but Prefect 3.x has a BigQuery integration via prefect-gcp for queries, table creation, and loads. See the Prefect GCP integration docs.

If you can share which of those you’re after, I can point you to the exact setup or outline how to build a custom provider.
d
@Marvin yes I mean Marvin’s agent MemoryProvider. give me an example of how you would build a BigQuery Memory Provider similar to the Postgres, Qdrant, Chroma and Lance variants.
m
thought for 203.9 seconds
Great. There isn’t a built-in BigQuery provider today, but you can plug in your own by implementing the same duck-typed interface that Marvin’s `Memory` expects: an object with `add(content) -> str`, `search(query, n=20) -> dict[id, content]`, and `delete(memory_id) -> None`.
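Spelled out as a typing stub (illustrative only; the name `MemoryProviderLike` is made up here, and Marvin’s own base class is `MemoryProvider`), that interface is just these three methods:
```
from typing import Dict, Protocol


class MemoryProviderLike(Protocol):
    """Hypothetical Protocol restating the duck-typed interface Marvin's Memory expects."""

    def add(self, content: str) -> str: ...                            # store content, return its id
    def search(self, query: str, n: int = 20) -> Dict[str, str]: ...   # mapping of id -> content
    def delete(self, memory_id: str) -> None: ...                      # remove a memory by id
```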
Below is a minimal BigQuery-backed provider that:
- stores messages in a BigQuery table (id, key, content, created_at)
- performs simple substring search (LIKE) ordered by recency
- deletes by id

It also auto-creates the dataset/table if they don’t exist.
```
import uuid
import datetime as dt
from typing import Dict, Optional

from google.cloud import bigquery
from google.cloud.exceptions import NotFound


# A simple BigQuery-backed MemoryProvider
class BigQueryMemory:
    def __init__(
        self,
        project: str,
        dataset: str,
        table: str = "marvin_memory",
        *,
        key: Optional[str] = None,
        location: Optional[str] = None,
        client: Optional[bigquery.Client] = None,
    ):
        """
        project: GCP project id
        dataset: BigQuery dataset name
        table: table name (default: marvin_memory)
        key: logical partition for this memory (e.g., agent or thread key)
        location: BigQuery location for dataset (e.g., "US")
        client: optional pre-configured bigquery.Client
        """
        self.project = project
        self.dataset = dataset
        self.table = table
        self.key = key or "default"
        self.location = location
        self.client = client or bigquery.Client(project=project, location=location)
        self.table_ref = f"{self.project}.{self.dataset}.{self.table}"
        self._ensure_dataset_and_table()

    def _ensure_dataset_and_table(self):
        # Ensure dataset exists
        ds_id = f"{self.project}.{self.dataset}"
        try:
            self.client.get_dataset(ds_id)
        except NotFound:
            ds = bigquery.Dataset(ds_id)
            if self.location:
                ds.location = self.location
            self.client.create_dataset(ds)
        # Ensure table exists
        try:
            self.client.get_table(self.table_ref)
        except NotFound:
            schema = [
                bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("key", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("content", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
            ]
            table = bigquery.Table(self.table_ref, schema=schema)
            self.client.create_table(table)

    def add(self, content: str) -> str:
        memory_id = str(uuid.uuid4())
        rows = [{
            "id": memory_id,
            "key": self.key,
            "content": content,
            "created_at": dt.datetime.utcnow().isoformat(),
        }]
        errors = self.client.insert_rows_json(self.table_ref, rows)
        if errors:
            raise RuntimeError(f"BigQuery insert failed: {errors}")
        return memory_id

    def search(self, query: str, n: int = 20) -> Dict[str, str]:
        # Simple substring search; for vector search, see notes below.
        sql = f"""
            SELECT id, content
            FROM `{self.table_ref}`
            WHERE key = @key AND content LIKE @pattern
            ORDER BY created_at DESC
            LIMIT {int(max(1, n))}
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("key", "STRING", self.key),
                bigquery.ScalarQueryParameter("pattern", "STRING", f"%{query}%"),
            ]
        )
        results = self.client.query(sql, job_config=job_config).result()
        return {row["id"]: row["content"] for row in results}

    def delete(self, memory_id: str) -> None:
        sql = f"""
            DELETE FROM `{self.table_ref}`
            WHERE key = @key AND id = @id
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("key", "STRING", self.key),
                bigquery.ScalarQueryParameter("id", "STRING", memory_id),
            ]
        )
        self.client.query(sql, job_config=job_config).result()
```
Usage with Marvin:
```
from marvin.memory import Memory

provider = BigQueryMemory(
    project="my-gcp-project",
    dataset="marvin",
    table="memory",
    key="agent:alice",
    location="US",
)

mem = Memory(key="agent:alice", provider=provider)

mid = mem.add("Alice likes graphs.")
print("added:", mid)
print(mem.search("graphs", n=5))
mem.delete(mid)
```
Notes and options:
- Authentication: the BigQuery client uses Application Default Credentials (ADC). Set `GOOGLE_APPLICATION_CREDENTIALS` or use workload identity/impersonation as appropriate.
- Partitioning/clustering: for scale, consider clustering on `key, created_at` and partitioning by `created_at` (e.g., daily); a minimal sketch follows this list.
- Vector search: if you want semantic search, add an `embedding` column and score by cosine similarity. BigQuery now supports vector search; see the BigQuery vector search docs. You’d compute embeddings at `add()` time (e.g., via Vertex AI or OpenAI) and then implement `search()` using a vector-similarity ORDER BY. For example:
  - Schema: add an `embedding ARRAY<FLOAT64>` column (BigQuery stores embeddings as arrays of floats, e.g., 1536-dimensional vectors from your embedding model)
  - Create a vector index (`CREATE VECTOR INDEX`) if you want to use native vector indexes
  - In `search()`, bind the query embedding and `ORDER BY COSINE_DISTANCE(embedding, @query_embedding) ASC LIMIT n` (a rough sketch appears at the end of this message)
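For the partitioning/clustering option, here is a minimal sketch of the table-creation step, assuming the same schema as the provider above (the helper name `create_partitioned_memory_table` is made up for illustration; you would fold this into `_ensure_dataset_and_table`):
```
from google.cloud import bigquery


def create_partitioned_memory_table(client: bigquery.Client, table_ref: str) -> bigquery.Table:
    # Same schema as above, but with daily partitioning on created_at
    # and clustering on key (add more clustering columns if it suits your query patterns).
    schema = [
        bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("key", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("content", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
    ]
    table = bigquery.Table(table_ref, schema=schema)
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="created_at",
    )
    table.clustering_fields = ["key"]
    return client.create_table(table)
```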
References:
- Marvin memory API overview: marvin.memory.Memory docs
- Example provider (Postgres): Postgres provider docs
- BigQuery Python client: google-cloud-bigquery client reference
- BigQuery vector search: BigQuery vector search docs

If you’d like, I can extend this example with a concrete vector-search implementation (choose your embedding model and desired index type), or add dataset/table partitioning/clustering options.
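In the meantime, a rough sketch of what a vector-flavored `search()` could look like, assuming (not shown above) an `embedding ARRAY<FLOAT64>` column populated at `add()` time and a caller-supplied `embed_fn` (both hypothetical here). This does a brute-force `COSINE_DISTANCE` scan; to take advantage of a vector index you would query through `VECTOR_SEARCH` instead:
```
from typing import Callable, Dict, List

from google.cloud import bigquery


def vector_search(
    client: bigquery.Client,
    table_ref: str,
    key: str,
    query: str,
    embed_fn: Callable[[str], List[float]],  # hypothetical: returns the query's embedding
    n: int = 20,
) -> Dict[str, str]:
    # Assumes the table has an `embedding ARRAY<FLOAT64>` column filled in at add() time.
    sql = f"""
        SELECT id, content
        FROM `{table_ref}`
        WHERE key = @key
        ORDER BY COSINE_DISTANCE(embedding, @query_embedding) ASC
        LIMIT {int(max(1, n))}
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("key", "STRING", key),
            bigquery.ArrayQueryParameter("query_embedding", "FLOAT64", embed_fn(query)),
        ]
    )
    results = client.query(sql, job_config=job_config).result()
    return {row["id"]: row["content"] for row in results}
```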