Blob Storage Service
A lightweight MCP data broker that stores large binary payloads under a UUID, enabling MCP clients and agents to exchange massive results without flooding the LLM context window.
Architecture
MCP Client / LLM Agent
│
│ tool call (large result)
▼
MCP Server ──store_blob──▶ Blob Storage Service ──▶ SQLite + local files
│ │
│ returns UUID │ retrieve_blob(uuid)
◀──────────────────────────────┘
The agent only ever sees a UUID token — not the raw bytes.
Project Structure
src/blob_storage/
├── __init__.py # package metadata
├── config.py # pydantic-settings (env / .env)
├── models.py # SQLAlchemy ORM + Pydantic schemas
├── database.py # async engine, session factory, init_db()
├── storage.py # StorageBackend ABC + LocalFileBackend
├── service.py # BlobService (orchestration)
├── api.py # FastAPI REST API
└── mcp_server.py # MCP server (HTTP Streamable, /mcp)
tests/
├── conftest.py # shared fixtures
├── test_api.py # API integration tests
└── test_service.py # service unit tests
Quickstart
Install dependencies
uv sync
Start the REST API
uv run blob-api # defaults from config / .env
uv run blob-api --host 0.0.0.0 --port 9000 # override at runtime
Interactive docs available at http://<host>:<port>/docs.
Start the MCP server
uv run blob-mcp # defaults: 0.0.0.0:8001
uv run blob-mcp --host 0.0.0.0 --port 9001
The MCP endpoint is served at http://<host>:<port>/mcp.
Configuration
Settings are read from environment variables or a .env file in the project root.
| Variable | Default | Description | |---|---|---| | STORAGE_DIR | ./data/blobs | Directory for blob files | | DATABASE_URL | sqlite+aiosqlite:///./data/metadata.db | SQLAlchemy async DB URL | | DEFAULT_TTL_SECONDS | 7776000 | Default TTL — 90 days. Use 0 for no expiry. | | MAX_BLOB_SIZE_BYTES | 524288000 | Max upload size (500 MB) | | API_HOST | 0.0.0.0 | REST API bind address | | API_PORT | 8000 | REST API port | | MCP_HOST | 0.0.0.0 | MCP server bind address | | MCP_PORT | 8001 | MCP server port | | MCP_API_BASE_URL | http://127.0.0.1:8000 | REST API URL used internally by the MCP server | | SWEEP_INTERVAL_SECONDS | 3600 | How often the TTL cleanup job runs |
CLI flags (
--host,--port) take precedence over config/env for the respective server.
REST API Reference
| Method | Path | Description | |---|---|---| | GET | /health | Liveness check | | POST | /blobs | Store a blob | | GET | /blobs/{uuid} | Download a blob | | GET | /blobs/{uuid}/meta | Metadata only (no payload) | | DELETE | /blobs/{uuid} | Delete a blob |
Store a blob
Pass metadata as custom request headers:
| Header | Default | Description | |---|---|---| | X-Mime-Type | application/octet-stream | MIME type of the payload | | X-Ttl-Seconds | config default | Override TTL; 0 = never expires | | X-Origin | — | Free-form source identifier | | X-Tags | — | JSON object for arbitrary labels |
curl -X POST http://localhost:8000/blobs \
-H "Content-Type: application/octet-stream" \
-H "X-Mime-Type: application/json" \
-H "X-Origin: my-mcp-server" \
--data-binary '{"key": "value"}'
Response: ``json { "uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "expires_at": "2026-05-30T16:42:07Z", "size_bytes": 16 } ``
MCP Integration
The MCP server uses the Streamable HTTP transport. Configure your MCP host to connect over HTTP rather than spawning a subprocess:
{
"mcpServers": {
"blob-storage": {
"type": "http",
"url": "http://127.0.0.1:8001/mcp"
}
}
}
Available tools
| Tool | Input | Output | |---|---|---| | store_blob | data_b64, mime_type, ttl_seconds?, origin?, tags? | { uuid, expires_at, size_bytes } | | retrieve_blob | uuid | { uuid, mime_type, data_b64 } | | get_blob_meta | uuid | metadata JSON | | delete_blob | uuid | { deleted: uuid } |
All binary data is base64-encoded in MCP tool calls (MCP messages are JSON/text).
Running Tests
uv run pytest tests/ -v
Extending the Storage Backend
To use S3 or MinIO in production, implement the StorageBackend abstract class in storage.py:
from blob_storage.storage import StorageBackend
class S3Backend(StorageBackend):
async def store(self, blob_uuid: str, data: bytes) -> None: ...
async def retrieve(self, blob_uuid: str) -> bytes: ...
async def delete(self, blob_uuid: str) -> None: ...
Then inject it into BlobService and the _backend singleton in api.py.






