Mongodb集群中的分布式读写
在互联网时代,海量数据、高并发读写是常态。MongoDB 作为一款高性能的 NoSQL 数据库,提供了**复制集(Replica Set)和分片集群(Sharded Cluster)**两种架构,能够有效支持分布式读写和横向扩展。本文将从原理出发,结合代码示例和图解,帮助你理解在 MongoDB 集群环境中如何实现分布式读写,如何选择读写策略,以及常见的调优思路。
1. 基础概念与架构
1.1 复制集(Replica Set)
- 定义:复制集由若干个 MongoDB 节点组成,其中一个节点为主节点(Primary),其他节点为从节点(Secondary)。所有写请求必须发送到 Primary,再同步到各个 Secondary。
作用:
- 高可用:当 Primary 挂掉时,Secondary 会通过选举自动提升一个新的 Primary,业务不中断。
- 读扩展:可以将某些读请求路由到 Secondary(需要配置
readPreference
)。
简单架构图:
┌────────────────────────┐ │ Replica Set │ │ │ │ ┌──────────────┐ │ │ │ Primary (P) │◀────┤ 客户端写入 │ └──────────────┘ │ │ │ │ │ │ Oplog 同步 │ │ ▼ │ │ ┌──────────────┐ │ │ │ Secondary S1 │ │ │ └──────────────┘ │ │ │ │ │ │ Oplog 同步 │ │ ▼ │ │ ┌──────────────┐ │ │ │ Secondary S2 │ │ │ └──────────────┘ │ └────────────────────────┘
1.2 分片集群(Sharded Cluster)
- 定义:将数据按“某个字段的范围或哈希值”切分成多个分片(Shard),每个分片自己是一个复制集。客户端对分布式集群发起读写请求时,查询路由(mongos 进程)会根据分片键来决定把请求路由到哪一个或多个分片。
作用:
- 水平扩展:通过增加分片节点可以让单集合的数据量和吞吐线性增长。
- 数据均衡:MongoDB 会定期把过大或过小的 chunk 在各分片间迁移,实现均衡。
关键组件:
- mongos:查询路由进程,客户端连接目标;
- Config Servers:存储集群元信息(分片映射)的一组服务器(通常是 3 台);
- Shard(复制集):每个分片都是一个复制集。
简化架构图:
┌───────────────────────────────────────────────────────┐ │ Sharded Cluster │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ mongos │◀─▶│ mongos │◀─▶│ mongos │ 客户端多路连接 │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Config │ │ Config │ │ Config │ │ │ │ Server1 │ │ Server2 │ │ Server3 │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ Cluster Meta │ │ ▼ │ │ ┌──────────────────────────┐ ┌───────────────────────┐│ │ │ Shard (RS) │ │ Shard (RS) ││ │ │ ┌──────────┐ ┌────────┐ │ │ ┌──────────┐ ┌────┐ ││ │ │ │ Primary │ │ Sec 1 │ │ │ │ Primary │ │... │ ││ │ │ └──────────┘ └────────┘ │ │ └──────────┘ └────┘ ││ │ │ ┌──────────┐ ┌────────┐ │ │ ┌──────────┐ ┌────┐ ││ │ │ │ Sec 2 │ │ Sec 3 │ │ │ │ Sec 2 │ │... │ ││ │ │ └──────────┘ └────────┘ │ │ └──────────┘ └────┘ ││ │ └──────────────────────────┘ └───────────────────────┘│ └───────────────────────────────────────────────────────────┘
2. 复制集中的分布式读写
首先看最常见的“单个复制集”场景。复制集内主节点负责写,从节点同步数据并可承担部分读流量。
2.1 写入流程
- 客户端 连接到复制集时,一般会在 URI 中指定多个节点地址,并设置
replicaSet
名称。 - 驱动:自动发现哪个节点是 Primary,所有写操作经由 Primary 执行。
- Primary:执行写操作后,将操作以“Oplog(操作日志)”的形式记录在本地
local.oplog.rs
集合中。 - Secondary:通过读取 Primary 的 Oplog 并应用,保证数据最终一致。
2.1.1 连接字符串示例(Node.js Mongoose)
const mongoose = require('mongoose');
const uri = 'mongodb://user:pwd@host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0&readPreference=primary';
mongoose.connect(uri, {
useNewUrlParser: true,
useUnifiedTopology: true
}).then(() => {
console.log('Connected to Primary of Replica Set!');
}).catch(err => {
console.error('Connection error', err);
});
host1,host2,host3
:至少写入两个或三个复制集节点的地址,驱动可自动发现并选择 Primary。replicaSet=rs0
:指定复制集名称。readPreference=primary
:强制读写都只读 Primary(默认)。
2.2 读取策略
MongoDB 客户端支持多种 Read Preference,可根据业务需求将读流量分流到 Secondary,以减轻 Primary 压力或实现“最近优先”地理分布读。
- primary(默认):所有读写都到 Primary。
- primaryPreferred:优先读 Primary,Primary 不可用时读 Secondary。
- secondary:只读 Secondary。
- secondaryPreferred:优先读 Secondary,Secondary 不可用时读 Primary。
- nearest:读最“近”的节点(根据 ping 值或自定义标签)。
2.2.1 代码示例:Node.js 原生驱动
const { MongoClient } = require('mongodb');
const uri = 'mongodb://user:pwd@host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0';
// 使用 secondaryPreferred 读取
MongoClient.connect(uri, {
useNewUrlParser: true,
useUnifiedTopology: true,
readPreference: 'secondaryPreferred'
}).then(async client => {
const db = client.db('mydb');
const col = db.collection('users');
// 查找操作会优先从 Secondary 获取
const user = await col.findOne({ name: 'Alice' });
console.log('Found user:', user);
client.close();
}).catch(err => console.error(err));
- 当 Secondary 正常可用时,查询会命中某个 Secondary。
- 如果 Secondary 都不可用,则回退到 Primary(
secondaryPreferred
模式)。
2.3 复制延迟与一致性考量
- 复制延迟(Replication Lag):Secondary 从 Primary 拉取并应用 Oplog 需要时间。在高写入量时,可能会看到 Secondary 的数据稍有“滞后”现象。
- 因果一致性需求:若应用对“刚写入的数据”有强一致性要求,就不要将此时的读请求发往 Secondary,否则可能读不到最新写入。可以暂时设置
readPreference=primary
或在应用层强制先“刷新” Primary 后再读 Secondary。
2.3.1 检测复制延迟
可以在 Secondary 上执行:
db.adminCommand({ replSetGetStatus: 1 })
结果中会包含各个节点的 optimeDate
,比较 Primary 与 Secondary 的时间差就能估算延迟。
3. 分片集群中的分布式读写
分片集群除了复制集的功能外,还要考虑“数据分布”与“路由”。所有对分片集群的读写操作都经由 mongos
路由器,而 mongod
节点只负责所在分片上的数据。
3.1 写入流程
- 客户端 连接到若干个
mongos
(可以是多台,以负载均衡入口)。 - 写操作:携带分片键(shard key),
mongos
根据当前分片映射决定将写请求发往哪个分片的 Primary。 - 分片内写入:落到对应分片的 Primary,再复制到自己分片的 Secondary。
3.1.1 分片键选择
- 分片键应当具有较好的随机性或均匀分布,否则可能出现单个分片过热。
- 常见策略:使用哈希型分片键,如
{ user_id: "hashed" }
,即将user_id
先做哈希后取模分片(均匀)。 - 也可使用范围分片(
{ timestamp: 1 }
),适用于时序数据,但会产生热点分片(插入都落到一个分片)。
3.1.2 分片写入示例(Node.js Mongoose)
const mongoose = require('mongoose');
// 连接到 mongos(可以是多个地址)
const uri = 'mongodb://mongos1:27017,mongos2:27017/mydb?replicaSet=rs0';
mongoose.connect(uri, {
useNewUrlParser: true,
useUnifiedTopology: true
});
// 定义 Schema,指定分片键为 user_id
const userSchema = new mongoose.Schema({
user_id: { type: Number, required: true },
name: String,
age: Number
}, { shardKey: { user_id: 'hashed' } });
const User = mongoose.model('User', userSchema);
async function insertUsers() {
for (let i = 0; i < 1000; i++) {
await User.create({ user_id: i, name: `User${i}`, age: 20 + (i % 10) });
}
console.log('Batch insert done');
}
insertUsers().catch(console.error);
- 先在 Mongo Shell 或程序中执行
sh.enableSharding("mydb")
、sh.shardCollection("mydb.users", { user_id: "hashed" })
,为users
集合开启分片并指定分片键。 - 上述写入时,
mongos
会将文档路由到对应分片的某个 Primary,上层无需感知分片细节。
3.2 读取流程
分片集群的读取也总是经过 mongos
,但可以根据不同场景采用不同的 Read Preference。
针对单文档查询(包含分片键)
mongos
会将查询路由到单个分片,避免广播到所有分片。
通用查询(不包含分片键或范围查询)
mongos
会广播查询到所有分片,分别从各分片的 Primary 或 Secondary(取决于客户端指定)获取结果,再在客户端合并。
读偏好
- 同复制集一样,可以在连接字符串或查询时指定
readPreference
,决定是否允许从 Secondary 读取。
- 同复制集一样,可以在连接字符串或查询时指定
3.2.1 分片查询示例
// 连接到 mongos,指定 preferential read to secondary
const uri = 'mongodb://mongos1:27017,mongos2:27017/mydb?readPreference=secondaryPreferred';
MongoClient.connect(uri, { useNewUrlParser: true, useUnifiedTopology: true })
.then(async client => {
const db = client.db('mydb');
const users = db.collection('users');
// 包含分片键的单文档查询 → 只访问一个分片
let doc = await users.findOne({ user_id: 123 });
console.log('Single shard query:', doc);
// 不包含分片键的聚合查询 → 广播到所有分片,再合并
const cursor = users.aggregate([
{ $match: { age: { $gt: 25 } } },
{ $group: { _id: null, avgAge: { $avg: "$age" } } }
]);
const result = await cursor.toArray();
console.log('Broadcast aggregation:', result);
client.close();
})
.catch(console.error);
- 对于
findOne({ user_id: 123 })
,mongos
根据user_id
哈希值确定只访问一个分片 Primary/Secondary。 - 对于不包含
user_id
的聚合,会广播到所有分片节点,分别在各分片执行$match
和$group
,最后将每个分片的局部结果汇总到mongos
,再做终合并。
4. 图解:Replica Set 与 Sharded Cluster 中的读写
为帮助学习,下面通过 ASCII 图结合文字说明,直观展示读写在两种架构中的流向。
4.1 复制集中写入与读取
┌────────────────────────┐
│ 客户端应用 │
│ │
│ 读/写请求(Mongoose) │
└──────────┬─────────────┘
│
┌──────────────▼─────────────────┐
│ MongoClient 驱动 │
│(自动发现 Primary / Secondary)│
└──────────────┬─────────────────┘
│
┌────────────────▼────────────────┐
│ 复制集(Replica Set) │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Primary│ │Secondary│ │Secondary│ │
│ │ P │ │ S1 │ │ S2 │ │
│ └─┬──────┘ └─┬──────┘ └─┬──────┘ │
│ │Writes │Oplog Sync │ │
│ │(W) │ │ │
│ ▼ ▼ ▼ │
│ /data/db/ /data/db/ /data/db/ │
└───────────────────────────────────────────────┘
- **写入**:驱动自动将写请求发往 Primary (P),Primary 在本地数据目录 `/data/db/` 写入数据,并记录 Oplog。
- **同步**:各 Secondary (S1、S2) 从 Primary 的 Oplog 拉取并应用写操作,保持数据最终一致。
- **读取**:若 `readPreference=primary`,读 P;若 `readPreference=secondary`,可读 S1 或 S2。
4.2 分片集群中读写流程
┌───────────────────────────────────────────────────────────────────────────────┐
│ 客户端应用 (Node.js) │
│ ┌───────────────────────────────┬───────────────────────────────────────┐ │
│ │写:insert({user_id:123, ...}) │ 读:find({user_id:123}) │ │
│ └───────────────┬───────────────┴───────────────┬───────────────────────────┘ │
└──────────────────────────────┬───────────────────┴─────────────────────────────┘
│
▼
┌─────────────────────────┐
│ mongos │ ←── 客户端连接(可以多个 mongos 做负载均衡)
└──────────┬──────────────┘
│
┌──────────────┴───────────────┐
│ 分片路由逻辑 │
│ (根据分片键计算 hash%shardCount) │
└──────────────┬───────────────┘
│
┌─────────────────────┴───────────────────────┐
│ │
┌───────▼─────┐ ┌───────▼─────┐
│ Shard1 │ │ Shard2 │
│ ReplicaSet1 │ │ ReplicaSet2 │
│ ┌───────┐ │ │ ┌───────┐ │
│ │ P1 │ │ │ │ P2 │ │
│ └──┬────┘ │ │ └──┬────┘ │
│ │ sync │ │ │ sync │
│ ┌──▼────┐ │ │ ┌──▼────┐ │
│ │ S1 │ │ │ │ S3 │ │
│ └───────┘ │ │ └───────┘ │
│ ┌───────┐ │ │ ┌───────┐ │
│ │ S2 │ │ │ │ S4 │ │
│ └───────┘ │ │ └───────┘ │
└─────────────┘ └─────────────┘
- **写操作**:
1. `mongos` 读取文档的 `user_id` 做哈希 `%2` → 结果若为 1,则路由到 Shard2.P2,否则路由 Shard1.P1。
2. Primary (P) 在本地写入后,Secondary(S) 同步 Oplog。
- **读操作(包含分片键)**:
1. `find({user_id:123})` → `mongos` 计算 `123%2=1` → 只访问 Shard2。
2. 如果 `readPreference=secondaryPreferred`,则可选择 S3、S4。
- **读操作(不包含分片键)**:
1. `find({age:{$gt:30}})` → `mongos` 广播到 Shard1 和 Shard2。
2. 在每个 Shard 上的 Primary/Secondary 执行子查询,结果由 `mongos` 汇总返回。
5. 代码示例与说明
下面通过实际代码示例,演示在复制集和分片集群中如何配置并进行分布式读写。
5.1 Replica Set 场景
5.1.1 启动复制集(简化)
在三台机器 mongo1:27017
、mongo2:27017
、mongo3:27017
上分别启动 mongod
:
# /etc/mongod.conf 中:
replication:
replSetName: "rs0"
net:
bindIp: 0.0.0.0
port: 27017
启动后,在 mongo1
上初始化复制集:
// mongo shell
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "mongo1:27017" },
{ _id: 1, host: "mongo2:27017" },
{ _id: 2, host: "mongo3:27017" }
]
});
5.1.2 Node.js 分布式读写示例
const { MongoClient } = require('mongodb');
(async () => {
const uri = 'mongodb://user:pwd@mongo1:27017,mongo2:27017,mongo3:27017/mydb?replicaSet=rs0';
// 此处不指定 readPreference,默认为 primary
const client = await MongoClient.connect(uri, {
useNewUrlParser: true,
useUnifiedTopology: true
});
const db = client.db('mydb');
const users = db.collection('users');
// 写入示例
await users.insertOne({ name: 'Alice', age: 30 });
console.log('Inserted Alice');
// 读取示例(Primary)
let alice = await users.findOne({ name: 'Alice' });
console.log('Primary read:', alice);
// 指定 secondaryPreferred
const client2 = await MongoClient.connect(uri, {
useNewUrlParser: true,
useUnifiedTopology: true,
readPreference: 'secondaryPreferred'
});
const users2 = client2.db('mydb').collection('users');
let alice2 = await users2.findOne({ name: 'Alice' });
console.log('SecondaryPreferred read:', alice2);
client.close();
client2.close();
})();
- 写入总是到 Primary。
- 第二个连接示例中
t readPreference=secondaryPreferred
,可从 Secondary 读取(可能有复制延迟)。
5.2 Sharded Cluster 场景
5.2.1 配置分片(Mongo Shell)
假设创建了一个 Sharded Cluster,mongos
可通过 mongo
命令连接到 mongos:27017
:
// 连接到 mongos
sh.enableSharding("testdb");
// 为 users 集合创建分片,分片键为 user_id(哈希型)
sh.shardCollection("testdb.users", { user_id: "hashed" });
// 查看分片状态
sh.status();
5.2.2 Node.js 分布式读写示例
const { MongoClient } = require('mongodb');
(async () => {
// 连接到 mongos 多地址
const uri = 'mongodb://mongos1:27017,mongos2:27017/testdb';
const client = await MongoClient.connect(uri, {
useNewUrlParser: true,
useUnifiedTopology: true,
readPreference: 'secondaryPreferred'
});
const db = client.db('testdb');
const users = db.collection('users');
// 批量插入 1000 条
let docs = [];
for (let i = 0; i < 1000; i++) {
docs.push({ user_id: i, name: `U${i}`, age: 20 + (i % 10) });
}
await users.insertMany(docs);
console.log('Inserted 1000 documents');
// 单文档查询(包含分片键) → 只访问一个分片
let user42 = await users.findOne({ user_id: 42 });
console.log('Find user 42:', user42);
// 聚合查询(不包含分片键) → 广播到所有分片
const agg = users.aggregate([
{ $match: { age: { $gte: 25 } } },
{ $group: { _id: null, avgAge: { $avg: "$age" } } }
]);
const res = await agg.toArray();
console.log('Average age across shards:', res[0].avgAge);
client.close();
})();
insertMany
时,mongos
根据user_id
哈希值决定每条文档插入到哪个分片。findOne({ user_id: 42 })
只访问分片42 % shardCount
。- 聚合时,会广播到所有分片。
6. 调优与常见问题
6.1 复制集读写延迟
解决方法:
- 如果 Secondary 延迟过高,可暂时将重要读请求路由到 Primary。
- 优化主从网络带宽与磁盘 I/O,减少 Secondary 应用 Oplog 的延迟。
- 若只需要近实时,允许轻微延迟可将
readPreference=secondaryPreferred
。
6.2 分片热点与数据倾斜
- 原因:使用顺序或单调递增的字段作为分片键(如时间戳、订单号),插入会集中到某个 Shard,造成负载不均衡。
解决方法:
- 哈希分片:使用
{ field: "hashed" }
,使数据分布更均匀; - 组合分片键:比如
{ user_id: 1, time: 1 }
,先user_id
哈希或 UUID,再组合时间; - 定期拆分 chunk:如果某个 chunk 太大,可手动拆分(
sh.splitChunk()
)并移动到其他 shard。
- 哈希分片:使用
6.3 写入吞吐与批量
- 批量写入:尽量使用
insertMany()
等批量 API 减少网络往返。 Write Concern:写入时可配置
writeConcern
参数,如{ w: 1 }
(只确认写入到 Primary)或{ w: "majority", wtimeout: 5000 }
(等待多数节点确认)。- 较严格的
w: "majority"
能保证写入可见性和高可用,但会带来更高延迟。
- 较严格的
6.3.1 批量写示例
await users.insertMany(docs, { writeConcern: { w: 'majority', wtimeout: 5000 } });
6.4 查询性能
- 索引:确保对高频查询字段建索引。对于分片集群,如果查询中带有分片键,能走到单个分片,性能最高;否则会广播到所有分片,开销较大。
- 限制返回字段:使用投影(
projection
)只获取需要的字段,减少网络带宽消耗。 - 分页:对于深度分页,使用“范围查询 + 分片键结合”的方式避免跳过过多文档。
7. 总结
本文围绕“MongoDB 集群中的分布式读写”展开,深入讲解了:
- 复制集架构:如何配置 Replica Set,客户端如何设置
readPreference
从 Secondary 读取;复制延迟与一致性问题。 - 分片集群架构:如何配置 Sharded Cluster,如何选择分片键,写入时数据如何路由到对应分片;读取时如何广播或单片访问。
- 代码示例:在 Node.js 环境下分别演示了复制集和分片集群的读写操作,包括连接字符串、
insertMany
、findOne
、聚合查询等实践案例。 - 调优与常见问题:讨论了复制延迟、数据倾斜、写吞吐、查询性能等实战中常见的难点与优化思路。
通过本文的原理阐述、ASCII 图解和代码示例,希望你能够快速理解并掌握 MongoDB 在复制集和分片集群环境下的分布式读写模式,并根据业务特点选择合适的读写策略,以实现高可用、高性能、可扩展的存储系统。