我在前面随笔《在SqlSugar的开发框架中增加对低代码EAV模型(实体-属性-值)的WebAPI实现支持》中介绍了对于EAV数据存储的时候,我们把变化字段的数据记录存储在MongoDB数据库里面,这样除了支持动态化字段变化外,也更好的支持对字段不同类型的查询处理,之前随笔介绍的是基于C#操作MongoDB数据库的处理,由于Python后端FastApi项目的设计初衷是可以平滑更换 SqlSugar项目的Web API的,因此会涉及到在Python项目中对MongoDB的相关操作。本篇随笔先对Python环境中操作MongoDB数据库进行相关的介绍。
1、Python环境中操作MongoDB数据库的准备
如果需要了解相关MongoDB数据库的相关信息和C#开发MongoDB的内容,可以参考我的随笔《MongoDB数据库内容》,里面包含我对这个主题的相关介绍。
具体可以进一步参考官网里面的介绍。
https://www.mongodb.com/zh-cn/docs/
https://www.mongodb.com/zh-cn/docs/drivers/csharp/current/
https://www.mongodb.com/zh-cn/docs/languages/python/
https://www.mongodb.com/zh-cn/docs/drivers/motor/
如果我们需要下载安装,可以根据不同的操作系统下载对应的社区版本安装文件即可。
https://www.mongodb.com/zh-cn/products/self-managed/community-edition
安装完成后,可以使用管理工具进行MongoDB数据库的连接和创建,如下所示可以根据不同的需要创建不同的集合。
关系型数据库和MongoDB的数据库,它们的相关概念,对比关系图如下所示。
我们安装MongoDB数据库后,它是驻留在Window的服务里面,创建服务并顺利启动成功后,然后就可以在系统的服务列表里查看到了,我们确认把它设置为自动启动的Windows服务即可。
由于我们希望使用异步来操作MongoDB数据库,推荐使用motor驱动来操作它。Motor是一个异步mongodb driver,支持异步读写mongodb。
具体使用我们可以参考官网的介绍:https://www.mongodb.com/zh-cn/docs/drivers/motor/
使用前,我们需要再我们FastAPI项目中安装MongoDB和Motor的依赖模块。就是pymonogo和motor
我的requirement.txt文件中包含下面两个
我们如果没有初始化安装,通过pip进行安装即可。
pip install pymongo motor
使用它的基础代码如下所示。
import motor.motor_asyncio client = motor.motor_asyncio.AsyncIOMotorClient() 或者 client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017") 或者 client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://username:password@localhost:27017/dbname')
根据MongoDB官网的说明,MongoDB的适用场景如下:
1)网站实时数据:MongoDB非常适合实时的插入,更新与查询,并具备网站实时数据存储所需的复制及高度伸缩性。
2)数据缓存:由于性能很高,MongoDB也适合作为信息基础设施的缓存层。在系统重启之后,由MongoDB搭建的持久化缓存层可以避免下层的数据源过载。
3)大尺寸、低价值数据存储:使用传统的关系型数据库存储一些数据时可能会比较昂贵,在此之前,很多时候程序员往往会选择传统的文件进行存储。
4)高伸缩性场景:MongoDB非常适合由数十或数百台服务器组成的数据库。MongoDB的路线图中已经包含对MapReduce引擎的内置支持。
5)对象或JSON数据存储:MongoDB的BSON数据格式非常适合文档化格式的存储及查询。
MongoDB数据库支持常规的增删改查等操作,其中它的 find方法很强大,可以组合很多条件查询的方式,如下所示:
db.collection.find({ "key" : value }) 查找key=value的数据 db.collection.find({ "key" : { $gt: value } }) key > value db.collection.find({ "key" : { $lt: value } }) key < value db.collection.find({ "key" : { $gte: value } }) key >= value db.collection.find({ "key" : { $lte: value } }) key <= value db.collection.find({ "key" : { $gt: value1 , $lt: value2 } }) value1 < key xss=removed>value2 db.collection.find({ "key" : { $ne: value } }) key <> value db.collection.find({ "key" : { $mod : [ 10 , 1 ] } }) 取模运算,条件相当于key % 10 == 1 即key除以10余数为1的 db.collection.find({ "key" : { $nin: [ 1, 2, 3 ] } }) 不属于,条件相当于key的值不属于[ 1, 2, 3 ]中任何一个 db.collection.find({ "key" : { $in: [ 1, 2, 3 ] } }) 属于,条件相当于key等于[ 1, 2, 3 ]中任何一个 db.collection.find({ "key" : { $size: 1 } }) $size 数量、尺寸,条件相当于key的值的数量是1(key必须是数组,一个值的情况不能算是数量为1的数组) db.collection.find({ "key" : { $exists : true|false } }) $exists 字段存在,true返回存在字段key的数据,false返回不存在字度key的数据 db.collection.find({ "key": /^val.*val$/i }) 正则,类似like;“i”忽略大小写,“m”支持多行 db.collection.find({ $or : [{a : 1}, {b : 2} ] }) $or或 (注意:MongoDB 1.5.3后版本可用),符合条件a=1的或者符合条件b=2的数据都会查询出来 db.collection.find({ "key": value , $or : [{ a : 1 } , { b : 2 }] }) 符合条件key=value ,同时符合其他两个条件中任意一个的数据 db.collection.find({ "key.subkey" :value }) 内嵌对象中的值匹配,注意:"key.subkey"必须加引号 db.collection.find({ "key": { $not : /^val.*val$/i } }) 这是一个与其他查询条件组合使用的操作符,不会单独使用。上述查询条件得到的结果集加上$not之后就能获得相反的集合。
比较符号说明如下:
符 号 | 含 义 | 示 例 |
---|---|---|
$lt |
小于 | {'age': {'$lt': 20}} |
$gt |
大于 | {'age': {'$gt': 20}} |
$lte |
小于等于 | {'age': {'$lte': 20}} |
$gte |
大于等于 | {'age': {'$gte': 20}} |
$ne |
不等于 | {'age': {'$ne': 20}} |
$in |
在范围内 | {'age': {'$in': [20, 23]}} |
$nin |
不在范围内 | {'age': {'$nin': [20, 23]}} |
另外,还可以进行正则匹配查询。例如,查询名字以 M 开头的学生数据,示例如下:
results = collection.find({'name': {'$regex': '^M.*'}})
这里使用 $regex
来指定正则匹配,^M.*
代表以 M 开头的正则表达式。
多条件查询 $and
$or
# and查询 db.collection.find({ $and : [ { "age" : {$gt : 10 }} , { "gender" : "man" } ] }) #or查询 db.collection.find({ $or : [ {"age" : {$gt : 10 }}, { "gender" : "man"} ] }) #and查询 和 or查询 db.inventory.find( { $and : [ { $or : [ { price : 0.99 }, { price : 1.99 } ] }, { $or : [ { sale : true }, { qty : { $lt : 20 } } ] } ] } )
关于这些操作的更详细用法,可以在 MongoDB 官方文档找到: https://docs.mongodb.com/manual/reference/operator/query/。
当然还有插入更新的处理语句也是很特别的。
db.student.insert({name:'student1',subject:['arts','music']})
特别是更新操作需要说明一下,支持常规的$set方法(修改)、$unset方法(移除指定的键),还有原子级的$inc方法(数值增减),$rename方法(重命名字段名称)等等,
db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")}, {"$set" : {"hobby" :["swimming","basketball"]}} ) db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")},{"$unset" : {"hobby" :1 }} ) db.posts.update({"_id" : ObjectId("5180f1a991c22a72028238e4")}, {"$inc":{"pageviews":1}}) db.students.update( { _id: 1 }, { $rename: { 'nickname': 'alias', 'cell': 'mobile' } }
upsert是一种特殊的更新操作,不是一个操作符。(upsert = up[date]+[in]sert),也就是如果存在则更新,否则就写入一条新的记录操作。这个参数是个布尔类型,默认是false。
db.users.update({age :25}, {$inc :{"age" :3}}, true)
另外,Update可以对Json的集合进行处理,如果对于subject对象是一个集合的话,插入或更新其中的字段使用下面的语句
db.student.update({name:'student5'},{$set:{subject:['music']}},{upsert:true});
如果是记录已经存在,我们可以使用索引数值进行更新其中集合里面的数据,如下所示。
db.student.update({name:'student3'},{$set:{'subject.0':'arts'}});
2、在FastAPI项目中使用pymongo 和motor 操作MongoDB
上面我们大致介绍了一些基础的MongoDB数据库信息。
之前随笔《Python 开发环境的准备以及一些常用类库模块的安装》介绍过我们的FastAPI配置信息,存储在.env文件中的,在FastAPI项目启动的时候,根据需要进行读取加载到对象里面。
如对于配置信息的处理,我们还可以引入 python-dotenv 和 pydantic_settings 来统一管理配置参数。我们的项目.env文件部分配置如下。
最后我们通过pydantic_settings的配置处理,获得相关的配置对象信息,如下。
其中的MONGO_URL 就是我们具体使用的MogoDB数据库连接信息了。
为了和数据库连接一样方便使用MongoDB的连接,我们可以通过在一个独立的文件中声明获得MongoDB的数据库和集合对象,并且通过缓存的方式提高使用效率。
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorCollection from functools import lru_cache from core.config import settings class MongoClientManager: def __init__(self, uri: str = settings.MONGO_URL): """初始化 MongoDB 客户端管理器 Args: uri = mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}""" self._client = AsyncIOMotorClient(uri) self._db = self._client.get_default_database() # 使用 URI 中指定的 dbname @property def db(self): return self._db def close(self): self._client.close() # 创建全局 Mongo 管理器实例 @lru_cache() def get_mongo_client() -> MongoClientManager: return MongoClientManager() @lru_cache() def get_collection(entity_model: str) -> AsyncIOMotorCollection: """ 获取指定名称的集合(collection),类型是 BsonDocument(等价于 Python dict) MongoDB 的集合通常在首次写入数据时自动创建,因此你只需要获取对应集合即可,不需要手动“创建” """ db = get_mongo_client().db return db[entity_model]
这样,我们项目启动的时候,自动加载配置文件,并在这里初始化MongoDBClient的异步对象和集合缓存对象。
有了这些,我们为了方便,还需要对MongoDB数据库的操作进行一些的封装处理,以提高我们对接口的使用遍历,毕竟我们前面介绍到了MongoDB支持非常复杂的查询和处理,我们往往只需要一些特殊的接口即可,因此封装接口有利于我们对接口的使用便利性。
我们在项目的utils目录中增加一个辅助类mongo_helper.py,用来封装MongoDB的相关操作的。
我们截取部分代码,如下所示。
class MongoAsyncHelper: """基于 motor 实现MongoDb异步操作,支持: insert_one / insert_many find_one / find_many update_one / update_many delete_one / delete_many 复杂条件、分页、排序、模糊查询等""" def __init__(self, collection: AsyncIOMotorCollection): self.collection = collection async def insert_one(self, data: Dict[str, Any]): """插入单条数据 examples: helper = MongoAsyncHelper(collection) await helper.insert_one({ "name": "John", "age": 25, "created_at": datetime.utcnow()} ) args: data: 待插入的数据 """ return await self.collection.insert_one(data) async def insert_many(self, data_list: List[Dict[str, Any]]): """插入多条数据 examples: helper = MongoAsyncHelper(collection) await helper.insert_many([ {"name": "John", "age": 25, "created_at": datetime.utcnow()}, {"name": "Mary", "age": 30, "created_at": datetime.utcnow()} ]) args: data_list: 待插入的数据列表 """ return await self.collection.insert_many(data_list) async def find_by_id(self, id: str) -> Optional[Dict[str, Any]]: """根据 _id 查询单条数据""" return await self.find_one({"_id": id}) async def update_by_id(self, id: str, update_data: Dict[str, Any]): """根据 _id 更新单条数据""" return await self.update_one({"_id": id}, {"$set": update_data}) async def delete_by_id(self, id: str): """根据 _id 删除单条数据""" return await self.delete_one({"_id": id}) ...............
例如,我们在前面介绍的EAV处理中,获取MongoDB数据库的指定实体类型(对应表)的所有集合处理,在Python的数据处理层代码中如下所示。
async def mongo_get_all( self, db: AsyncSession, entitytype_id: str, parentid: str = None, sorting: str = None, ) -> List[dict]: """获取实体类型的所有记录 :param db: 数据库会话 :param entitytype_id: 实体类型ID :param parentid: 父ID :param sorting: 排序条件,格式:name asc 或 name asc,age desc """ items = [] try: entityType: EntityType = await self.__get_entity_type(db, entitytype_id) self.logger.info(f"获取实体类型:{entitytype_id}, {entityType.name}") if not entityType: return items # 连接 MongoDB collection = get_collection(entityType.entitymodel) mongo_helper = MongoAsyncHelper(collection) # 构建查询条件 filter = {} if parentid is not None: filter["ParentId"] = parentid sort_by = parse_sort_string(sorting) # print(sort_by) items = await mongo_helper.find_all(filter, sort_by) return items except Exception as e: self.logger.error(f"处理失败:{str(e)}") return items
首先通过缓存接口get_collection获得对应实体类型的集合,然后传递集合到辅助类 MongoAsyncHelper后构建MongoDB辅助类,就可以利用其接口进行相关的MongoDB数据库操作处理了。
最后通过类似的处理,结合数据库操作和MongoDB数据库操作,我们把EAV的接口服务迁移到了Python中FastAPI项目中了。
最后完成的FastAPI项目的EAV相关接口如下所示。
折叠相关模块显示如下所示。
最后切换到Winform项目上,调整接入的Web API数据源,同样获得一样的界面效果即可。
产品数据表。
订单数据表