sysDeptModel.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package dept
  2. import (
  3. "context"
  4. "database/sql"
  5. "errors"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "github.com/zeromicro/go-zero/core/stores/cache"
  9. "github.com/zeromicro/go-zero/core/stores/sqlx"
  10. )
  11. var ErrUpdateConflict = errors.New("update conflict: data has been modified by another operation")
  12. var _ SysDeptModel = (*customSysDeptModel)(nil)
  13. type (
  14. SysDeptModel interface {
  15. sysDeptModel
  16. FindAll(ctx context.Context) ([]*SysDept, error)
  17. UpdateWithOptLock(ctx context.Context, data *SysDept, expectedUpdateTime int64) error
  18. // UpdateWithOptLockTx 与 UpdateWithOptLock 语义等价,但 UPDATE 在调用方提供的事务里执行。
  19. // 用于 UpdateDept 需要在同一事务里做"乐观锁更新 sys_dept + 批量递增 sys_user.tokenVersion"
  20. // 的场景(审计 L-R16-2):任一步失败整体回滚,不会出现"部门已改但成员 tokenVersion 未递增"
  21. // 或"tokenVersion 已递增但部门未改"的脏中间态。
  22. //
  23. // 契约(与 UpdateProfileWithTx 的 L-R12-1 对齐):
  24. // - 不在方法内做缓存失效;commit 成功后由调用方显式调用 InvalidateDeptCache(id);
  25. // - session==nil 返回错误;
  26. // - affected=0 → ErrUpdateConflict,交由上层映射 409。
  27. UpdateWithOptLockTx(ctx context.Context, session sqlx.Session, data *SysDept, expectedUpdateTime int64) error
  28. // InvalidateDeptCache 失效 sysDept 的 id 键缓存,仅应在事务 commit 成功后调用。
  29. // best-effort:失效失败只留日志,最终由 TTL 兜底(审计 L-R16-2,与 InvalidateProfileCache
  30. // 相同口径)。
  31. InvalidateDeptCache(ctx context.Context, id int64)
  32. // FindOneForShareTx 在当前事务里对 sys_dept 目标行加 S 锁(SELECT ... LOCK IN SHARE MODE),
  33. // 用于"UpdateUser 改 deptId 到 X"与"DeleteDept 删除 X"之间的 write skew 闭环(审计 M-R11-3)。
  34. // DeleteDept 会先对 sys_dept[X] 取 X 锁——被本 S 锁阻塞;等 UpdateUser 提交后 DeleteDept 再
  35. // 去 FOR SHARE sys_user WHERE deptId=X 时能看到新行,改删除为 400,整链路不产生 orphan deptId。
  36. // 本方法不走缓存,必须在 TransactCtx / Session 下调用。
  37. FindOneForShareTx(ctx context.Context, session sqlx.Session, id int64) (*SysDept, error)
  38. }
  39. customSysDeptModel struct {
  40. *defaultSysDeptModel
  41. }
  42. )
  43. func NewSysDeptModel(conn sqlx.SqlConn, c cache.CacheConf, cachePrefix string, opts ...cache.Option) SysDeptModel {
  44. return &customSysDeptModel{
  45. defaultSysDeptModel: newSysDeptModel(conn, c, cachePrefix, opts...),
  46. }
  47. }
  48. func (m *customSysDeptModel) FindAll(ctx context.Context) ([]*SysDept, error) {
  49. var list []*SysDept
  50. query := fmt.Sprintf("SELECT %s FROM %s ORDER BY `sort` ASC, `id` ASC", sysDeptRows, m.table)
  51. if err := m.QueryRowsNoCacheCtx(ctx, &list, query); err != nil {
  52. return nil, err
  53. }
  54. return list, nil
  55. }
  56. func (m *customSysDeptModel) FindOneForShareTx(ctx context.Context, session sqlx.Session, id int64) (*SysDept, error) {
  57. var data SysDept
  58. query := fmt.Sprintf("SELECT %s FROM %s WHERE `id` = ? LIMIT 1 LOCK IN SHARE MODE", sysDeptRows, m.table)
  59. if err := session.QueryRowCtx(ctx, &data, query, id); err != nil {
  60. return nil, err
  61. }
  62. return &data, nil
  63. }
  64. func (m *customSysDeptModel) UpdateWithOptLock(ctx context.Context, data *SysDept, expectedUpdateTime int64) error {
  65. sysDeptIdKey := fmt.Sprintf("%s%v", cacheSysDeptIdPrefix, data.Id)
  66. res, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (sql.Result, error) {
  67. query := fmt.Sprintf("UPDATE %s SET `name`=?, `sort`=?, `deptType`=?, `remark`=?, `status`=?, `updateTime`=? WHERE `id`=? AND `updateTime`=?", m.table)
  68. return conn.ExecCtx(ctx, query, data.Name, data.Sort, data.DeptType, data.Remark, data.Status, data.UpdateTime, data.Id, expectedUpdateTime)
  69. }, sysDeptIdKey)
  70. if err != nil {
  71. return err
  72. }
  73. affected, _ := res.RowsAffected()
  74. if affected == 0 {
  75. return ErrUpdateConflict
  76. }
  77. return nil
  78. }
  79. // UpdateWithOptLockTx 见接口注释(审计 L-R16-2)。
  80. // 实现上**绕过** m.ExecCtx 的 pre-commit DelCache 语义——仅调用 session.ExecCtx,缓存失效由
  81. // 调用方在事务 commit 成功后显式走 InvalidateDeptCache。
  82. func (m *customSysDeptModel) UpdateWithOptLockTx(ctx context.Context, session sqlx.Session, data *SysDept, expectedUpdateTime int64) error {
  83. if session == nil {
  84. return errors.New("UpdateWithOptLockTx requires a non-nil session")
  85. }
  86. query := fmt.Sprintf("UPDATE %s SET `name`=?, `sort`=?, `deptType`=?, `remark`=?, `status`=?, `updateTime`=? WHERE `id`=? AND `updateTime`=?", m.table)
  87. res, err := session.ExecCtx(ctx, query, data.Name, data.Sort, data.DeptType, data.Remark, data.Status, data.UpdateTime, data.Id, expectedUpdateTime)
  88. if err != nil {
  89. return err
  90. }
  91. affected, _ := res.RowsAffected()
  92. if affected == 0 {
  93. return ErrUpdateConflict
  94. }
  95. return nil
  96. }
  97. // InvalidateDeptCache 见接口注释(审计 L-R16-2)。与 sysUserModel.InvalidateProfileCache 对齐:
  98. // post-commit best-effort 失效,ctx 取消与其它错误分档日志,方便 Redis 抖动与主动取消区分告警。
  99. func (m *customSysDeptModel) InvalidateDeptCache(ctx context.Context, id int64) {
  100. sysDeptIdKey := fmt.Sprintf("%s%v", cacheSysDeptIdPrefix, id)
  101. if err := m.DelCacheCtx(ctx, sysDeptIdKey); err != nil {
  102. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  103. logx.WithContext(ctx).Errorw("cache invalidation skipped: ctx canceled",
  104. logx.Field("audit", "cache_invalidation_skipped_due_to_ctx_cancel"),
  105. logx.Field("scope", "sysDeptModel.InvalidateDeptCache"),
  106. logx.Field("id", id),
  107. logx.Field("err", err.Error()),
  108. )
  109. } else {
  110. logx.WithContext(ctx).Errorf("sysDeptModel.InvalidateDeptCache failed: id=%d err=%v", id, err)
  111. }
  112. }
  113. }