In-Process Library (No Server)¶
Semvec ships as a pure Python library — you don't need the REST API server or any external service. Construct a SemvecSession directly, bring your own embedder, and drive the full per-turn loop in your application process.
This page covers:
- When to use in-process vs. REST API
- SemvecSession — the library facade
- Bringing your own embedder
- Running a single turn (run_sync(), await run())
- Reading results (TurnResult)
- Lower-level APIs (state updates, retrieval, triggers, context)
When to use in-process¶
| Scenario | Use | Reason |
|---|---|---|
| Tight latency requirement | In-process | No network hop. State stays in memory. |
| Single-process Python app | In-process | Simplest setup. No separate daemon. |
| Multi-agent in same process | In-process Cortex | Built-in coordination, no REST. |
| Polyglot integrations (Node, Go, Rust) | REST API | Language-agnostic endpoints. |
| Distributed multi-machine | REST API | Shared state across deployments. |
| Serverless / function-as-a-service | In-process | Ephemeral per-invocation state is acceptable. |
For the full decision tree, see Choose your path.
Quick start¶
1. Install semvec and an embedder¶
2. Construct a session¶
from semvec import SemvecSession, SemvecState, SemvecConfig
from sentence_transformers import SentenceTransformer
import numpy as np

# Create your embedder
class STEmbedder:
    def __init__(self, name: str = "all-MiniLM-L6-v2"):
        self._model = SentenceTransformer(name)
        self._dim = int(self._model.get_sentence_embedding_dimension() or 384)

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        vec = self._model.encode(text, normalize_embeddings=True, convert_to_numpy=True)
        return np.asarray(vec, dtype=np.float64)

# Create session
embedder = STEmbedder(name="all-MiniLM-L6-v2")
config = SemvecConfig(dimension=384)
session = SemvecSession(
    pss_state=SemvecState(config=config),
    embedder=embedder,
    config=config,
)
3. Run one turn (synchronous)¶
# Synchronous — call from normal Python code
result = session.run_sync("How does the deploy pipeline work?")
print(f"Context: {result.context}")
print(f"Drift phase: {result.drift_phase}")
print(f"Short-circuit: {result.short_circuit}")
4. Run one turn (async)¶
import asyncio

async def chat():
    # Async: call from async code
    result = await session.run(
        message="How does the deploy pipeline work?",
        response="Here's what I know: [...]",  # optional previous LLM response
    )
    return result

# From sync code:
result = asyncio.run(chat())
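This page notes further down that run_sync() cannot be called from within a running event loop. The following is a minimal sketch of how an application might guard its own blocking entry point against that situation. `run_turn` is a hypothetical stand-in for `await session.run(...)`, and `in_event_loop` and `run_turn_blocking` are illustrative helper names, not part of Semvec.

```python
import asyncio


async def run_turn(message: str) -> str:
    # Hypothetical stand-in for `await session.run(message)`.
    await asyncio.sleep(0)
    return f"context for: {message}"


def in_event_loop() -> bool:
    """Return True when called from code running inside an asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def run_turn_blocking(message: str) -> str:
    """Blocking entry point for plain synchronous code.

    Refuses to run inside an event loop, where the caller should
    `await run_turn(...)` instead of blocking.
    """
    if in_event_loop():
        raise RuntimeError("already inside an event loop; await run_turn() instead")
    return asyncio.run(run_turn(message))
```

In a web framework handler or a notebook cell that already runs a loop, the guard fails fast with a clear message, which is usually easier to debug than a nested-loop error raised deeper in the call stack.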
SemvecSession API overview¶
Constructor¶
from semvec import SemvecSession

# Constructor signature, shown with type annotations for reference (not a runnable call)
session = SemvecSession(
    pss_state: SemvecState,
    embedder: Any,  # your own embedder (BYOE)
    config: SemvecConfig,
    *,
    use_cortex: bool = False,
    chat_proxy: Any = None,
    pending_message: str | None = None,
    owner_subject: str | None = None,
    enable_bm25: bool | None = None,
)
Parameters:
- pss_state: the underlying semantic state (from SemvecState(config=...))
- embedder: your embedder, matching EmbedderProtocol (below)
- config: the SemvecConfig instance
- use_cortex: enable cross-session Cortex aggregation (multi-embedder blend); default False
- enable_bm25: enable BM25 hybrid retrieval (lexical boost); default auto from env
- owner_subject: optional subject ID for provenance tracking (e.g., in compliance pack)
The turn loop: run() and run_sync()¶
Every parameter except message is keyword-only (the * below):
# Async signature
async def run(
    self,
    message: str,
    *,
    response: str | None = None,  # previous LLM response (optional)
    top_k: int = 5,
    short_circuit_threshold: float = 0.85,
    mmr_fetch_k: int = 0,
    mmr_lambda: float = 0.5,
    bm25_fetch_k: int = 50,
    reranker: Callable[[str, list], list] | None = None,
) -> TurnResult

# Sync wrapper: same signature; cannot be called from within a running event loop
def run_sync(self, message: str, *, response=None, ...) -> TurnResult

# Usage
result = await session.run("the user's message", response="the previous answer")
result = session.run_sync("the user's message")
Both methods perform the same per-turn orchestration:
- Embed the new message and optional response in parallel
- If response is given, store it immediately (with its precomputed embedding)
- Retrieve the top relevant memories (with optional BM25 fusion and reranking)
- Compute short-circuit (is this query a near-duplicate of a stored memory?)
- Compute drift (how far has semantic context shifted?)
- Buffer the new message for the next turn
- Render a context block (retrieval-based summary)
- Return a TurnResult
Parameters:
- message: the new user input
- response: the LLM's previous output (optional; stored if provided)
- top_k: how many memories to retrieve
- short_circuit_threshold: cosine cutoff for "this is a duplicate query"
- mmr_fetch_k: when >0, fetch this many candidates and apply MMR (diversity reranking)
- mmr_lambda: MMR balance: 1.0 = pure diversity, 0.0 = pure relevance
- bm25_fetch_k: how many BM25 (lexical) hits to fuse with dense results
- reranker: optional cross-encoder reranker: (query_text, candidates) -> top_k_reranked
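The mmr_fetch_k and mmr_lambda parameters describe maximal marginal relevance (MMR) reranking: fetch a larger candidate pool, then greedily pick items that are relevant to the query but not redundant with what has already been picked. The following is a generic, dependency-free sketch of that idea, not Semvec's internal implementation; `cosine`, `mmr_select`, and `diversity_weight` are names invented for the illustration. The weighting follows the parameter description above (1.0 = pure diversity, 0.0 = pure relevance). The textbook formulation attaches its λ to the relevance term instead, so it is worth confirming the direction against your installed version.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length sequences (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def mmr_select(query, candidates, top_k, diversity_weight):
    """Greedy MMR over candidate vectors; returns the chosen candidate indices.

    diversity_weight is an assumption mirroring the parameter description:
    0.0 ranks purely by relevance, 1.0 ranks purely by non-redundancy.
    """
    remaining = list(range(len(candidates)))
    selected = []
    while remaining and len(selected) < top_k:

        def score(i):
            relevance = cosine(query, candidates[i])
            # Redundancy: highest similarity to anything already selected.
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected),
                default=0.0,
            )
            return (1.0 - diversity_weight) * relevance - diversity_weight * redundancy

        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected
```

With a query pointing along the first axis and candidates `[[1.0, 0.0], [0.99, 0.14], [0.6, 0.8]]`, a weight of 0.0 keeps the two closest candidates, while a weight of 1.0 replaces the near-duplicate second candidate with the more distinct third one.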
Read the result: TurnResult¶
from semvec import TurnResult
result: TurnResult = session.run_sync(...)
# Named tuple fields:
print(result.top_similarity) # float: cosine of query vs. top memory
print(result.short_circuit) # bool: is this a duplicate?
print(result.drift_score) # float: 0.0–1.0 drift magnitude
print(result.drift_detected) # bool: drift_score >= 0.5?
print(result.drift_phase) # str: "stable" | "shifting" | "drifted"
print(result.context) # str: retrieval-based summary for LLM
print(result.dedup_signal) # dict | None: update de-duplication hints
print(result.retrieval_error) # bool: True if memory retrieval faulted this turn
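One way to act on these fields is to turn each result into an explicit decision before calling the LLM. The sketch below uses `TurnResultLike`, a hypothetical stand-in that only mirrors the field names listed above so the example runs without Semvec installed; `plan_turn` and the returned action labels are likewise illustrative, and the ordering of the checks is an application choice rather than something the library prescribes.

```python
from typing import NamedTuple, Optional


class TurnResultLike(NamedTuple):
    """Hypothetical stand-in mirroring the TurnResult fields listed above."""
    top_similarity: float
    short_circuit: bool
    drift_score: float
    drift_detected: bool
    drift_phase: str
    context: str
    dedup_signal: Optional[dict]
    retrieval_error: bool


def plan_turn(result) -> str:
    """Map a turn result to an application-level action label."""
    if result.retrieval_error:
        # Memory lookup faulted: answer without injected context.
        return "answer_without_memory"
    if result.short_circuit:
        # Near-duplicate query: a cached answer may be reusable.
        return "reuse_previous_answer"
    if result.drift_detected:
        # Conversation moved away from earlier context: answer, but flag it.
        return "answer_and_flag_drift"
    return "answer_with_context"
```

Checking retrieval_error first keeps a faulted lookup from being mistaken for "nothing relevant was found".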
Bringing your own embedder¶
Your embedder must implement EmbedderProtocol:
class EmbedderProtocol(Protocol):
    """Structural type for a Bring-Your-Own-Embedder."""
    def get_dimension(self) -> int: ...
    def get_embedding(self, text: str) -> np.ndarray: ...
Rules:
- Return normalized unit vectors (norm = 1.0); if your model does not, normalize in your wrapper
- Return np.ndarray with dtype=np.float64
- Return a zero-norm vector (np.zeros(dim)) for empty/whitespace input, not an error
- The dimension must match your SemvecConfig(dimension=...)
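The rules above are easy to check mechanically before wiring an embedder into a session. The following is a small sketch of such a check, assuming only NumPy. `HashEmbedder` is a toy, deterministic embedder invented here for tests (its vectors carry no semantic meaning), and `check_embedder` is an illustrative helper, not a Semvec API.

```python
import hashlib

import numpy as np


class HashEmbedder:
    """Toy deterministic embedder for tests; NOT semantically meaningful."""

    def __init__(self, dim: int = 8):
        if not 2 <= dim <= 32:  # a SHA-256 digest provides 32 bytes
            raise ValueError("dim must be between 2 and 32")
        self._dim = dim

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        raw = np.frombuffer(digest, dtype=np.uint8)[: self._dim].astype(np.float64)
        vec = raw - raw.mean()
        norm = np.linalg.norm(vec)
        return vec / norm if norm > 1e-8 else np.zeros(self._dim, dtype=np.float64)


def check_embedder(embedder, expected_dim: int) -> list[str]:
    """Return a list of rule violations (an empty list means all checks passed)."""
    problems = []
    dim = embedder.get_dimension()
    if dim != expected_dim:
        problems.append(f"dimension {dim} does not match configured {expected_dim}")

    vec = embedder.get_embedding("hello world")
    if not isinstance(vec, np.ndarray) or vec.dtype != np.float64:
        problems.append("embedding is not a float64 np.ndarray")
    elif vec.shape != (dim,):
        problems.append(f"embedding shape {vec.shape} is not ({dim},)")
    elif abs(float(np.linalg.norm(vec)) - 1.0) > 1e-6:
        problems.append("embedding is not unit-normalized")

    empty = embedder.get_embedding("   ")
    if not isinstance(empty, np.ndarray) or empty.shape != (dim,) or np.any(empty != 0):
        problems.append("whitespace input did not yield a zero vector")
    return problems
```

Running `check_embedder(my_embedder, expected_dim)` once at startup, with `expected_dim` set to the value passed to SemvecConfig, surfaces a dimension mismatch before any state has been written.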
Example: SentenceTransformers¶
from sentence_transformers import SentenceTransformer
import numpy as np

class STEmbedder:
    def __init__(self, name: str = "all-MiniLM-L6-v2"):
        self._model = SentenceTransformer(name)
        self._dim = int(self._model.get_sentence_embedding_dimension() or 384)

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        vec = self._model.encode(
            text,
            normalize_embeddings=True,
            convert_to_numpy=True,
            show_progress_bar=False,
        )
        return np.asarray(vec, dtype=np.float64)
Example: OpenAI¶
from openai import OpenAI
import numpy as np

class OpenAIEmbedder:
    def __init__(self, model: str = "text-embedding-3-small"):
        self._client = OpenAI()
        self._model = model
        # Heuristic: covers text-embedding-3-small (1536) and text-embedding-3-large (3072) only
        self._dim = 1536 if "small" in model else 3072

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        response = self._client.embeddings.create(model=self._model, input=text)
        vec = np.asarray(response.data[0].embedding, dtype=np.float64)
        # OpenAI documents its embeddings as unit-length; renormalize defensively:
        norm = np.linalg.norm(vec)
        return vec / norm if norm > 1e-8 else vec
For more examples and tradeoffs, see Choosing an Embedder.
Lower-level APIs¶
Once you have a SemvecSession, you can drive the full API directly without run().
Store a Q&A pair (without running the full turn loop)¶
# Store a single chunk (e.g., a RAG context block the LLM didn't generate)
result = session.store_qa(
    response="The deploy pipeline uses GitHub Actions for CI/CD.",
)
# Or async:
result = await session.store_qa_async(response="...")
Compute short-circuit (duplicate detection)¶
import numpy as np
query_embedding = embedder.get_embedding("How does deploy work?")
top_similarity, is_duplicate = session.compute_short_circuit(
    "How does deploy work?",
    threshold=0.85,
    query_embedding=query_embedding,
)
if is_duplicate:
    print("This query is too similar to a stored memory")
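To make the threshold concrete: with unit-normalized embeddings, cosine similarity reduces to a dot product, so a short-circuit check amounts to comparing the best dot product against the cutoff. The sketch below illustrates that arithmetic only. `top_similarity` and `is_short_circuit` are hypothetical helpers, and whether Semvec compares with `>=` or `>` at the boundary is an assumption here.

```python
def top_similarity(query, stored):
    """Highest dot product between the query and any stored vector.

    Assumes every vector is already unit-normalized, so the dot product
    equals the cosine similarity. Returns 0.0 when nothing is stored.
    """
    return max(
        (sum(q * s for q, s in zip(query, vec)) for vec in stored),
        default=0.0,
    )


def is_short_circuit(query, stored, threshold=0.85):
    """Return (similarity, flag); the flag is True at or above the cutoff (assumed >=)."""
    similarity = top_similarity(query, stored)
    return similarity, similarity >= threshold
```

For a query `[1.0, 0.0]` and stored vectors `[[0.6, 0.8], [0.8, 0.6]]`, the best match scores 0.8 and stays below the default cutoff; storing `[1.0, 0.0]` itself raises the score to 1.0 and trips the flag.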
Compute drift (semantic divergence)¶
query_embedding = embedder.get_embedding("What's the new framework?")
drift_score, drift_detected, drift_phase = session.compute_drift(
    "What's the new framework?",
    query_embedding=query_embedding,
)
# drift_phase is one of: "stable", "shifting", "drifted"
print(f"Drift phase: {drift_phase}")
Retrieve top memories manually¶
query_embedding = embedder.get_embedding("What do we know about X?")
top_k = 5
memories = session.state.memory.get_relevant_memories(query_embedding, top_k=top_k)
for mem in memories:
    print(f" [{mem.importance:.3f}] {mem.text[:100]}")
Get the context block (for injecting into LLM prompt)¶
context = session.context_block(
    query_text="What's the deploy pipeline?",
    top_k=5,
)
# Paste `context` into your LLM system prompt:
system_prompt = f"""You are a helpful assistant.
## What we remember:
{context}
Answer the user's question based on the above context."""
Phase C operations: Triggers, Anchors, Isolation¶
Add resonance triggers (boost specific topics)¶
# Keyword trigger: boost memories about "bug" in retrieval
trigger_id = session.add_trigger(keyword="bug", threshold=0.8)
# Embedding trigger: boost memories semantically similar to this vector
trigger_embedding = embedder.get_embedding("database failures")
trigger_id = session.add_trigger(embedding=list(trigger_embedding), threshold=0.8)
# Clear all triggers
session.clear_triggers()
Drift anchors (realign semantic context)¶
# Add an anchor vector: the session will try to realign toward it
anchor_embedding = embedder.get_embedding("We're debugging production issues")
anchor_id = session.add_anchor(list(anchor_embedding))
# Query anchor drift:
scores = session.get_anchor_score()
print(f"Anchor score: {scores['anchor_score']}")
print(f"Remaining realignment: {scores['realignment_remaining']}")
Input isolation (block off-topic queries)¶
# Isolation levels: "open" (off), "filter" (drop matching input),
# "quarantine" (hold for review), "lockdown" (block all updates).
# Filter out queries similar to a topic
exclusion_embedding = embedder.get_embedding("social media drama")
session.set_isolation(
    level="filter",
    exclusion_embeddings=[list(exclusion_embedding)],
    similarity_threshold=0.7,
)
# Quarantine queries that fall outside an allowed domain
allowlist_embedding = embedder.get_embedding("software development")
session.set_isolation(
    level="quarantine",
    allowlist_embeddings=[list(allowlist_embedding)],
)
# Release quarantine (if isolation blocked a message)
session.release_quarantine()
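Conceptually, the four levels can be read as a small decision rule over similarity scores. The sketch below is an interpretation of the level descriptions above, not Semvec's actual logic: `isolation_decision`, the returned labels, and the idea that an input "matches" when its dot product with a unit-normalized reference vector reaches the threshold are all assumptions made for illustration.

```python
def dot(a, b):
    """Dot product; equals cosine similarity for unit-normalized vectors."""
    return sum(x * y for x, y in zip(a, b))


def isolation_decision(vec, level, exclusions=(), allowlist=(), threshold=0.7):
    """Classify an input embedding as 'accept', 'drop', 'hold', or 'block'.

    level is one of "open", "filter", "quarantine", "lockdown".
    """
    if level == "open":
        return "accept"
    if level == "lockdown":
        return "block"
    if level not in ("filter", "quarantine"):
        raise ValueError(f"unknown isolation level: {level!r}")

    matches_exclusion = any(dot(vec, e) >= threshold for e in exclusions)
    # An allowlist only restricts input when it is non-empty.
    outside_allowlist = bool(allowlist) and not any(
        dot(vec, a) >= threshold for a in allowlist
    )
    if matches_exclusion or outside_allowlist:
        return "drop" if level == "filter" else "hold"
    return "accept"
```

Read this way, "filter" and "quarantine" apply the same test and differ only in what happens to a flagged input: it is discarded in the first case and held for review in the second.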
Inject synthetic memories¶
# Manually add a memory (e.g., from an external knowledge base)
embedding = embedder.get_embedding("Deployment uses Terraform for IaC")
memory_count = session.inject_memory(
    embedding=list(embedding),
    text="Deployment uses Terraform for IaC",
    tier="long_term",
    importance=0.8,
)
Persistence: Export and import state¶
Export¶
export_dict = session.export_state()
# Contains:
# - state_dict: the full semantic state, memory tiers, history
# - checksum: SHA256 over the semantic vector for tampering detection
import json
with open("session_backup.json", "w") as f:
    json.dump(export_dict, f)
Import¶
import json
with open("session_backup.json", "r") as f:
    export_dict = json.load(f)
# Restore into a fresh session
session.import_state(export_dict["state_dict"])
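Writing the backup straight to its final path can leave a truncated file if the process dies mid-write. A common remedy, sketched below with the standard library only, is to write to a temporary file in the same directory and then rename it into place. `save_export` and `load_export` are hypothetical helpers; the only things taken from this page are the two top-level keys, `state_dict` and `checksum`, and the assumption that the export is JSON-serializable, as in the example above.

```python
import json
import os
import tempfile


def save_export(export_dict: dict, path: str) -> None:
    """Write the export atomically: temp file in the same directory, then rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(export_dict, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    except BaseException:
        # Clean up the temp file if writing or renaming failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_export(path: str) -> dict:
    """Load an export and verify that the expected top-level keys are present."""
    with open(path, "r", encoding="utf-8") as f:
        export_dict = json.load(f)
    missing = {"state_dict", "checksum"} - export_dict.keys()
    if missing:
        raise ValueError(f"export is missing keys: {sorted(missing)}")
    return export_dict
```

A caller would pass the result of `session.export_state()` to `save_export`, and later hand `load_export(path)["state_dict"]` to `session.import_state(...)`, as in the Import example.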
Literal cache (verbatim code facts)¶
For coding agents and compliance workloads, store exact values (variable names, file paths, error messages) that shouldn't be embedded/lossy:
# Store an entity
session.store_entity(
    key="deploy_script_path",
    kind="path",
    value="/opt/app/scripts/deploy.sh",
    context="Used in the production deploy pipeline",
    importance=1.0,
)
# Query by text
entities = session.query_entities(query_text="path", max_results=10)
for e in entities:
    print(f" [{e['kind']}] {e['key']} = {e['value']}")
# Query all
all_entities = session.query_entities(max_results=100)
# Remove
session.remove_entity(key="deploy_script_path")
Metrics and diagnostics¶
metrics = session.get_metrics()
print(f"Phase: {metrics['phase']}")
print(f"Interactions: {metrics['interaction_count']}")
print(f"Total memories: {metrics['total_memories']}")
print(f"Beta history: {metrics['beta_history']}")
print(f"Phase history: {metrics['phase_history']}")
Complete example: in-process coding assistant¶
import asyncio
from semvec import SemvecSession, SemvecState, SemvecConfig
from sentence_transformers import SentenceTransformer
import numpy as np

class STEmbedder:
    def __init__(self):
        self._model = SentenceTransformer("all-MiniLM-L6-v2")
        self._dim = 384

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        vec = self._model.encode(text, normalize_embeddings=True, convert_to_numpy=True)
        return np.asarray(vec, dtype=np.float64)

# Setup
embedder = STEmbedder()
config = SemvecConfig(dimension=384)
session = SemvecSession(
    pss_state=SemvecState(config=config),
    embedder=embedder,
    config=config,
)

# Simulate a multi-turn conversation
turns = [
    ("Can you explain the authentication flow?", "Here's the OAuth2 flow: ..."),
    ("What rate limits do we have?", "API rate limits are 1000 req/min per token."),
    ("How do we handle token refresh?", "The client library auto-refreshes 5 min before expiry."),
    ("What if the backend is down during refresh?", "We have a 30-second exponential backoff retry."),
]

async def multi_turn_chat():
    for i, (user_msg, llm_response) in enumerate(turns):
        print(f"\n--- Turn {i+1} ---")
        result = await session.run(
            message=user_msg,
            response=llm_response,
            top_k=3,
        )
        print(f"User: {user_msg}")
        print(f"Drift phase: {result.drift_phase}")
        print(f"Retrieved context:\n{result.context}\n")

# Run
asyncio.run(multi_turn_chat())
See also¶
- Choosing an Embedder — SentenceTransformers, OpenAI, ONNX, custom
- Correcting Memories — triggers, anchors, negative attractors
- Compliance Pack — HMAC, event store, deletion certificates
- API Reference: Core (SemvecState) — low-level state API