Cloudflare D1 Sharding Router
A TypeScript library for horizontal scaling of SQLite-style databases on Cloudflare using D1 and KV. By routing each query to the correct D1 database instance via primary key mappings stored in Cloudflare KV, CollegeDB makes a pool of fixed-size D1 shards behave like a single, larger database.
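Conceptually, routing happens in two steps: a placement strategy chooses a shard the first time a primary key is seen, and that choice is persisted in KV so every later query for the same key lands on the same shard. A minimal, illustrative sketch of hash-based placement (not CollegeDB's actual implementation):

// Illustrative only: a stable hash of the primary key selects a shard.
// CollegeDB persists the chosen shard in KV so the mapping stays sticky.
function pickShard(key: string, shardNames: string[]): string {
  let h = 0;
  for (const ch of key) h = (h * 31 + ch.charCodeAt(0)) >>> 0;
  return shardNames[h % shardNames.length];
}

pickShard('user-123', ['db-east', 'db-west']); // always the same shard for 'user-123'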
CollegeDB provides a sharding layer on top of Cloudflare D1 databases, enabling you to:
- Route queries to the correct shard by primary key, using mappings stored in KV
- Distribute new records across shards with hash, round-robin, or random strategies
- Integrate existing D1 databases that already contain data, without manual migration
- Reassign primary keys between shards and inspect per-shard statistics
bun add collegedb
# or
npm install collegedb
import { initialize, createSchema, run, first } from 'collegedb';
// Initialize with your Cloudflare bindings (existing databases work automatically!)
initialize({
  kv: env.KV,
  coordinator: env.ShardCoordinator,
  shards: {
    'db-east': env['db-east'], // Can be existing DB with data
    'db-west': env['db-west'] // Can be existing DB with data
  },
  strategy: 'hash' // or 'round-robin', 'random'
});
// Create schema on new shards only (existing shards auto-detected)
await createSchema(env['db-new-shard']);
// Insert data (automatically routed to appropriate shard)
await run('user-123', 'INSERT INTO users (id, name, email) VALUES (?, ?, ?)', ['user-123', 'Alice Johnson', 'alice@example.com']);
// Query data (automatically routed to correct shard, works with existing data!)
const result = await first<User>('existing-user-456', 'SELECT * FROM users WHERE id = ?', ['existing-user-456']);
console.log(result); // User data from existing database
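The User type in first<User> is your own row type, not something collegedb exports. Based on the INSERT above, it is assumed to look like:

// Assumed row shape for the examples in this README (not exported by collegedb)
interface User {
  id: string;
  name: string;
  email: string;
}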
CollegeDB supports seamless, automatic integration with existing D1 databases that already contain data. Simply add your existing databases as shards in the configuration. CollegeDB will automatically detect existing data and create the necessary shard mappings without requiring any manual migration steps.
import { initialize, first, run } from 'collegedb';
// Add your existing databases as shards - that's it!
initialize({
  kv: env.KV,
  shards: {
    'db-users': env.ExistingUserDB, // Your existing database with users
    'db-orders': env.ExistingOrderDB, // Your existing database with orders
    'db-new': env.NewDB // Optional new shard for growth
  },
  strategy: 'hash'
});
// Existing data works immediately! 🎉
const existingUser = await first('user-from-old-db', 'SELECT * FROM users WHERE id = ?', ['user-from-old-db']);
// New data gets distributed automatically
await run('new-user-123', 'INSERT INTO users (id, name, email) VALUES (?, ?, ?)', ['new-user-123', 'New User', 'new@example.com']);
That's it! No migration scripts, no manual mapping creation, no downtime. Your existing data is immediately accessible through CollegeDB's sharding system.
You can manually validate databases before integration if needed:
import { validateTableForSharding, listTables } from 'collegedb';
// Check database structure
const tables = await listTables(env.ExistingDB);
console.log('Found tables:', tables);
// Validate each table
for (const table of tables) {
  const validation = await validateTableForSharding(env.ExistingDB, table);
  if (validation.isValid) {
    console.log(`✅ ${table}: ${validation.recordCount} records ready`);
  } else {
    console.log(`❌ ${table}: ${validation.issues.join(', ')}`);
  }
}
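The loop above reads isValid, recordCount, and issues from the validation result. Inferred from that usage (field names beyond these are assumptions), the result is roughly:

// Sketch inferred from the usage above, not the library's published type
interface TableValidationResult {
  isValid: boolean;
  recordCount: number; // populated when the table is valid
  issues: string[]; // populated when validation fails
}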
If you want to inspect existing data before automatic migration:
import { discoverExistingPrimaryKeys } from 'collegedb';
// Discover all user IDs in existing users table
const userIds = await discoverExistingPrimaryKeys(env.ExistingDB, 'users');
console.log(`Found ${userIds.length} existing users`);
// Custom primary key column
const orderIds = await discoverExistingPrimaryKeys(env.ExistingDB, 'orders', 'order_id');
For complete control over the integration process:
import { integrateExistingDatabase, KVShardMapper } from 'collegedb';
const mapper = new KVShardMapper(env.KV);
// Integrate your existing database
const result = await integrateExistingDatabase(
  env.ExistingDB, // Your existing D1 database
  'db-primary', // Shard name for this database
  mapper, // KV mapper instance
  {
    tables: ['users', 'posts', 'orders'], // Tables to integrate
    primaryKeyColumn: 'id', // Primary key column name
    strategy: 'hash', // Allocation strategy for future records
    addShardMappingsTable: true, // Add CollegeDB metadata table
    dryRun: false // Set true for testing
  }
);

if (result.success) {
  console.log(`✅ Integrated ${result.totalRecords} records from ${result.tablesProcessed} tables`);
} else {
  console.error('Integration issues:', result.issues);
}
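Likewise, the success, totalRecords, tablesProcessed, and issues fields used above imply roughly this result shape (a sketch inferred from usage):

// Sketch inferred from the usage above, not the library's published type
interface IntegrationResult {
  success: boolean;
  totalRecords: number;
  tablesProcessed: number;
  issues: string[];
}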
After integration, initialize CollegeDB with your existing databases as shards:
import { initialize, first } from 'collegedb';
// Include existing databases as shards
initialize({
  kv: env.KV,
  coordinator: env.ShardCoordinator,
  shards: {
    'db-primary': env.ExistingDB, // Your integrated existing database
    'db-secondary': env.AnotherExistingDB, // Another existing database
    'db-new': env.NewDB // Optional new shard for growth
  },
  strategy: 'hash'
});
// Existing data is now automatically routed!
const user = await first('existing-user-123', 'SELECT * FROM users WHERE id = ?', ['existing-user-123']);
The simplest possible integration - just add your existing databases:
import { initialize, first, run } from 'collegedb';
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Step 1: Initialize with existing databases (automatic migration happens here!)
    initialize({
      kv: env.KV,
      shards: {
        'db-users': env.ExistingUserDB, // Your existing database with users
        'db-orders': env.ExistingOrderDB, // Your existing database with orders
        'db-new': env.NewDB // New shard for future growth
      },
      strategy: 'hash'
    });

    // Step 2: Use existing data immediately - no migration needed!
    // Supports typed queries, inserts, updates, deletes, etc.
    const existingUser = await first<User>('user-from-old-db', 'SELECT * FROM users WHERE id = ?', ['user-from-old-db']);

    // Step 3: New data gets distributed automatically
    await run('new-user-123', 'INSERT INTO users (id, name, email) VALUES (?, ?, ?)', ['new-user-123', 'New User', 'new@example.com']);

    return new Response(
      JSON.stringify({
        existingUser, // first() returns the row directly, not a result set
        message: 'Automatic drop-in replacement successful!'
      })
    );
  }
};
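The Env type in the Worker above is your own binding declaration; it is not provided by collegedb. Given the bindings this example uses, it is assumed to look like (KVNamespace and D1Database come from @cloudflare/workers-types):

// Assumed binding declarations for the Worker example above
interface Env {
  KV: KVNamespace;
  ExistingUserDB: D1Database;
  ExistingOrderDB: D1Database;
  NewDB: D1Database;
}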
If your tables use different primary key column names:
// For tables with custom primary key columns
const productIds = await discoverExistingPrimaryKeys(env.ProductDB, 'products', 'product_id');
const sessionIds = await discoverExistingPrimaryKeys(env.SessionDB, 'sessions', 'session_key');
Integrate only specific tables from existing databases:
const result = await integrateExistingDatabase(env.ExistingDB, 'db-legacy', mapper, {
  tables: ['users', 'orders'] // Only integrate these tables
  // Skip 'temp_logs', 'cache_data', etc.
});
Test integration without making changes:
const testResult = await integrateExistingDatabase(env.ExistingDB, 'db-test', mapper, {
  dryRun: true // No actual mappings created
});
console.log(`Would process ${testResult.totalRecords} records from ${testResult.tablesProcessed} tables`);
// Simple rollback - clear all mappings
import { KVShardMapper } from 'collegedb';
const mapper = new KVShardMapper(env.KV);
await mapper.clearAllMappings(); // Returns to pre-migration state
// Or clear cache to force re-detection
import { clearMigrationCache } from 'collegedb';
clearMigrationCache(); // Forces fresh migration check
Tables without Primary Keys
// Error: Primary key column 'id' not found
// Note: SQLite cannot add a PRIMARY KEY column via ALTER TABLE, so add a
// plain column, backfill it, and enforce uniqueness with an index instead
await db.prepare(`ALTER TABLE legacy_table ADD COLUMN id TEXT`).run();
await db.prepare(`UPDATE legacy_table SET id = lower(hex(randomblob(16))) WHERE id IS NULL`).run();
await db.prepare(`CREATE UNIQUE INDEX idx_legacy_table_id ON legacy_table(id)`).run();
Large Database Integration
// For very large databases, integrate in batches
const allTables = await listTables(env.LargeDB);
const batchSize = 2;
for (let i = 0; i < allTables.length; i += batchSize) {
  const batch = allTables.slice(i, i + batchSize);
  await integrateExistingDatabase(env.LargeDB, 'db-large', mapper, {
    tables: batch
  });
}
Mixed Primary Key Types
// Handle different primary key column names per table
const customIntegration = {
  users: 'user_id',
  orders: 'order_number',
  products: 'sku'
};

for (const [table, pkColumn] of Object.entries(customIntegration)) {
  const keys = await discoverExistingPrimaryKeys(env.DB, table, pkColumn);
  await createMappingsForExistingKeys(keys, ['db-shard1'], 'hash', mapper);
}
Function | Description | Parameters |
---|---|---|
initialize(config) | Initialize CollegeDB with configuration | CollegeDBConfig |
createSchema(d1) | Create database schema on a D1 instance | D1Database |
prepare(key, sql) | Prepare a SQL statement for execution | string, string |
run(key, sql, bindings) | Execute a SQL query with primary key routing | string, string, any[] |
first(key, sql, bindings) | Execute a SQL query and return first result | string, string, any[] |
all(key, sql, bindings) | Execute a SQL query and return all results | string, string, any[] |
reassignShard(key, newShard) | Move primary key to different shard | string, string |
listKnownShards() | Get list of available shards | void |
getShardStats() | Get statistics for all shards | void |
Function | Description | Parameters |
---|---|---|
autoDetectAndMigrate(d1, shard, config) | NEW: Automatically detect and migrate existing data | D1Database, string, config |
checkMigrationNeeded(d1, shard, config) | NEW: Check if database needs migration | D1Database, string, config |
validateTableForSharding(d1, table) | Check if table is suitable for sharding | D1Database, string |
discoverExistingPrimaryKeys(d1, table, pkColumn?) | Find all primary keys in existing table | D1Database, string, string? |
integrateExistingDatabase(d1, shard, mapper, options) | Complete drop-in integration of existing DB | D1Database, string, KVShardMapper, options |
createMappingsForExistingKeys(keys, shards, strategy, mapper) | Create shard mappings for existing keys | string[], string[], strategy, KVShardMapper |
listTables(d1) | Get list of tables in database | D1Database |
clearMigrationCache() | NEW: Clear automatic migration cache | void |
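A sketch of how the two NEW auto-migration functions above might be combined, assuming config is the same CollegeDBConfig object you pass to initialize:

import { checkMigrationNeeded, autoDetectAndMigrate } from 'collegedb';

// Assumption: `config` is your CollegeDBConfig and env.ExistingDB is bound
if (await checkMigrationNeeded(env.ExistingDB, 'db-primary', config)) {
  await autoDetectAndMigrate(env.ExistingDB, 'db-primary', config);
}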
┌─────────────────────────────────────────────────────────────┐
│ Cloudflare Worker │
├─────────────────────────────────────────────────────────────┤
│ CollegeDB Router │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ KV │ │ Durable │ │ Query Router │ │
│ │ Mappings │ │ Objects │ │ │ │
│ │ │ │ (Optional) │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ D1 East │ │ D1 West │ │ D1 Central │ │
│ │ Shard │ │ Shard │ │ Shard │ │
│ │ │ │ │ │ (Optional) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
# Create multiple D1 databases for sharding
wrangler d1 create collegedb-east
wrangler d1 create collegedb-west
wrangler d1 create collegedb-central
# Create KV namespace for shard mappings
wrangler kv namespace create "KV"
[[d1_databases]]
binding = "db-east"
database_name = "collegedb-east"
database_id = "your-database-id"
[[d1_databases]]
binding = "db-west"
database_name = "collegedb-west"
database_id = "your-database-id"
[[kv_namespaces]]
binding = "KV"
id = "your-kv-namespace-id"
[[durable_objects.bindings]]
name = "ShardCoordinator"
class_name = "ShardCoordinator"
# Deploy to Cloudflare Workers
wrangler deploy
# Deploy with environment
wrangler deploy --env production
import { getShardStats, listKnownShards } from 'collegedb';
// Get detailed statistics
const stats = await getShardStats();
console.log(stats);
// [
// { binding: 'db-east', count: 1542 },
// { binding: 'db-west', count: 1458 }
// ]
// List available shards
const shards = await listKnownShards();
console.log(shards); // ['db-east', 'db-west']
import { reassignShard } from 'collegedb';
// Move a primary key to a different shard
await reassignShard('user-123', 'db-west');
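Combined with getShardStats, reassignShard can support manual rebalancing. A sketch, where the keys to move are a hypothetical list you maintain yourself (the API shown here does not enumerate keys per shard):

import { getShardStats, reassignShard } from 'collegedb';

// Find the least-loaded shard and move selected keys onto it
const stats = await getShardStats();
const least = stats.reduce((a, b) => (a.count < b.count ? a : b));
const hotKeys: string[] = []; // hypothetical: keys you have chosen to relocate
for (const key of hotKeys) {
  await reassignShard(key, least.binding);
}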
Monitor your CollegeDB deployment by tracking:
- Per-shard record counts from getShardStats(), to catch uneven key distribution early
- The set of registered shards from listKnownShards(), to confirm new shards are visible
- KV read volume, since every routed query depends on a mapping lookup
initialize({
  kv: env.KV,
  shards: { 'db-east': env['db-east'], 'db-west': env['db-west'] },
  strategy: 'hash' // Shard selection based on primary key hash
});
const config = {
  kv: env.KV,
  shards: env.NODE_ENV === 'production'
    ? { 'db-prod-1': env['db-prod-1'], 'db-prod-2': env['db-prod-2'] }
    : { 'db-dev': env['db-dev'] },
  strategy: 'round-robin' // Shard selection is evenly distributed, regardless of size
};
initialize(config);
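The third strategy mentioned in the Quick Start, random, follows the same shape:

initialize({
  kv: env.KV,
  shards: { 'db-east': env['db-east'], 'db-west': env['db-west'] },
  strategy: 'random' // Shard selection is random for new primary keys
});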
# Create a feature branch
git checkout -b feature/amazing-feature
# Commit your changes
git commit -m 'Add amazing feature'
# Push the branch and open a pull request
git push origin feature/amazing-feature
This project is licensed under the MIT License - see the LICENSE file for details.