// internal/workers/expire_worker.go - IMPROVED VERSION package workers import ( "context" "log" "lost-and-found/internal/models" "lost-and-found/internal/repositories" "sync" "time" "gorm.io/gorm" "gorm.io/gorm/clause" ) type ExpireWorker struct { db *gorm.DB itemRepo *repositories.ItemRepository archiveRepo *repositories.ArchiveRepository stopChan chan bool wg sync.WaitGroup mu sync.Mutex // ✅ Worker pool configuration maxWorkers int taskQueue chan *models.Item } func NewExpireWorker(db *gorm.DB) *ExpireWorker { return &ExpireWorker{ db: db, itemRepo: repositories.NewItemRepository(db), archiveRepo: repositories.NewArchiveRepository(db), stopChan: make(chan bool), maxWorkers: 5, // ✅ Configurable worker count taskQueue: make(chan *models.Item, 100), // ✅ Buffered channel } } // ✅ Start with proper worker pool func (w *ExpireWorker) Start() { w.wg.Add(1) go func() { defer w.wg.Done() log.Println("⏰ Expire Worker started with", w.maxWorkers, "workers") // ✅ Start worker pool w.startWorkerPool() // Run immediately on start w.expireItems() // Then run every hour ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() for { select { case <-ticker.C: w.expireItems() case <-w.stopChan: log.Println("⏰ Stopping Expire Worker...") close(w.taskQueue) // ✅ Close task queue to signal workers return } } }() } // ✅ Worker Pool Implementation func (w *ExpireWorker) startWorkerPool() { for i := 0; i < w.maxWorkers; i++ { w.wg.Add(1) go w.worker(i) } } // ✅ Individual Worker func (w *ExpireWorker) worker(id int) { defer w.wg.Done() log.Printf("🔧 Worker %d started", id) for item := range w.taskQueue { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) success := w.archiveExpiredItem(ctx, item) if success { log.Printf("✅ Worker %d: Archived item ID %d (%s)", id, item.ID, item.Name) } else { log.Printf("❌ Worker %d: Failed to archive item ID %d", id, item.ID) } cancel() } log.Printf("🔧 Worker %d stopped", id) } // ✅ Improved expireItems func (w *ExpireWorker) expireItems() { log.Println("🔍 Checking for expired items (Using Stored Procedure)...") // Panggil method repository yang mengeksekusi SP // Pastikan Anda sudah menambahkan method CallArchiveExpiredProcedure di ItemRepository // seperti pada langkah 1 jawaban sebelumnya. count, err := w.itemRepo.CallArchiveExpiredProcedure() if err != nil { log.Printf("❌ Error executing archive SP: %v", err) return } if count > 0 { log.Printf("✅ Successfully archived %d items via DB Procedure", count) } else { log.Println("✅ No expired items found to archive") } } func (w *ExpireWorker) findExpiredItems(ctx context.Context) ([]models.Item, error) { var items []models.Item err := w.db.WithContext(ctx). Where("expires_at <= ? AND status = ? AND deleted_at IS NULL", time.Now(), models.ItemStatusUnclaimed). Preload("Category"). Find(&items).Error return items, err } // ✅ Archive with proper transaction & locking func (w *ExpireWorker) archiveExpiredItem(ctx context.Context, item *models.Item) bool { err := w.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { // ✅ Lock the item var lockedItem models.Item // Locking/Isolation: Menerapkan Pessimistic Lock (FOR UPDATE) pada baris item yang akan diproses, mencegah worker lain atau API call lain memodifikasi item ini selama transaksi berlangsung, menjaga Konsistensi dan Pemulihan. if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). Where("id = ? AND deleted_at IS NULL", item.ID). First(&lockedItem).Error; err != nil { return err } // Double check it's still unclaimed if lockedItem.Status != models.ItemStatusUnclaimed { return nil // Already processed } // 1. Create archive record archive := &models.Archive{ ItemID: item.ID, Name: item.Name, CategoryID: item.CategoryID, PhotoURL: item.PhotoURL, Location: item.Location, Description: item.Description, DateFound: item.DateFound, Status: models.ItemStatusExpired, ReporterName: item.ReporterName, ReporterContact: item.ReporterContact, ArchivedReason: models.ArchiveReasonExpired, ClaimedBy: nil, ArchivedAt: time.Now(), } if err := tx.Create(archive).Error; err != nil { return err } // 2. Update item status to expired if err := tx.Model(&lockedItem).Update("status", models.ItemStatusExpired).Error; err != nil { return err } // 3. Create audit log auditLog := &models.AuditLog{ UserID: nil, Action: "expire", EntityType: models.EntityItem, EntityID: &item.ID, Details: "Item automatically expired and archived by system", IPAddress: "system", UserAgent: "expire_worker", } if err := tx.Create(auditLog).Error; err != nil { log.Printf("Warning: failed to create audit log for item %d: %v", item.ID, err) } return nil }) return err == nil } // ✅ Graceful Stop with WaitGroup func (w *ExpireWorker) Stop() { log.Println("🛑 Signaling Expire Worker to stop...") w.stopChan <- true log.Println("⏳ Waiting for all workers to finish...") w.wg.Wait() log.Println("✅ Expire Worker gracefully stopped") } // RunNow for manual trigger func (w *ExpireWorker) RunNow() { log.Println("▶️ Running expiration check manually...") var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() w.expireItems() }() wg.Wait() log.Println("✅ Manual expiration check completed") }