数据库API
数据库API模块提供通用的数据库操作功能,支持查询、创建、更新和删除记录,采用Peewee ORM模型。
导入方式
python
from src.plugin_system.apis import database_api
# 或者
from src.plugin_system import database_api
主要功能
1. 通用数据库操作
python
async def db_query(
model_class: Type[Model],
data: Optional[Dict[str, Any]] = None,
query_type: Optional[str] = "get",
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
order_by: Optional[List[str]] = None,
single_result: Optional[bool] = False,
) -> Union[List[Dict[str, Any]], Dict[str, Any], None]:
执行数据库查询操作的通用接口。
Args:
model_class
: Peewee模型类。- Peewee模型类可以在
src.common.database.database_model
模块中找到,如ActionRecords
、Messages
等。
- Peewee模型类可以在
data
: 用于创建或更新的数据query_type
: 查询类型- 可选值:
get
,create
,update
,delete
,count
。
- 可选值:
filters
: 过滤条件字典,键为字段名,值为要匹配的值。limit
: 限制结果数量。order_by
: 排序字段列表,使用字段名,前缀'-'表示降序。- 排序字段,前缀
-
表示降序,例如-time
表示按时间字段(即time
字段)降序
- 排序字段,前缀
single_result
: 是否只返回单个结果。
Returns:
- 根据查询类型返回不同的结果:
get
: 返回查询结果列表或单个结果。(如果single_result=True
)create
: 返回创建的记录。update
: 返回受影响的行数。delete
: 返回受影响的行数。count
: 返回记录数量。
示例
- 查询最近10条消息
python
messages = await database_api.db_query(
Messages,
query_type="get",
filters={"chat_id": chat_stream.stream_id},
limit=10,
order_by=["-time"]
)
- 创建一条记录
python
new_record = await database_api.db_query(
ActionRecords,
data={"action_id": "123", "time": time.time(), "action_name": "TestAction"},
query_type="create",
)
- 更新记录
python
updated_count = await database_api.db_query(
ActionRecords,
data={"action_done": True},
query_type="update",
filters={"action_id": "123"},
)
- 删除记录
python
deleted_count = await database_api.db_query(
ActionRecords,
query_type="delete",
filters={"action_id": "123"}
)
- 计数
python
count = await database_api.db_query(
Messages,
query_type="count",
filters={"chat_id": chat_stream.stream_id}
)
2. 数据库保存
python
async def db_save(
model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None
) -> Optional[Dict[str, Any]]:
保存数据到数据库(创建或更新)
如果提供了key_field和key_value,会先尝试查找匹配的记录进行更新;
如果没有找到匹配记录,或未提供key_field和key_value,则创建新记录。
Args:
model_class
: Peewee模型类。data
: 要保存的数据字典。key_field
: 用于查找现有记录的字段名,例如"action_id"。key_value
: 用于查找现有记录的字段值。
Returns:
Optional[Dict[str, Any]]
: 保存后的记录数据,失败时返回None。
示例
创建或更新一条记录
python
record = await database_api.db_save(
ActionRecords,
{
"action_id": "123",
"time": time.time(),
"action_name": "TestAction",
"action_done": True
},
key_field="action_id",
key_value="123"
)
3. 数据库获取
python
async def db_get(
model_class: Type[Model],
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
order_by: Optional[str] = None,
single_result: Optional[bool] = False,
) -> Union[List[Dict[str, Any]], Dict[str, Any], None]:
从数据库获取记录
这是db_query方法的简化版本,专注于数据检索操作。
Args:
model_class
: Peewee模型类。filters
: 过滤条件字典,键为字段名,值为要匹配的值。limit
: 限制结果数量。order_by
: 排序字段,使用字段名,前缀'-'表示降序。single_result
: 是否只返回单个结果,如果为True,则返回单个记录字典或None;否则返回记录字典列表或空列表
Returns:
Union[List[Dict], Dict, None]
: 查询结果列表或单个结果(如果single_result=True
),失败时返回None。
示例
- 获取单个记录
python
record = await database_api.db_get(
ActionRecords,
filters={"action_id": "123"},
limit=1
)
- 获取最近10条记录
python
records = await database_api.db_get(
Messages,
filters={"chat_id": chat_stream.stream_id},
limit=10,
order_by="-time",
)
4. 动作信息存储
python
async def store_action_info(
chat_stream=None,
action_build_into_prompt: bool = False,
action_prompt_display: str = "",
action_done: bool = True,
thinking_id: str = "",
action_data: Optional[dict] = None,
action_name: str = "",
) -> Optional[Dict[str, Any]]:
存储动作信息到数据库,是一种针对 Action 的 db_save()
的封装函数。
将Action执行的相关信息保存到ActionRecords表中,用于后续的记忆和上下文构建。
Args:
chat_stream
: 聊天流对象,包含聊天ID等信息。action_build_into_prompt
: 是否将动作信息构建到提示中。action_prompt_display
: 动作提示的显示文本。action_done
: 动作是否完成。thinking_id
: 思考过程的ID。action_data
: 动作的数据字典。action_name
: 动作的名称。
Returns:
Optional[Dict[str, Any]]
: 存储后的记录数据,失败时返回None。
示例
python
record = await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=True,
action_prompt_display="执行了回复动作",
action_done=True,
thinking_id="thinking_123",
action_data={"content": "Hello"},
action_name="reply_action"
)