
In-Process Library (No Server)

Semvec ships as a pure Python library — you don't need the REST API server or any external service. Construct a SemvecSession directly, bring your own embedder, and drive the full per-turn loop in your application process.

This page covers:

  • When to use in-process vs. REST API
  • SemvecSession — the library facade
  • Bringing your own embedder
  • Running a single turn (run_sync(), await run())
  • Reading results (TurnResult)
  • Lower-level APIs (state updates, retrieval, triggers, context)

When to use in-process

Scenario | Use | Reason
--- | --- | ---
Tight latency requirement | In-process | No network hop. State stays in memory.
Single-process Python app | In-process | Simplest setup. No separate daemon.
Multi-agent in same process | In-process Cortex | Built-in coordination, no REST.
Polyglot integrations (Node, Go, Rust) | REST API | Language-agnostic endpoints.
Distributed multi-machine | REST API | Shared state across deployments.
Serverless / function-as-service | In-process | Ephemeral state per invocation OK.

For the full decision tree, see Choose your path.


Quick start

1. Install semvec and an embedder

pip install semvec sentence-transformers

2. Construct a session

from semvec import SemvecSession, SemvecState, SemvecConfig
from sentence_transformers import SentenceTransformer
import numpy as np

# Create your embedder
class STEmbedder:
    def __init__(self, name: str = "all-MiniLM-L6-v2"):
        self._model = SentenceTransformer(name)
        self._dim = int(self._model.get_sentence_embedding_dimension() or 384)

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        vec = self._model.encode(text, normalize_embeddings=True, convert_to_numpy=True)
        return np.asarray(vec, dtype=np.float64)

# Create session
embedder = STEmbedder(name="all-MiniLM-L6-v2")
config = SemvecConfig(dimension=384)
session = SemvecSession(
    pss_state=SemvecState(config=config),
    embedder=embedder,
    config=config,
)

3. Run one turn (synchronous)

# Synchronous — call from normal Python code
result = session.run_sync("How does the deploy pipeline work?")

print(f"Context: {result.context}")
print(f"Drift phase: {result.drift_phase}")
print(f"Short-circuit: {result.short_circuit}")

4. Run one turn (async)

import asyncio

async def chat():
    # Async — call from async code
    result = await session.run(
        message="How does the deploy pipeline work?",
        response="Here's what I know: [...]",  # optional previous LLM response
    )
    return result

# From sync code:
result = asyncio.run(chat())

SemvecSession API overview

Constructor

from semvec import SemvecSession

session = SemvecSession(
    pss_state: SemvecState,
    embedder: Any,  # your BYOE
    config: SemvecConfig,
    *,
    use_cortex: bool = False,
    chat_proxy: Any = None,
    pending_message: str | None = None,
    owner_subject: str | None = None,
    enable_bm25: bool | None = None,
)

Parameters:

  • pss_state — the underlying semantic state (from SemvecState(config=...))
  • embedder — your embedder, matching EmbedderProtocol (below)
  • config — the SemvecConfig instance
  • use_cortex — enable cross-session Cortex aggregation (multi-embedder blend); default False
  • enable_bm25 — enable BM25 hybrid retrieval (lexical boost); default auto from env
  • owner_subject — optional subject ID for provenance tracking (e.g., in compliance pack)

The turn loop: run() and run_sync()

Every parameter except message is keyword-only (the * below):

# Async signature
async def run(
    self,
    message: str,
    *,
    response: str | None = None,  # previous LLM response (optional)
    top_k: int = 5,
    short_circuit_threshold: float = 0.85,
    mmr_fetch_k: int = 0,
    mmr_lambda: float = 0.5,
    bm25_fetch_k: int = 50,
    reranker: Callable[[str, list], list] | None = None,
) -> TurnResult

# Sync wrapper — same signature; cannot be called from within a running event loop
def run_sync(self, message: str, *, response=None, ...) -> TurnResult

# Usage
result = await session.run("the user's message", response="the previous answer")
result = session.run_sync("the user's message")
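
Because run_sync() cannot be called from within a running event loop, code that may execute in either context can check which one it is in before choosing a method. The following is a minimal sketch using only the standard library; the helper name in_running_loop is hypothetical and not part of semvec.

```python
import asyncio


def in_running_loop() -> bool:
    """Return True when the caller is executing inside an asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running.
        return False
    return True


async def _probe() -> bool:
    # Called from a coroutine, so a loop is running here.
    return in_running_loop()


outside = in_running_loop()      # plain synchronous code
inside = asyncio.run(_probe())   # inside a coroutine
print(outside, inside)
```

When the helper returns True, use await session.run(...); otherwise session.run_sync(...) is safe to call.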

Both methods perform the same per-turn orchestration:

  1. Embed the new message and optional response in parallel
  2. If response is given, store it immediately (with its precomputed embedding)
  3. Retrieve the top relevant memories (with optional BM25 fusion and reranking)
  4. Compute short-circuit (is this query a near-duplicate of a stored memory, per short_circuit_threshold?)
  5. Compute drift (how far has semantic context shifted?)
  6. Buffer the new message for the next turn
  7. Render a context block (retrieval-based summary)
  8. Return a TurnResult

Parameters:

  • message — the new user input
  • response — the LLM's previous output (optional; stored if provided)
  • top_k — how many memories to retrieve
  • short_circuit_threshold — cosine cutoff for "this is a duplicate query"
  • mmr_fetch_k — when >0, fetch this many candidates and apply MMR (diversity reranking)
  • mmr_lambda — MMR balance: 1.0 = pure diversity, 0.0 = pure relevance
  • bm25_fetch_k — how many BM25 (lexical) hits to fuse with dense results
  • reranker — optional cross-encoder reranker: (query_text, candidates) -> top_k_reranked

Read the result: TurnResult

from semvec import TurnResult

result: TurnResult = session.run_sync(...)

# Named tuple fields:
print(result.top_similarity)      # float: cosine of query vs. top memory
print(result.short_circuit)       # bool: is this a duplicate?
print(result.drift_score)         # float: 0.0–1.0 drift magnitude
print(result.drift_detected)      # bool: drift_score >= 0.5?
print(result.drift_phase)         # str: "stable" | "shifting" | "drifted"
print(result.context)             # str: retrieval-based summary for LLM
print(result.dedup_signal)        # dict | None: update de-duplication hints
print(result.retrieval_error)     # bool: True if memory retrieval faulted this turn

Bringing your own embedder

Your embedder must implement EmbedderProtocol:

class EmbedderProtocol(Protocol):
    """Structural type for a Bring-Your-Own-Embedder."""

    def get_dimension(self) -> int: ...

    def get_embedding(self, text: str) -> np.ndarray: ...

Rules:

  • Return normalized unit vectors (norm = 1.0) — if not, normalize in your wrapper
  • Return np.ndarray with dtype=np.float64
  • Return a zero-norm vector (np.zeros(dim)) for empty/whitespace input, not an error
  • The dimension must match your SemvecConfig(dimension=...)
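
The rules above can be checked mechanically before an embedder is handed to a session. The sketch below assumes NumPy is installed; check_embedder and HashEmbedder are hypothetical helpers, not part of semvec. HashEmbedder is a deterministic toy that carries no semantic meaning and is only useful for tests.

```python
import hashlib

import numpy as np


class HashEmbedder:
    """Deterministic toy embedder for tests; vectors are NOT semantic."""

    def __init__(self, dim: int = 32):
        self._dim = dim

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        # Seed a generator from the text so the same input gives the same vector.
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        rng = np.random.default_rng(int.from_bytes(digest[:8], "big"))
        vec = rng.standard_normal(self._dim)
        return vec / np.linalg.norm(vec)


def check_embedder(embedder, expected_dim: int) -> list:
    """Return a list of rule violations; an empty list means all checks passed."""
    problems = []
    if embedder.get_dimension() != expected_dim:
        problems.append("get_dimension() does not match the configured dimension")
    vec = embedder.get_embedding("hello world")
    if not isinstance(vec, np.ndarray) or vec.dtype != np.float64:
        problems.append("embedding is not a float64 np.ndarray")
    elif vec.shape != (expected_dim,):
        problems.append("embedding shape does not match the configured dimension")
    elif abs(float(np.linalg.norm(vec)) - 1.0) > 1e-6:
        problems.append("embedding is not unit-normalized")
    empty = embedder.get_embedding("   ")
    if not isinstance(empty, np.ndarray) or float(np.linalg.norm(empty)) != 0.0:
        problems.append("whitespace input did not yield a zero vector")
    return problems


print(check_embedder(HashEmbedder(32), expected_dim=32))
```

Running the check once at startup, with expected_dim set to the value passed to SemvecConfig(dimension=...), surfaces a dimension mismatch before the first turn.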

Example: SentenceTransformers

from sentence_transformers import SentenceTransformer
import numpy as np

class STEmbedder:
    def __init__(self, name: str = "all-MiniLM-L6-v2"):
        self._model = SentenceTransformer(name)
        self._dim = int(self._model.get_sentence_embedding_dimension() or 384)

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        vec = self._model.encode(
            text,
            normalize_embeddings=True,
            convert_to_numpy=True,
            show_progress_bar=False,
        )
        return np.asarray(vec, dtype=np.float64)

Example: OpenAI

from openai import OpenAI
import numpy as np

class OpenAIEmbedder:
    def __init__(self, model: str = "text-embedding-3-small"):
        self._client = OpenAI()
        self._model = model
        # text-embedding-3-large is 3072-d; 3-small and ada-002 are 1536-d
        self._dim = 3072 if "large" in model else 1536

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        response = self._client.embeddings.create(model=self._model, input=text)
        vec = np.asarray(response.data[0].embedding, dtype=np.float64)
        # OpenAI embeddings are documented as unit-length; renormalize defensively:
        norm = np.linalg.norm(vec)
        return vec / norm if norm > 1e-8 else vec

For more examples and tradeoffs, see Choosing an Embedder.
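
When the embedder calls a remote API, as in the OpenAI example, repeated texts cost a network round trip each time. A small caching wrapper that satisfies the same two-method protocol is one way to avoid that. This is a sketch assuming NumPy is installed; CachedEmbedder and CountingEmbedder are hypothetical names, and eviction is simple first-in-first-out rather than LRU.

```python
import numpy as np


class CachedEmbedder:
    """Wrap any embedder with get_dimension()/get_embedding() and memoize by text."""

    def __init__(self, inner, max_entries: int = 1024):
        self._inner = inner
        self._max_entries = max_entries
        self._cache = {}

    def get_dimension(self) -> int:
        return self._inner.get_dimension()

    def get_embedding(self, text: str) -> np.ndarray:
        hit = self._cache.get(text)
        if hit is None:
            hit = self._inner.get_embedding(text)
            if len(self._cache) >= self._max_entries:
                # Evict the oldest entry (dicts preserve insertion order).
                self._cache.pop(next(iter(self._cache)))
            self._cache[text] = hit
        # Return a copy so callers cannot mutate the cached vector.
        return hit.copy()


class CountingEmbedder:
    """Toy inner embedder that counts calls; only used to demonstrate the cache."""

    def __init__(self):
        self.calls = 0

    def get_dimension(self) -> int:
        return 3

    def get_embedding(self, text: str) -> np.ndarray:
        self.calls += 1
        return np.array([1.0, 0.0, 0.0], dtype=np.float64)


inner = CountingEmbedder()
cached = CachedEmbedder(inner)
first = cached.get_embedding("deploy pipeline")
second = cached.get_embedding("deploy pipeline")
print(inner.calls)
```

The wrapper can be passed as embedder= wherever the wrapped embedder would be, since it exposes the same two methods.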


Lower-level APIs

Once you have a SemvecSession, you can call the individual steps directly instead of going through run().

Store a Q&A pair (without running the full turn loop)

# Store a single chunk (e.g., a RAG context block the LLM didn't generate)
result = session.store_qa(
    response="The deploy pipeline uses GitHub Actions for CI/CD.",
)

# Or async (from inside a coroutine):
result = await session.store_qa_async(response="...")

Compute short-circuit (duplicate detection)

import numpy as np

query_embedding = embedder.get_embedding("How does deploy work?")
top_similarity, is_duplicate = session.compute_short_circuit(
    "How does deploy work?",
    threshold=0.85,
    query_embedding=query_embedding,
)

if is_duplicate:
    print("This query is too similar to a stored memory")
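
Conceptually, the check reduces to comparing the best similarity against the threshold. Because the embedder rules require unit-length vectors, cosine similarity is just a dot product. The sketch below illustrates that idea in plain Python; it is not semvec's internal code, and whether the library compares with >= or > is an assumption here.

```python
def dot(a, b):
    """Dot product; equals cosine similarity when both vectors are unit-length."""
    return sum(x * y for x, y in zip(a, b))


def short_circuit(query_vec, memory_vecs, threshold=0.85):
    """Return (top_similarity, is_duplicate) for a query against stored vectors."""
    top = max((dot(query_vec, m) for m in memory_vecs), default=0.0)
    return top, top >= threshold


memories = [[1.0, 0.0], [0.0, 1.0]]
near = short_circuit([0.96, 0.28], memories)   # close to the first memory
far = short_circuit([0.6, 0.8], memories)      # best match is only 0.8
empty = short_circuit([0.6, 0.8], [])          # nothing stored yet
print(near, far, empty)
```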

Compute drift (semantic divergence)

query_embedding = embedder.get_embedding("What's the new framework?")
drift_score, drift_detected, drift_phase = session.compute_drift(
    "What's the new framework?",
    query_embedding=query_embedding,
)

# drift_phase is one of: "stable", "shifting", "drifted"
print(f"Drift phase: {drift_phase}")

Retrieve top memories manually

query_embedding = embedder.get_embedding("What do we know about X?")
top_k = 5

memories = session.state.memory.get_relevant_memories(query_embedding, top_k=top_k)
for mem in memories:
    print(f"  [{mem.importance:.3f}] {mem.text[:100]}")

Get the context block (for injecting into LLM prompt)

context = session.context_block(
    query_text="What's the deploy pipeline?",
    top_k=5,
)

# Paste `context` into your LLM system prompt:
system_prompt = f"""You are a helpful assistant.

## What we remember:
{context}

Answer the user's question based on the above context."""

Phase C operations: Triggers, Anchors, Isolation

Add resonance triggers (boost specific topics)

# Keyword trigger: boost memories about "bug" in retrieval
trigger_id = session.add_trigger(keyword="bug", threshold=0.8)

# Embedding trigger: boost memories semantically similar to this vector
trigger_embedding = embedder.get_embedding("database failures")
trigger_id = session.add_trigger(embedding=list(trigger_embedding), threshold=0.8)

# Clear all triggers
session.clear_triggers()

Drift anchors (realign semantic context)

# Add an anchor vector: the session will try to realign toward it
anchor_embedding = embedder.get_embedding("We're debugging production issues")
anchor_id = session.add_anchor(list(anchor_embedding))

# Query anchor drift:
scores = session.get_anchor_score()
print(f"Anchor score: {scores['anchor_score']}")
print(f"Remaining realignment: {scores['realignment_remaining']}")

Input isolation (block off-topic queries)

# Isolation levels: "open" (off), "filter" (drop matching input),
# "quarantine" (hold for review), "lockdown" (block all updates).

# Filter out queries similar to a topic
exclusion_embedding = embedder.get_embedding("social media drama")
session.set_isolation(
    level="filter",
    exclusion_embeddings=[list(exclusion_embedding)],
    similarity_threshold=0.7,
)

# Quarantine queries that fall outside an allowed domain
allowlist_embedding = embedder.get_embedding("software development")
session.set_isolation(
    level="quarantine",
    allowlist_embeddings=[list(allowlist_embedding)],
)

# Release quarantine (if isolation blocked a message)
session.release_quarantine()

Inject synthetic memories

# Manually add a memory (e.g., from an external knowledge base)
embedding = embedder.get_embedding("Deployment uses Terraform for IaC")
memory_count = session.inject_memory(
    embedding=list(embedding),
    text="Deployment uses Terraform for IaC",
    tier="long_term",
    importance=0.8,
)

Persistence: Export and import state

Export

export_dict = session.export_state()

# Contains:
# - state_dict: the full semantic state, memory tiers, history
# - checksum: SHA256 over the semantic vector for tamper detection

import json
with open("session_backup.json", "w") as f:
    json.dump(export_dict, f)

Import

import json

with open("session_backup.json", "r") as f:
    export_dict = json.load(f)

# Restore into a session (construct a fresh one first when loading in a new process)
session.import_state(export_dict["state_dict"])
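
If the process can be interrupted while writing the backup, a plain open(..., "w") may leave a truncated file behind. A common general-purpose remedy, independent of semvec, is to write to a temporary file and rename it into place. The helper name save_json_atomic is hypothetical.

```python
import json
import os
import tempfile


def save_json_atomic(path: str, payload: dict) -> None:
    """Write payload as JSON so readers never observe a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file in the target directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f)
            f.flush()
            os.fsync(f.fileno())
        # os.replace overwrites the destination in a single rename step.
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

With this helper, the export step becomes save_json_atomic("session_backup.json", session.export_state()), assuming the exported dictionary is JSON-serializable as the example above implies.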

Literal cache (verbatim code facts)

For coding agents and compliance workloads, store exact values (variable names, file paths, error messages) verbatim, since an embedding would only preserve them approximately:

# Store an entity
session.store_entity(
    key="deploy_script_path",
    kind="path",
    value="/opt/app/scripts/deploy.sh",
    context="Used in the production deploy pipeline",
    importance=1.0,
)

# Query by text
entities = session.query_entities(query_text="path", max_results=10)
for e in entities:
    print(f"  [{e['kind']}] {e['key']} = {e['value']}")

# Query all
all_entities = session.query_entities(max_results=100)

# Remove
session.remove_entity(key="deploy_script_path")

Metrics and diagnostics

metrics = session.get_metrics()

print(f"Phase: {metrics['phase']}")
print(f"Interactions: {metrics['interaction_count']}")
print(f"Total memories: {metrics['total_memories']}")
print(f"Beta history: {metrics['beta_history']}")
print(f"Phase history: {metrics['phase_history']}")

Complete example: in-process coding assistant

import asyncio
from semvec import SemvecSession, SemvecState, SemvecConfig
from sentence_transformers import SentenceTransformer
import numpy as np

class STEmbedder:
    def __init__(self):
        self._model = SentenceTransformer("all-MiniLM-L6-v2")
        self._dim = 384

    def get_dimension(self) -> int:
        return self._dim

    def get_embedding(self, text: str) -> np.ndarray:
        if not text.strip():
            return np.zeros(self._dim, dtype=np.float64)
        vec = self._model.encode(text, normalize_embeddings=True, convert_to_numpy=True)
        return np.asarray(vec, dtype=np.float64)

# Setup
embedder = STEmbedder()
config = SemvecConfig(dimension=384)
session = SemvecSession(
    pss_state=SemvecState(config=config),
    embedder=embedder,
    config=config,
)

# Simulate a multi-turn conversation
turns = [
    ("Can you explain the authentication flow?", "Here's the OAuth2 flow: ..."),
    ("What rate limits do we have?", "API rate limits are 1000 req/min per token."),
    ("How do we handle token refresh?", "The client library auto-refreshes 5 min before expiry."),
    ("What if the backend is down during refresh?", "We have a 30-second exponential backoff retry."),
]

async def multi_turn_chat():
    for i, (user_msg, llm_response) in enumerate(turns):
        print(f"\n--- Turn {i+1} ---")
        result = await session.run(
            message=user_msg,
            response=llm_response,
            top_k=3,
        )
        print(f"User: {user_msg}")
        print(f"Drift phase: {result.drift_phase}")
        print(f"Retrieved context:\n{result.context}\n")

# Run
asyncio.run(multi_turn_chat())

See also