package scheduler import ( "errors" "log" "time" "asset-tracker/internal/metrics" "asset-tracker/internal/model" "github.com/robfig/cron/v3" "gorm.io/gorm" ) func StartReminderScan(db *gorm.DB) *cron.Cron { c := cron.New(cron.WithSeconds()) _, err := c.AddFunc("0 */5 * * * *", func() { runReminderScan(db) }) if err != nil { log.Printf("[scheduler] add reminder scan job failed: %v", err) return c } _, err = c.AddFunc("0 10 2 * * *", func() { runCompensationScan(db) }) if err != nil { log.Printf("[scheduler] add compensation job failed: %v", err) return c } c.Start() log.Println("[scheduler] reminder scan started: every 5 minutes, compensation daily 02:10") return c } func runReminderScan(db *gorm.DB) { now := time.Now().UTC() windowEnd := now.Add(5 * time.Minute) var pending []model.Reminder qStart := time.Now() err := db.Where("status = ? AND remind_at <= ?", "pending", windowEnd).Order("status asc, remind_at asc").Limit(200).Find(&pending).Error metrics.ObserveDB("scan_pending", "reminders", err == nil, time.Since(qStart)) if err != nil { log.Printf("[scheduler] pending reminder query error: %v", err) return } for _, r := range pending { processReminder(db, r, now) } var failed []model.Reminder qStart = time.Now() err = db.Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", "failed", now).Order("status asc, next_retry_at asc").Limit(200).Find(&failed).Error metrics.ObserveDB("scan_failed", "reminders", err == nil, time.Since(qStart)) if err != nil { log.Printf("[scheduler] failed reminder query error: %v", err) return } for _, r := range failed { processReminder(db, r, now) } } func processReminder(db *gorm.DB, r model.Reminder, now time.Time) { claim := db.Model(&model.Reminder{}).Where("id = ? AND status IN ?", r.ID, []string{"pending", "failed"}).Updates(map[string]interface{}{ "status": "sending", "last_error": "", "next_retry_at": nil, }) if claim.Error != nil { log.Printf("[scheduler] claim reminder id=%d failed: %v", r.ID, claim.Error) return } if claim.RowsAffected == 0 { return } if err := deliverReminder(r); err != nil { metrics.ReminderSendTotal.WithLabelValues("failed").Inc() retryCount := r.RetryCount + 1 if retryCount >= maxRetryCount() { _ = db.Transaction(func(tx *gorm.DB) error { if err := tx.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{ "status": "failed", "retry_count": retryCount, "last_error": "retry limit reached: " + err.Error(), "next_retry_at": nil, }).Error; err != nil { return err } dl := model.ReminderDeadLetter{ ReminderID: r.ID, UserID: r.UserID, AssetID: r.AssetID, RemindAt: r.RemindAt, Channel: r.Channel, Status: "failed", RetryCount: retryCount, LastError: "retry limit reached: " + err.Error(), } return tx.Where("reminder_id = ?", r.ID).FirstOrCreate(&dl).Error }) return } metrics.ReminderRetryTotal.Inc() next := now.Add(retryDelay(retryCount)) _ = db.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{ "status": "failed", "retry_count": retryCount, "next_retry_at": &next, "last_error": err.Error(), }).Error return } metrics.ReminderSendTotal.WithLabelValues("sent").Inc() sentAt := now _ = db.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{ "status": "sent", "sent_at": &sentAt, "last_error": "", "next_retry_at": nil, }).Error } func runCompensationScan(db *gorm.DB) { now := time.Now().UTC() cutoff := now.Add(-10 * time.Minute) var missed []model.Reminder qStart := time.Now() err := db.Where("status = ? AND remind_at <= ?", "pending", cutoff).Order("remind_at asc").Limit(500).Find(&missed).Error metrics.ObserveDB("compensation_scan", "reminders", err == nil, time.Since(qStart)) if err != nil { log.Printf("[scheduler] compensation query error: %v", err) return } if len(missed) > 0 { log.Printf("[scheduler] compensation scan found %d pending overdue reminders", len(missed)) } for _, r := range missed { processReminder(db, r, now) } } func deliverReminder(r model.Reminder) error { if r.Channel != "in_app" { return errors.New("unsupported channel") } log.Printf("[reminder] user=%d asset=%d channel=%s remind_at=%s dedupe=%s", r.UserID, r.AssetID, r.Channel, r.RemindAt.Format(time.RFC3339), r.DedupeKey) return nil } func retryDelay(retryCount int) time.Duration { switch retryCount { case 1: return 5 * time.Minute case 2: return 30 * time.Minute default: return 2 * time.Hour } } func maxRetryCount() int { return 8 }