Skip to content

KV store (ctx.kv / bridge.kv)

A KV is a generic key-value store scoped to your app and one tenant. Use it for state that needs to survive container restarts but doesn’t warrant its own table:

  • Settings the user toggles inside your app’s UI
  • Activity logs (activity:<id>) with a TTL so they self-clean
  • Cursors / last-seen markers across schedule fires
  • Per-user lightweight state (user/<id>/preferences)

The store is strictly app-private: another partner app on the same tenant cannot read or write your keys. Tenants are isolated as everywhere else.

NeedUse
User-tunable settings the platform should rendermanifest install_settings
Domain data with relations + queries (clients, documents)dedicated skill + table
Static config baked at build timeenv vars
App-private state that needs persistencectx.kv / bridge.kv

If you find yourself wanting joins, filters, or another app’s read access — that’s a sign you want a dedicated skill+table, not KV.

  • Value: 64 KB hard cap (JSON-serialized). Bigger blobs go in ctx.files.
  • Key: 1-256 chars, printable ASCII (no whitespace, no control chars). Conventions like settings, activity:<id>, user/<id>/foo work fine.
  • Soft cap: 10_000 keys per (tenant, app). Set returns an error with hint when hit. Delete unused keys before adding more.
  • TTL: optional [60, 31_536_000] seconds (1 minute to 1 year). Reads filter expired rows; cleanup is a separate periodic job.
from linkworld_sdk import App
app = App.from_manifest(__file__)
@app.on_schedule("inbox_sweep")
async def sweep(ctx):
# Read settings the user configured in the app's UI
settings = await ctx.kv.get("settings") or {}
cadence = settings.get("cadence_minutes", 5)
# Persist a cursor so the next fire knows where to resume
last = await ctx.kv.get("last_swept_at")
...
await ctx.kv.set("last_swept_at", datetime.now().isoformat())
# Activity row with auto-expiry — tagged for the UI
await ctx.kv.set(
f"activity:{msg_id}",
{"state": "processing", "subject": subj, "started_at": ...},
ttl_seconds=7 * 86400, # auto-clean after 7 days
)
@app.http_route("/activity", methods=["GET"])
async def activity(ctx, request):
"""Backend route the iframe UI calls to render the activity feed."""
items = await ctx.kv.list(prefix="activity:", limit=100)
return {"items": items}
MethodReturns
await ctx.kv.get(key)The stored JSON value, or None if missing/expired
await ctx.kv.set(key, value, *, ttl_seconds=None)Upserts. Raises ToolCallError on size cap or soft cap
await ctx.kv.list(prefix="", *, limit=100, include_values=True)List of {key, value?, expires_at, updated_at}, ordered by key, expired rows excluded
await ctx.kv.delete(key)True if a row was removed, False if it didn’t exist

get returns the value directly (not a wrapper). If you need to distinguish “missing” from “value is None”, use list with a prefix and check membership.

Same surface, async/await, runs in your iframe bundle:

import { LinkworldBridge } from '@linkworld_ai/sdk-browser'
const bridge = new LinkworldBridge()
// Settings tab — load + save
const settings = await bridge.kv.get('settings') ?? {}
await bridge.kv.set('settings', { cadence_minutes: 10, watched_folders: [...] })
// Activity feed — poll every few seconds
const items = await bridge.kv.list({ prefix: 'activity:', limit: 50 })
for (const { key, value } of items) { ... }
// Manually drop a stale entry
await bridge.kv.delete('activity:msg-abc123')

The browser bridge routes through the same scope + audit pipeline as server-side calls (via the existing tools.call postMessage infrastructure). The iframe origin gate means another tab/extension cannot call bridge.kv against your app’s namespace.

interface KvApi {
get(key: string): Promise<unknown>
set(key: string, value: unknown, options?: { ttlSeconds?: number }): Promise<void>
list(options?: {
prefix?: string
limit?: number // default 100, max 1000
includeValues?: boolean // default true; false omits 'value' for cheaper lists
}): Promise<KvListItem[]>
delete(key: string): Promise<boolean>
}
interface KvListItem {
key: string
value?: unknown // present when includeValues != false
expires_at: string | null
updated_at: string | null
}

KV rows live in the platform-managed table linkworld_app_kv (migration 282), keyed by (tenant_id, app_id, key). The skill stamps app_id from the platform-set contextvar — your app cannot spoof another’s namespace. RLS allows only the service-role; the skill enforces app-scoping in code.

ColumnNotes
tenant_id UUIDTenant the row belongs to
app_id TEXTYour app slug — set automatically by the platform
key TEXT1-256 printable-ASCII chars
value JSONBAnything JSON-serializable, ≤ 64 KB
expires_at TIMESTAMPTZOptional TTL
updated_at TIMESTAMPTZAuto-touched by the trigger

All methods raise ToolCallError (server-side) / reject with the same shape (browser) on:

  • size_exceeded — value > 64 KB serialized
  • cap_reached — 10k keys/app soft-cap hit on set (new key)
  • bad_input — key validation failed (whitespace, too long, empty)
  • network_error — transport failure between your container and the platform
  • platform_error — 5xx from the platform side

Surface these to the user where it makes sense; for activity-log writes, swallowing transient errors is usually fine since the next sweep will rewrite the row.

Server-side: MockKv is exported from linkworld_sdk and lives in plain memory — perfect for unit tests:

from linkworld_sdk import Context, MockKv, MockTools, MockSecrets
async def test_settings_default_to_empty_dict():
kv = MockKv()
ctx = Context(
tenant_id="t", user_id=None, app_id="my-app",
event_type="schedule",
tools=MockTools(), secrets=MockSecrets(), kv=kv,
)
settings = await ctx.kv.get("settings") or {}
assert settings == {}
async def test_round_trip():
kv = MockKv()
await kv.set("foo", {"a": 1})
assert await kv.get("foo") == {"a": 1}
assert await kv.delete("foo") is True
assert await kv.get("foo") is None

MockKv skips the size + count caps — those are server-side concerns, covered by integration tests on the platform side.