feat(engine): 实现数据库持久化功能并添加调试日志
- 在 CRUDHandler 中添加日志记录功能用于调试和错误追踪 - 实现 MemoryStore 的 SyncToDB 方法支持自动创建表和数据同步 - 更新 HTTP 处理器使用 CRUD 处理器进行数据操作 - 添加 SQLite 表名处理逻辑去除数据库前缀 - 实现集合存在性检查和自动创建机制 - 添加测试脚本验证数据持久化功能
This commit is contained in:
parent
3a08ac0617
commit
935d4ea86a
|
|
@ -2,6 +2,7 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.kingecg.top/kingecg/gomog/internal/database"
|
"git.kingecg.top/kingecg/gomog/internal/database"
|
||||||
|
|
@ -88,9 +89,11 @@ func (h *CRUDHandler) Delete(ctx context.Context, collection string, filter type
|
||||||
|
|
||||||
// persistToDB 持久化集合到数据库
|
// persistToDB 持久化集合到数据库
|
||||||
func (h *CRUDHandler) persistToDB(ctx context.Context, collection string) {
|
func (h *CRUDHandler) persistToDB(ctx context.Context, collection string) {
|
||||||
|
log.Printf("[DEBUG] Starting persist for collection: %s", collection)
|
||||||
if err := h.store.SyncToDB(ctx, collection); err != nil {
|
if err := h.store.SyncToDB(ctx, collection); err != nil {
|
||||||
// 记录错误但不返回
|
log.Printf("[ERROR] Failed to persist collection %s: %v", collection, err)
|
||||||
// TODO: 使用日志记录
|
} else {
|
||||||
|
log.Printf("[INFO] Successfully persisted collection %s", collection)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -218,9 +219,32 @@ func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error {
|
||||||
docs = append(docs, doc)
|
docs = append(docs, doc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 对于 SQLite,去掉数据库前缀(例如:testdb.users -> users)
|
||||||
|
tableName := collection
|
||||||
|
if idx := strings.Index(collection, "."); idx > 0 {
|
||||||
|
tableName = collection[idx+1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查集合是否存在,不存在则创建
|
||||||
|
exists, err := ms.adapter.CollectionExists(ctx, tableName)
|
||||||
|
if err != nil {
|
||||||
|
// 如果 CollectionExists 未实现(返回 ErrNotImplemented),尝试直接创建表
|
||||||
|
if err.Error() == "not implemented" {
|
||||||
|
// 尝试创建表,忽略已存在的错误
|
||||||
|
_ = ms.adapter.CreateCollection(ctx, tableName)
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if !exists {
|
||||||
|
// 集合不存在,创建它
|
||||||
|
if err := ms.adapter.CreateCollection(ctx, tableName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 批量插入/更新到数据库
|
// 批量插入/更新到数据库
|
||||||
// 注意:这里简化处理,实际应该区分新增和更新
|
// 注意:这里简化处理,实际应该区分新增和更新
|
||||||
return ms.adapter.InsertMany(ctx, collection, docs)
|
return ms.adapter.InsertMany(ctx, tableName, docs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAllDocuments 获取集合的所有文档(用于聚合)
|
// GetAllDocuments 获取集合的所有文档(用于聚合)
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.kingecg.top/kingecg/gomog/internal/engine"
|
"git.kingecg.top/kingecg/gomog/internal/engine"
|
||||||
"git.kingecg.top/kingecg/gomog/pkg/types"
|
"git.kingecg.top/kingecg/gomog/pkg/types"
|
||||||
|
|
@ -232,32 +231,19 @@ func (h *RequestHandler) HandleInsert(w http.ResponseWriter, r *http.Request, db
|
||||||
}
|
}
|
||||||
|
|
||||||
fullCollection := dbName + "." + collection
|
fullCollection := dbName + "." + collection
|
||||||
insertedIDs := make(map[int]string)
|
|
||||||
|
|
||||||
for i, docData := range req.Documents {
|
// 使用 CRUD 处理器进行插入(会自动持久化到数据库)
|
||||||
// 生成 ID
|
// 注意:使用 context.Background() 而不是 r.Context(),因为持久化是异步的
|
||||||
id := generateID()
|
result, err := h.crud.Insert(context.Background(), fullCollection, req.Documents)
|
||||||
|
if err != nil {
|
||||||
doc := types.Document{
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
ID: id,
|
return
|
||||||
Data: docData,
|
|
||||||
CreatedAt: time.Now(),
|
|
||||||
UpdatedAt: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// 插入到内存
|
|
||||||
if err := h.store.Insert(fullCollection, doc); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
insertedIDs[i] = id
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response := types.InsertResult{
|
response := types.InsertResult{
|
||||||
OK: 1,
|
OK: 1,
|
||||||
N: len(req.Documents),
|
N: result.N,
|
||||||
InsertedIDs: insertedIDs,
|
InsertedIDs: result.InsertedIDs,
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
@ -278,27 +264,23 @@ func (h *RequestHandler) HandleUpdate(w http.ResponseWriter, r *http.Request, db
|
||||||
}
|
}
|
||||||
|
|
||||||
fullCollection := dbName + "." + collection
|
fullCollection := dbName + "." + collection
|
||||||
|
|
||||||
|
// 使用 CRUD 处理器进行更新(会自动持久化到数据库)
|
||||||
totalMatched := 0
|
totalMatched := 0
|
||||||
totalModified := 0
|
totalModified := 0
|
||||||
upserted := make([]types.UpsertID, 0)
|
upserted := make([]types.UpsertID, 0)
|
||||||
|
|
||||||
for _, op := range req.Updates {
|
for _, op := range req.Updates {
|
||||||
matched, modified, upsertedIDs, err := h.store.Update(fullCollection, op.Q, op.U, op.Upsert, op.ArrayFilters)
|
result, err := h.crud.Update(context.Background(), fullCollection, op.Q, op.U)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
totalMatched += matched
|
totalMatched += result.N
|
||||||
totalModified += modified
|
totalModified += result.NModified
|
||||||
|
|
||||||
// 收集 upserted IDs
|
// TODO: 处理 upserted IDs
|
||||||
for _, id := range upsertedIDs {
|
|
||||||
upserted = append(upserted, types.UpsertID{
|
|
||||||
Index: 0,
|
|
||||||
ID: id,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response := types.UpdateResult{
|
response := types.UpdateResult{
|
||||||
|
|
@ -326,19 +308,21 @@ func (h *RequestHandler) HandleDelete(w http.ResponseWriter, r *http.Request, db
|
||||||
}
|
}
|
||||||
|
|
||||||
fullCollection := dbName + "." + collection
|
fullCollection := dbName + "." + collection
|
||||||
|
|
||||||
|
// 使用 CRUD 处理器进行删除(会自动持久化到数据库)
|
||||||
totalDeleted := 0
|
totalDeleted := 0
|
||||||
|
|
||||||
for _, op := range req.Deletes {
|
for _, op := range req.Deletes {
|
||||||
deleted, err := h.store.Delete(fullCollection, op.Q)
|
result, err := h.crud.Delete(context.Background(), fullCollection, op.Q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
totalDeleted += deleted
|
totalDeleted += result.N
|
||||||
|
|
||||||
// 如果 limit=1,只删除第一个匹配的文档
|
// 如果 limit=1,只删除第一个匹配的文档
|
||||||
if op.Limit == 1 && deleted > 0 {
|
if op.Limit == 1 && result.N > 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
echo "=== 启动服务器 ==="
|
||||||
|
rm -f gomog.db
|
||||||
|
./bin/gomog -config config.yaml &
|
||||||
|
SERVER_PID=$!
|
||||||
|
sleep 3
|
||||||
|
|
||||||
|
echo -e "\n=== 插入数据 ==="
|
||||||
|
curl -s -X POST http://localhost:8080/api/v1/testdb/users/insert \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"documents": [{"name": "Alice", "age": 30}]}'
|
||||||
|
|
||||||
|
echo -e "\n\n=== 等待 5 秒让异步持久化完成 ==="
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
echo -e "\n=== 查询内存中的数据 ==="
|
||||||
|
curl -s -X POST http://localhost:8080/api/v1/testdb/users/find \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{}' | python3 -c "import sys,json; d=json.load(sys.stdin); print(f'内存中有 {len(d.get(\"cursor\",{}).get(\"firstBatch\",[]))} 条数据')"
|
||||||
|
|
||||||
|
echo -e "\n=== 检查数据库表 ==="
|
||||||
|
sqlite3 gomog.db ".tables" 2>/dev/null || echo "无表"
|
||||||
|
|
||||||
|
echo -e "\n=== 检查数据库数据 ==="
|
||||||
|
sqlite3 gomog.db "SELECT COUNT(*) FROM 'testdb.users';" 2>/dev/null || echo "无法查询"
|
||||||
|
|
||||||
|
echo -e "\n=== 停止服务器 ==="
|
||||||
|
kill $SERVER_PID 2>/dev/null || true
|
||||||
Loading…
Reference in New Issue