一、MongoDB 简介
1. 什么是 MongoDB?
MongoDB 是一个基于 文档 的 NoSQL 数据库,使用 BSON(Binary JSON)格式存储数据。它支持灵活的数据模型、水平扩展和高性能查询。
2. MongoDB 的特点
- 文档存储:数据以 JSON-like 文档的形式存储,字段可以动态扩展。
- 灵活的模式:不需要预先定义表结构(Schema-less)。
- 高性能:支持索引、聚合、分片等功能,适合处理大规模数据。
- 水平扩展:通过分片(Sharding)实现数据的分布式存储。
- 高可用性:通过副本集(Replica Set)实现数据的自动故障恢复。
二、MongoDB 核心概念
1. 文档(Document)
-
MongoDB 中的基本数据单元,类似于 JSON 对象。
-
示例:
json
{
"_id": ObjectId("5f50c31b1c9d440000a1b2c3"),
"name": "Alice",
"age": 25,
"address": {
"city": "Beijing",
"zip": "100000"
}
}
2. 集合(Collection)
- 类似于关系型数据库中的表,用于存储文档。
- 集合不需要固定的模式,文档结构可以不同。
3. 数据库(Database)
- 一个 MongoDB 实例可以包含多个数据库,每个数据库可以包含多个集合。
4. 索引(Index)
- 用于加速查询,类似于关系型数据库中的索引。
- 常见的索引类型:单字段索引、复合索引、唯一索引、文本索引等。
5. 分片(Sharding)
- 将数据分布到多个服务器上,支持水平扩展。
6. 副本集(Replica Set)
- 一组 MongoDB 实例,包含一个主节点(Primary)和多个从节点(Secondary),用于实现高可用性和数据冗余。
三、MongoDB 基本操作
1. 数据库操作
- 切换数据库:
bash
use mydb
- 查看所有数据库:
bash
show dbs
2. 集合操作
- 创建集合:
bash
db.createCollection("mycollection")
- 查看所有集合:
bash
show collections
3. 文档操作
- 插入文档:
bash
db.mycollection.insert({ name: "Alice", age: 25 })
- 查询文档:
bash
db.mycollection.find({ age: { $gt: 20 } })
- 更新文档:
bash
db.mycollection.update({ name: "Alice" }, { $set: { age: 26 } })
- 删除文档:
bash
db.mycollection.remove({ name: "Alice" })
四、MongoDB 查询与索引
1. 查询操作
- 查询所有文档:
bash
db.mycollection.find()
- 条件查询:
bash
db.mycollection.find({ age: { $gt: 20 } })
- 投影查询(只返回指定字段):
bash
db.mycollection.find({}, { name: 1, age: 1 })
2. 索引操作
- 创建单字段索引:
bash
db.mycollection.createIndex({ age: 1 })
- 创建复合索引:
bash
db.mycollection.createIndex({ name: 1, age: -1 })
- 查看索引:
bash
db.mycollection.getIndexes()
- 删除索引:
bash
db.mycollection.dropIndex("index_name")
五、MongoDB 聚合
MongoDB 提供了强大的聚合框架(Aggregation Framework),用于对数据进行分组、过滤、排序等操作。
1. 聚合管道
- 示例:按城市分组,计算每个城市的平均年龄。
bash
db.mycollection.aggregate([
{ $group: { _id: "$address.city", avgAge: { $avg: "$age" } } }
])
2. 常用聚合操作符
$match:过滤文档。$group:分组文档。$sort:排序文档。$project:选择字段。$limit:限制返回的文档数量。
六、MongoDB 事务
MongoDB 从 4.0 版本开始支持多文档事务。
1. 事务的使用
- 开启事务:
javascript
session.startTransaction();
- 提交事务:
javascript
session.commitTransaction();
- 回滚事务:
javascript
session.abortTransaction();
2. 事务示例
const session = db.getMongo().startSession();
session.startTransaction();
try {
const collection = session.getDatabase("mydb").mycollection;
collection.insertOne({ name: "Alice", age: 25 });
collection.updateOne({ name: "Alice" }, { $set: { age: 26 } });
session.commitTransaction();
} catch (error) {
session.abortTransaction();
} finally {
session.endSession();
}
七、MongoDB 性能优化
1. 使用索引
- 为常用查询字段创建索引。
- 避免全集合扫描。
2. 分片
- 将数据分布到多个分片上,提高查询性能。
3. 副本集
- 使用副本集实现读写分离,提高读取性能。
4. 避免大文档
- 大文档会增加存储和查询的开销,尽量将文档拆分为多个小文档。
八 示例代码,聚合操作
client = MongoClient("mongodb://******:27017",connectTimeoutMS=30000, socketTimeoutMS=30000)
# 选择数据库
db = client.xiaogang
# 选择集合
collection = db.sfis_main
address = ['guiyang','bijie','qianxi','shanghai']
datas = [{
'_id': ObjectId(), # 生成唯一的 ObjectId
'phonenumber': fake.phone_number(),
'age': random.randint(20, 60),
# 'company': fake.company(),
'address': random.choice(address),
'home':fake.address(),
'car_type': random.choice(['SUV', '轿车', '卡车', '电动车']),
'phone_type': random.choice(['iPhone', 'Android', 'other'])
} for i in range(100)]
try:
result = collection.insert_many(datas)
print("插入成功,插入的文档 IDs:", result.inserted_ids)
except BulkWriteError as e:
print("批量插入失败,错误详情:", e.details)
# pipline = [
# {"$match": {"age": {"$gte": 30}, "phone_type": "other"}},
# {"$group": {
# "_id": "$address", # 按 address 分组
# # "documents": {"$push": "$$ROOT"} # 将分组中的文档存储到 documents 数组中
# "count": {"$sum": 1}
# }},
# {"$project": {
# "address": "$_id",
# "documents": {"$slice": ["$documents", 10]}, # 只保留前 10 个文档
# "_id": 0
# }},
# {"$unwind": "$tags"}
# ]
# pipline = [
# {"$match": {"age": {"$gte": 30}, "phone_type": "other"}},
# {"$group": {
# "_id": "$address", # 按 address 分组
# "car_type": {"$push": "$car_type"}, # 只存储 name 字段
# "phone_type": {"$push": "$phone_type"} # 只存储 age 字段
# }}
# ]
# pipline = [
# {"$match": {"age": {"$gte": 30}, "phone_type": "other"}},
# {"$group": {
# "_id": "$address", # 按 address 分组
# "documents": {"$push": "$$ROOT"} # 将分组中的文档存储到 documents 数组中
# }}
# ]
pipline = [
{"$match":
{
"home":{"$regex":"^贵州","$regex":"537456$"},
"age":{"$exists":True,"$ne":None}
}
},
{"$addFields":{
"age":{"$toInt":"$age"}
}},
{"$group":
{
"_id":"$address",
"documents":{"$addToSet":"$$ROOT"},
"count":{"$sum":1},
"ageavg":{"$avg":"$age"}}},
{"$project":{
"phonenumber":0
}}
]
# pipline = [
# {"$match": {
# "home": {"$regex": "^贵州"}, # 匹配 home 以 "贵州" 开头的文档
# "age": {"$exists": True, "$ne": None} # 过滤掉 age 字段缺失或为 null 的文档
# }},
# {"$addFields": {
# "age": {"$toInt": "$age"} # 将 age 字段转换为整数
# }},
# {"$group": {
# "_id": "$address",
# "documents": {"$addToSet": { # 只选择需要的字段
# "name": "$name",
# "age": "$age",
# "address": "$address"
# }},
# "count": {"$sum": 1},
# "ageavg": {"$avg": "$age"} # 计算 age 的平均值
# }},
# {"$project": {
# "phonenumber": 0 # 排除 phonenumber 字段
# }}
# ]
result = collection.aggregate(pipline)
print(list(result))
for doc in result:
# print("分组地址:", doc['address'])
# print("分组中的文档:")
print(doc['count'])
# for document in doc['addres']:
# print(document)
# print(list(result))
client.close()