feat: sync current progress (P0 hardening + P1 observability + deploy docs/systemd)
This commit is contained in:
171
internal/scheduler/reminder.go
Normal file
171
internal/scheduler/reminder.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"asset-tracker/internal/metrics"
|
||||
"asset-tracker/internal/model"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func StartReminderScan(db *gorm.DB) *cron.Cron {
|
||||
c := cron.New(cron.WithSeconds())
|
||||
|
||||
_, err := c.AddFunc("0 */5 * * * *", func() {
|
||||
runReminderScan(db)
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[scheduler] add reminder scan job failed: %v", err)
|
||||
return c
|
||||
}
|
||||
|
||||
_, err = c.AddFunc("0 10 2 * * *", func() {
|
||||
runCompensationScan(db)
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[scheduler] add compensation job failed: %v", err)
|
||||
return c
|
||||
}
|
||||
|
||||
c.Start()
|
||||
log.Println("[scheduler] reminder scan started: every 5 minutes, compensation daily 02:10")
|
||||
return c
|
||||
}
|
||||
|
||||
func runReminderScan(db *gorm.DB) {
|
||||
now := time.Now().UTC()
|
||||
windowEnd := now.Add(5 * time.Minute)
|
||||
|
||||
var pending []model.Reminder
|
||||
qStart := time.Now()
|
||||
err := db.Where("status = ? AND remind_at <= ?", "pending", windowEnd).Order("status asc, remind_at asc").Limit(200).Find(&pending).Error
|
||||
metrics.ObserveDB("scan_pending", "reminders", err == nil, time.Since(qStart))
|
||||
if err != nil {
|
||||
log.Printf("[scheduler] pending reminder query error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, r := range pending {
|
||||
processReminder(db, r, now)
|
||||
}
|
||||
|
||||
var failed []model.Reminder
|
||||
qStart = time.Now()
|
||||
err = db.Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", "failed", now).Order("status asc, next_retry_at asc").Limit(200).Find(&failed).Error
|
||||
metrics.ObserveDB("scan_failed", "reminders", err == nil, time.Since(qStart))
|
||||
if err != nil {
|
||||
log.Printf("[scheduler] failed reminder query error: %v", err)
|
||||
return
|
||||
}
|
||||
for _, r := range failed {
|
||||
processReminder(db, r, now)
|
||||
}
|
||||
}
|
||||
|
||||
func processReminder(db *gorm.DB, r model.Reminder, now time.Time) {
|
||||
claim := db.Model(&model.Reminder{}).Where("id = ? AND status IN ?", r.ID, []string{"pending", "failed"}).Updates(map[string]interface{}{
|
||||
"status": "sending",
|
||||
"last_error": "",
|
||||
"next_retry_at": nil,
|
||||
})
|
||||
if claim.Error != nil {
|
||||
log.Printf("[scheduler] claim reminder id=%d failed: %v", r.ID, claim.Error)
|
||||
return
|
||||
}
|
||||
if claim.RowsAffected == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if err := deliverReminder(r); err != nil {
|
||||
metrics.ReminderSendTotal.WithLabelValues("failed").Inc()
|
||||
retryCount := r.RetryCount + 1
|
||||
if retryCount >= maxRetryCount() {
|
||||
_ = db.Transaction(func(tx *gorm.DB) error {
|
||||
if err := tx.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{
|
||||
"status": "failed",
|
||||
"retry_count": retryCount,
|
||||
"last_error": "retry limit reached: " + err.Error(),
|
||||
"next_retry_at": nil,
|
||||
}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
dl := model.ReminderDeadLetter{
|
||||
ReminderID: r.ID,
|
||||
UserID: r.UserID,
|
||||
AssetID: r.AssetID,
|
||||
RemindAt: r.RemindAt,
|
||||
Channel: r.Channel,
|
||||
Status: "failed",
|
||||
RetryCount: retryCount,
|
||||
LastError: "retry limit reached: " + err.Error(),
|
||||
}
|
||||
return tx.Where("reminder_id = ?", r.ID).FirstOrCreate(&dl).Error
|
||||
})
|
||||
return
|
||||
}
|
||||
metrics.ReminderRetryTotal.Inc()
|
||||
next := now.Add(retryDelay(retryCount))
|
||||
_ = db.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{
|
||||
"status": "failed",
|
||||
"retry_count": retryCount,
|
||||
"next_retry_at": &next,
|
||||
"last_error": err.Error(),
|
||||
}).Error
|
||||
return
|
||||
}
|
||||
|
||||
metrics.ReminderSendTotal.WithLabelValues("sent").Inc()
|
||||
sentAt := now
|
||||
_ = db.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{
|
||||
"status": "sent",
|
||||
"sent_at": &sentAt,
|
||||
"last_error": "",
|
||||
"next_retry_at": nil,
|
||||
}).Error
|
||||
}
|
||||
|
||||
func runCompensationScan(db *gorm.DB) {
|
||||
now := time.Now().UTC()
|
||||
cutoff := now.Add(-10 * time.Minute)
|
||||
var missed []model.Reminder
|
||||
qStart := time.Now()
|
||||
err := db.Where("status = ? AND remind_at <= ?", "pending", cutoff).Order("remind_at asc").Limit(500).Find(&missed).Error
|
||||
metrics.ObserveDB("compensation_scan", "reminders", err == nil, time.Since(qStart))
|
||||
if err != nil {
|
||||
log.Printf("[scheduler] compensation query error: %v", err)
|
||||
return
|
||||
}
|
||||
if len(missed) > 0 {
|
||||
log.Printf("[scheduler] compensation scan found %d pending overdue reminders", len(missed))
|
||||
}
|
||||
for _, r := range missed {
|
||||
processReminder(db, r, now)
|
||||
}
|
||||
}
|
||||
|
||||
func deliverReminder(r model.Reminder) error {
|
||||
if r.Channel != "in_app" {
|
||||
return errors.New("unsupported channel")
|
||||
}
|
||||
log.Printf("[reminder] user=%d asset=%d channel=%s remind_at=%s dedupe=%s", r.UserID, r.AssetID, r.Channel, r.RemindAt.Format(time.RFC3339), r.DedupeKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// retryDelay returns how long to wait before the next delivery attempt, given
// the retry count after the failure that just happened: 5 minutes for the
// first retry, 30 minutes for the second, and 2 hours for every other value.
func retryDelay(retryCount int) time.Duration {
	if retryCount == 1 {
		return 5 * time.Minute
	}
	if retryCount == 2 {
		return 30 * time.Minute
	}
	return 2 * time.Hour
}
|
||||
|
||||
// maxRetryCount returns the retry count at which a failing reminder stops
// being rescheduled.
func maxRetryCount() int {
	const limit = 8
	return limit
}
|
||||
Reference in New Issue
Block a user