lockByCodeTx_audit_test.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package product
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. "perms-system-server/internal/testutil"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. "github.com/zeromicro/go-zero/core/stores/sqlx"
  14. )
  15. // ---------------------------------------------------------------------------
  16. // 覆盖目标:审计 M-6 的新增 LockByCodeTx —— "在当前事务里 SELECT ... FOR UPDATE 锁住 product 行"。
  17. // 本方法是后续将 SyncPermissions 串行化到每个 product 的基础设施。契约:
  18. // 1) 存在的 code 必须正常返回全字段;
  19. // 2) 不存在的 code 必须返回 sqlx.ErrNotFound 以便调用方 fail-close;
  20. // 3) 行锁必须真实 —— 两个事务对同一 code 并发 FOR UPDATE,
  21. // 先拿到锁的那个必须让后者阻塞到前者 commit/rollback 为止。
  22. // ---------------------------------------------------------------------------
  23. // TC-0809: LockByCodeTx 对存在的 code 返回完整数据。
  24. func TestSysProductModel_LockByCodeTx_Found(t *testing.T) {
  25. ctx := context.Background()
  26. conn := testutil.GetTestSqlConn()
  27. m := newTestModel(t)
  28. p := newSysProduct()
  29. res, err := m.Insert(ctx, p)
  30. require.NoError(t, err)
  31. id, _ := res.LastInsertId()
  32. t.Cleanup(func() { testutil.CleanTable(ctx, conn, "`sys_product`", id) })
  33. var got *SysProduct
  34. require.NoError(t, m.TransactCtx(ctx, func(c context.Context, session sqlx.Session) error {
  35. got, err = m.LockByCodeTx(c, session, p.Code)
  36. return err
  37. }))
  38. require.NotNil(t, got)
  39. assert.Equal(t, id, got.Id)
  40. assert.Equal(t, p.Code, got.Code)
  41. assert.Equal(t, p.AppKey, got.AppKey)
  42. assert.Equal(t, p.Status, got.Status, "锁行时不得过滤禁用态,否则 SyncPermissions 无法为禁用产品正确 fail-close")
  43. }
  44. // TC-0810: LockByCodeTx 对不存在的 code 返回 sqlx.ErrNotFound。
  45. func TestSysProductModel_LockByCodeTx_NotFound(t *testing.T) {
  46. ctx := context.Background()
  47. m := newTestModel(t)
  48. err := m.TransactCtx(ctx, func(c context.Context, session sqlx.Session) error {
  49. _, e := m.LockByCodeTx(c, session, "definitely_no_such_code_"+testutil.UniqueId())
  50. return e
  51. })
  52. require.Error(t, err)
  53. assert.True(t, errors.Is(err, sqlx.ErrNotFound),
  54. "LockByCodeTx 对不存在的 code 必须返回 ErrNotFound,便于上层 fail-close 返回 401/404")
  55. }
  56. // TC-0811: FOR UPDATE 行锁真实生效 —— 两个事务同时尝试锁同一行时,
  57. // 后进者必须被阻塞直到先进者结束事务。
  58. // 测量方式:goroutine A 在 tx 内 Lock 住后 sleep 500ms 再 commit;
  59. // goroutine B 等 100ms 后也尝试 Lock 同一行,记录耗时。B 的耗时必须≥400ms。
  60. func TestSysProductModel_LockByCodeTx_BlocksConcurrentWriter(t *testing.T) {
  61. ctx := context.Background()
  62. conn := testutil.GetTestSqlConn()
  63. m := newTestModel(t)
  64. p := newSysProduct()
  65. res, err := m.Insert(ctx, p)
  66. require.NoError(t, err)
  67. id, _ := res.LastInsertId()
  68. t.Cleanup(func() { testutil.CleanTable(ctx, conn, "`sys_product`", id) })
  69. var (
  70. wg sync.WaitGroup
  71. aHoldMs = int64(500)
  72. bStartDelayMs = int64(100)
  73. bElapsedNanos int64
  74. aFinishedNanos int64
  75. aErr, bErr error
  76. )
  77. wg.Add(2)
  78. go func() {
  79. defer wg.Done()
  80. aErr = m.TransactCtx(ctx, func(c context.Context, session sqlx.Session) error {
  81. if _, e := m.LockByCodeTx(c, session, p.Code); e != nil {
  82. return e
  83. }
  84. // A 拿到锁后故意延时,模拟一段业务处理期。期间 B 必须被阻塞。
  85. time.Sleep(time.Duration(aHoldMs) * time.Millisecond)
  86. atomic.StoreInt64(&aFinishedNanos, time.Now().UnixNano())
  87. return nil
  88. })
  89. }()
  90. go func() {
  91. defer wg.Done()
  92. time.Sleep(time.Duration(bStartDelayMs) * time.Millisecond)
  93. start := time.Now()
  94. bErr = m.TransactCtx(ctx, func(c context.Context, session sqlx.Session) error {
  95. _, e := m.LockByCodeTx(c, session, p.Code)
  96. return e
  97. })
  98. atomic.StoreInt64(&bElapsedNanos, time.Since(start).Nanoseconds())
  99. }()
  100. wg.Wait()
  101. require.NoError(t, aErr)
  102. require.NoError(t, bErr)
  103. // B 的耗时 ≥ A 的剩余持锁时间。A 持 500ms,B 延 100ms 后入场,
  104. // 因此 B 被阻塞的时间至少 (500-100)=400ms。给 DB 一点抖动放到 300ms。
  105. elapsedMs := atomic.LoadInt64(&bElapsedNanos) / int64(time.Millisecond)
  106. minBlockedMs := int64(300)
  107. assert.GreaterOrEqualf(t, elapsedMs, minBlockedMs, fmt.Sprintf(
  108. "B 的 LockByCodeTx 总耗时 %dms 明显低于预期最小阻塞 %dms —— "+
  109. "意味着 FOR UPDATE 行锁失效,M-6 声称的'按 product 串行化'不成立",
  110. elapsedMs, minBlockedMs))
  111. }