Basdat/internal/workers/expire_worker.go
2025-12-20 00:01:08 +07:00

223 lines
5.8 KiB
Go

// internal/workers/expire_worker.go - IMPROVED VERSION
package workers
import (
"context"
"log"
"lost-and-found/internal/models"
"lost-and-found/internal/repositories"
"sync"
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// ExpireWorker is the background worker that expires and archives items.
// It bundles the database handle, the repositories built on it, the
// stop-signal plumbing and the worker-pool settings used by its methods.
type ExpireWorker struct {
	// Database handle and the repositories created from it.
	db          *gorm.DB
	itemRepo    *repositories.ItemRepository
	archiveRepo *repositories.ArchiveRepository

	// stopChan carries the stop signal; wg tracks the goroutines the worker
	// has started so shutdown can wait for them.
	stopChan chan bool
	wg       sync.WaitGroup
	mu       sync.Mutex

	// Worker pool configuration: number of pool goroutines and the queue
	// they read items from.
	maxWorkers int
	taskQueue  chan *models.Item
}
// NewExpireWorker builds an ExpireWorker around db. Both repositories are
// created from the same handle, the stop channel is unbuffered, the pool is
// sized at 5 workers and the task queue buffers up to 100 items.
func NewExpireWorker(db *gorm.DB) *ExpireWorker {
	const (
		poolSize      = 5   // number of pool goroutines
		queueCapacity = 100 // buffered slots in the task queue
	)

	worker := new(ExpireWorker)
	worker.db = db
	worker.itemRepo = repositories.NewItemRepository(db)
	worker.archiveRepo = repositories.NewArchiveRepository(db)
	worker.stopChan = make(chan bool)
	worker.maxWorkers = poolSize
	worker.taskQueue = make(chan *models.Item, queueCapacity)
	return worker
}
// Start launches the scheduling goroutine and returns immediately.
//
// The goroutine starts the worker pool, runs one expiration pass right away,
// and then repeats the pass every hour. When a value is received on stopChan
// it closes taskQueue (which lets the pool goroutines drain and exit) and
// returns.
func (w *ExpireWorker) Start() {
	w.wg.Add(1)

	go func() {
		defer w.wg.Done()

		log.Println("⏰ Expire Worker started with", w.maxWorkers, "workers")
		w.startWorkerPool()

		// First pass happens on startup; later passes are driven by the ticker.
		w.expireItems()

		hourly := time.NewTicker(time.Hour)
		defer hourly.Stop()

		for {
			select {
			case <-w.stopChan:
				log.Println("⏰ Stopping Expire Worker...")
				// Closing the queue is the shutdown signal for the pool.
				close(w.taskQueue)
				return
			case <-hourly.C:
				w.expireItems()
			}
		}
	}()
}
// startWorkerPool spawns maxWorkers pool goroutines, numbered from 0, and
// registers each one with the WaitGroup before it is started.
func (w *ExpireWorker) startWorkerPool() {
	workerID := 0
	for workerID < w.maxWorkers {
		w.wg.Add(1)
		go w.worker(workerID)
		workerID++
	}
}
// worker is the body of one pool goroutine. It receives items from taskQueue
// until the queue is closed, archiving each one under its own 30-second
// timeout context and logging the outcome. It marks itself done on the
// WaitGroup when it returns.
func (w *ExpireWorker) worker(id int) {
	defer w.wg.Done()
	log.Printf("🔧 Worker %d started", id)

	for {
		item, open := <-w.taskQueue
		if !open {
			// Queue closed and drained: time to shut down.
			break
		}

		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		if archived := w.archiveExpiredItem(ctx, item); !archived {
			log.Printf("❌ Worker %d: Failed to archive item ID %d", id, item.ID)
		} else {
			log.Printf("✅ Worker %d: Archived item ID %d (%s)", id, item.ID, item.Name)
		}
		// Released per iteration rather than deferred, so contexts do not
		// pile up for the lifetime of the goroutine.
		cancel()
	}

	log.Printf("🔧 Worker %d stopped", id)
}
// expireItems runs one expiration pass. It delegates the work to the item
// repository's CallArchiveExpiredProcedure and logs the result: the error if
// the call failed, otherwise either the number of archived items or a note
// that nothing had expired.
func (w *ExpireWorker) expireItems() {
	log.Println("🔍 Checking for expired items (Using Stored Procedure)...")

	archived, err := w.itemRepo.CallArchiveExpiredProcedure()
	switch {
	case err != nil:
		log.Printf("❌ Error executing archive SP: %v", err)
	case archived > 0:
		log.Printf("✅ Successfully archived %d items via DB Procedure", archived)
	default:
		log.Println("✅ No expired items found to archive")
	}
}
// findExpiredItems returns the non-deleted items whose expires_at is at or
// before the current time and whose status is ItemStatusUnclaimed, with the
// Category association preloaded. The query is bound to ctx.
func (w *ExpireWorker) findExpiredItems(ctx context.Context) ([]models.Item, error) {
	var expired []models.Item

	query := w.db.WithContext(ctx).
		Where("expires_at <= ? AND status = ? AND deleted_at IS NULL",
			time.Now(), models.ItemStatusUnclaimed).
		Preload("Category")
	result := query.Find(&expired)

	return expired, result.Error
}
// archiveExpiredItem archives a single expired item inside one database
// transaction bound to ctx.
//
// It returns true when the transaction finished without error — including
// the case where the item was no longer unclaimed and nothing had to be
// done — and false when item is nil or the transaction failed (the cause is
// logged).
//
// Steps inside the transaction:
//  1. re-read the item row under a pessimistic lock (FOR UPDATE) so no other
//     worker or API call can modify it while it is being processed;
//  2. stop if its status is no longer unclaimed;
//  3. insert the archive record, built from the freshly locked row rather
//     than the caller's copy, which may be stale by now;
//  4. set the item status to expired;
//  5. insert an audit log entry (a failure here is only logged).
func (w *ExpireWorker) archiveExpiredItem(ctx context.Context, item *models.Item) bool {
	if item == nil {
		return false
	}
	itemID := item.ID

	err := w.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		// Lock the row and load its current state.
		var lockedItem models.Item
		if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("id = ? AND deleted_at IS NULL", itemID).
			First(&lockedItem).Error; err != nil {
			return err
		}

		// Double check it is still unclaimed; otherwise it was already
		// processed by someone else.
		if lockedItem.Status != models.ItemStatusUnclaimed {
			return nil
		}

		// 1. Create the archive record from the locked (current) row.
		archive := &models.Archive{
			ItemID:          lockedItem.ID,
			Name:            lockedItem.Name,
			CategoryID:      lockedItem.CategoryID,
			PhotoURL:        lockedItem.PhotoURL,
			Location:        lockedItem.Location,
			Description:     lockedItem.Description,
			DateFound:       lockedItem.DateFound,
			Status:          models.ItemStatusExpired,
			ReporterName:    lockedItem.ReporterName,
			ReporterContact: lockedItem.ReporterContact,
			ArchivedReason:  models.ArchiveReasonExpired,
			ClaimedBy:       nil,
			ArchivedAt:      time.Now(),
		}
		if err := tx.Create(archive).Error; err != nil {
			return err
		}

		// 2. Update the item status to expired.
		if err := tx.Model(&lockedItem).Update("status", models.ItemStatusExpired).Error; err != nil {
			return err
		}

		// 3. Create the audit log. Best effort: a failure is logged and does
		// not make this function return an error.
		// NOTE(review): on databases that abort the whole transaction after
		// a failed statement the commit may still fail — confirm against the
		// deployed database.
		auditLog := &models.AuditLog{
			UserID:     nil,
			Action:     "expire",
			EntityType: models.EntityItem,
			EntityID:   &lockedItem.ID,
			Details:    "Item automatically expired and archived by system",
			IPAddress:  "system",
			UserAgent:  "expire_worker",
		}
		if err := tx.Create(auditLog).Error; err != nil {
			log.Printf("Warning: failed to create audit log for item %d: %v", itemID, err)
		}

		return nil
	})
	if err != nil {
		// The bool result cannot carry the cause, so record it here.
		log.Printf("❌ Error archiving item %d: %v", itemID, err)
		return false
	}
	return true
}
// Stop signals the worker to shut down and waits until every goroutine
// registered on the WaitGroup has finished.
//
// The signal is delivered by closing stopChan rather than sending on it: a
// send on the unbuffered channel blocks forever when no goroutine is
// receiving (Start was never called, or Stop was already called once). A
// closed channel is observed immediately by any receiver. The mutex makes
// the close happen at most once, so Stop is safe to call repeatedly and from
// several goroutines.
func (w *ExpireWorker) Stop() {
	log.Println("🛑 Signaling Expire Worker to stop...")

	w.mu.Lock()
	select {
	case <-w.stopChan:
		// Already closed by an earlier Stop call; nothing left to signal.
	default:
		close(w.stopChan)
	}
	w.mu.Unlock()

	log.Println("⏳ Waiting for all workers to finish...")
	w.wg.Wait()
	log.Println("✅ Expire Worker gracefully stopped")
}
// RunNow triggers one expiration pass on demand and returns once the pass
// has finished.
//
// The pass runs on the caller's goroutine. Spawning a goroutine only to wait
// for it right away adds nothing, and a panic inside that goroutine could
// not be recovered by the caller.
func (w *ExpireWorker) RunNow() {
	log.Println("▶️ Running expiration check manually...")
	w.expireItems()
	log.Println("✅ Manual expiration check completed")
}