// Package dm8 provides a database adapter for the Dameng DM8 database.
package dm8

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"git.kingecg.top/kingecg/gomog/internal/database"
	"git.kingecg.top/kingecg/gomog/pkg/types"
	// Dameng database driver; may require the official driver or ODBC.
	// _ "github.com/alexbrainman/odbc"
)

// DM8Adapter is the Dameng DM8 database adapter. It embeds
// *database.BaseAdapter and overrides the operations that need
// DM8-specific SQL.
type DM8Adapter struct {
	*database.BaseAdapter
}

// NewDM8Adapter creates a DM8 adapter backed by a base adapter registered
// under the "dm" driver name.
func NewDM8Adapter() *DM8Adapter {
	return &DM8Adapter{
		BaseAdapter: database.NewBaseAdapter("dm"),
	}
}

// Connect opens the connection through the base adapter and then sets the
// session time zone to UTC.
//
// NOTE(review): if GetDB returns a pooled *sql.DB, this statement presumably
// only affects the single connection it happens to run on — confirm whether
// the time zone should instead be configured through the DSN.
func (a *DM8Adapter) Connect(ctx context.Context, dsn string) error {
	if err := a.BaseAdapter.Connect(ctx, dsn); err != nil {
		return err
	}

	// Set DM8 session parameters, honouring the caller's context.
	_, err := a.GetDB().ExecContext(ctx, "SET SESSION TIME_ZONE='UTC'")
	return err
}

// CreateCollection creates a collection, stored as a DM8 table with an id
// primary key, a JSON data column and created/updated timestamps.
//
// NOTE(review): name is interpolated into the statement because SQL
// identifiers cannot be bound as parameters; callers must pass a trusted or
// validated name.
func (a *DM8Adapter) CreateCollection(ctx context.Context, name string) error {
	// The table relies on the database's JSON column type.
	query := fmt.Sprintf(`
		CREATE TABLE %s (
			id VARCHAR(256) PRIMARY KEY,
			data JSON NOT NULL,
			created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
			updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
		)`, name)

	_, err := a.GetDB().ExecContext(ctx, query)
	return err
}

// CollectionExists reports whether a table with the given name is listed in
// USER_TABLES.
//
// NOTE(review): the comparison is an exact string match on TABLE_NAME;
// presumably the catalog stores unquoted identifiers upper-cased — confirm
// that callers pass the name in the matching case.
func (a *DM8Adapter) CollectionExists(ctx context.Context, name string) (bool, error) {
	query := `SELECT COUNT(*) FROM USER_TABLES WHERE TABLE_NAME = ?`

	var count int
	if err := a.GetDB().QueryRowContext(ctx, query, name).Scan(&count); err != nil {
		return false, err
	}
	return count > 0, nil
}

// FindAll returns every document in the collection. The JSON column is read
// as text via TO_CHAR and decoded into each document's Data field.
//
// NOTE(review): collection is interpolated into the query as an identifier;
// callers must pass a trusted or validated name.
func (a *DM8Adapter) FindAll(ctx context.Context, collection string) ([]types.Document, error) {
	query := fmt.Sprintf("SELECT id, TO_CHAR(data), created_at, updated_at FROM %s", collection)

	rows, err := a.GetDB().QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var docs []types.Document
	for rows.Next() {
		var doc types.Document
		var jsonData string
		if err := rows.Scan(&doc.ID, &jsonData, &doc.CreatedAt, &doc.UpdatedAt); err != nil {
			return nil, err
		}
		if err := json.Unmarshal([]byte(jsonData), &doc.Data); err != nil {
			return nil, err
		}
		docs = append(docs, doc)
	}
	// rows.Next also returns false when iteration fails; surface that error
	// instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return docs, nil
}

// InsertMany inserts all documents inside a single transaction; if any step
// fails nothing is committed.
//
// Both created_at and updated_at are written with the same timestamp:
// doc.CreatedAt if it is non-zero, otherwise doc.UpdatedAt, otherwise the
// current time.
//
// NOTE(review): collection is interpolated into the statement as an
// identifier; callers must pass a trusted or validated name.
func (a *DM8Adapter) InsertMany(ctx context.Context, collection string, docs []types.Document) error {
	tx, err := a.GetDB().BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit reports sql.ErrTxDone, which is
	// safe to ignore; on every early return it undoes the partial work.
	defer tx.Rollback()

	// The statement text does not depend on the document, so build it once.
	query := fmt.Sprintf(
		"INSERT INTO %s (id, data, created_at, updated_at) VALUES (?, ?, ?, ?)",
		collection,
	)

	for _, doc := range docs {
		jsonData, err := json.Marshal(doc.Data)
		if err != nil {
			return err
		}

		now := doc.CreatedAt
		if now.IsZero() {
			now = doc.UpdatedAt
		}
		if now.IsZero() {
			now = time.Now()
		}

		if _, err := tx.ExecContext(ctx, query, doc.ID, string(jsonData), now, now); err != nil {
			return err
		}
	}

	return tx.Commit()
}

// UpdateMany applies update to every document whose id is in ids, inside a
// single transaction. IDs that match no row are skipped. An empty ids slice
// is a no-op.
//
// The whole JSON document is rebuilt for each row: the current document is
// read, keys in update.Set are assigned, keys in update.Unset are removed,
// and the result is written back together with a fresh updated_at.
//
// NOTE(review): collection is interpolated into the statements as an
// identifier; callers must pass a trusted or validated name.
func (a *DM8Adapter) UpdateMany(ctx context.Context, collection string, ids []string, update types.Update) error {
	if len(ids) == 0 {
		return nil
	}

	tx, err := a.GetDB().BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit reports sql.ErrTxDone, which is
	// safe to ignore; on every early return it undoes the partial work.
	defer tx.Rollback()

	// Neither statement depends on the id, so build both once.
	selectQuery := fmt.Sprintf("SELECT TO_CHAR(data) FROM %s WHERE id = ?", collection)
	updateQuery := fmt.Sprintf(
		"UPDATE %s SET data = ?, updated_at = ? WHERE id = ?",
		collection,
	)

	for _, id := range ids {
		// Load the current document.
		var jsonData string
		if err := tx.QueryRowContext(ctx, selectQuery, id).Scan(&jsonData); err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				continue
			}
			return err
		}

		var currentData map[string]interface{}
		if err := json.Unmarshal([]byte(jsonData), &currentData); err != nil {
			return err
		}
		// A stored JSON null decodes to a nil map; writing to it would panic.
		if currentData == nil {
			currentData = make(map[string]interface{})
		}

		// Apply the update.
		for k, v := range update.Set {
			currentData[k] = v
		}
		for k := range update.Unset {
			delete(currentData, k)
		}

		// Write the merged document back.
		newJSON, err := json.Marshal(currentData)
		if err != nil {
			return err
		}
		if _, err := tx.ExecContext(ctx, updateQuery, string(newJSON), time.Now(), id); err != nil {
			return err
		}
	}

	return tx.Commit()
}

// ListCollections returns the names of all tables listed in USER_TABLES,
// ordered by name.
func (a *DM8Adapter) ListCollections(ctx context.Context) ([]string, error) {
	query := `SELECT TABLE_NAME FROM USER_TABLES ORDER BY TABLE_NAME`

	rows, err := a.GetDB().QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var tables []string
	for rows.Next() {
		var table string
		if err := rows.Scan(&table); err != nil {
			return nil, err
		}
		tables = append(tables, table)
	}

	return tables, rows.Err()
}