// internal/services/etl_service.go package services import ( "context" "encoding/csv" "fmt" "io" "lost-and-found/internal/models" "os" "strconv" "strings" "sync" "time" "go.uber.org/zap" "gorm.io/gorm" ) // ✅ KRITERIA BASDAT: ETL Implementation (15%) type ETLService struct { db *gorm.DB logger *zap.Logger } func NewETLService(db *gorm.DB, logger *zap.Logger) *ETLService { return &ETLService{ db: db, logger: logger, } } // ETLResult represents ETL operation result type ETLResult struct { TotalRecords int `json:"total_records"` SuccessRecords int `json:"success_records"` FailedRecords int `json:"failed_records"` Duration time.Duration `json:"duration"` Errors []string `json:"errors"` TransformedData map[string]interface{} `json:"transformed_data"` } // ✅ EXTRACT - Extract data from CSV func (s *ETLService) ExtractFromCSV(filepath string) ([]map[string]string, error) { s.logger.Info("Starting EXTRACT phase", zap.String("file", filepath)) file, err := os.Open(filepath) if err != nil { s.logger.Error("Failed to open CSV file", zap.Error(err)) return nil, fmt.Errorf("failed to open file: %w", err) } defer file.Close() reader := csv.NewReader(file) headers, err := reader.Read() if err != nil { return nil, fmt.Errorf("failed to read headers: %w", err) } var records []map[string]string lineNumber := 1 for { record, err := reader.Read() if err == io.EOF { break } if err != nil { s.logger.Warn("Skipping invalid line", zap.Int("line", lineNumber), zap.Error(err)) lineNumber++ continue } data := make(map[string]string) for i, value := range record { if i < len(headers) { data[headers[i]] = strings.TrimSpace(value) } } records = append(records, data) lineNumber++ } s.logger.Info("EXTRACT completed", zap.Int("records", len(records))) return records, nil } // ✅ TRANSFORM - Transform and validate data func (s *ETLService) TransformItemData(records []map[string]string) ([]models.Item, []string) { s.logger.Info("Starting TRANSFORM phase", zap.Int("records", len(records))) var items []models.Item var errors []string // Worker Pool Pattern for concurrent transformation const numWorkers = 5 recordsChan := make(chan map[string]string, len(records)) resultsChan := make(chan struct { item *models.Item error string }, len(records)) var wg sync.WaitGroup // Start workers for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for record := range recordsChan { item, err := s.transformSingleItem(record) if err != nil { resultsChan <- struct { item *models.Item error string }{nil, err.Error()} } else { resultsChan <- struct { item *models.Item error string }{item, ""} } } }(i) } // Send records to workers go func() { for _, record := range records { recordsChan <- record } close(recordsChan) }() // Wait for workers to finish go func() { wg.Wait() close(resultsChan) }() // Collect results for result := range resultsChan { if result.error != "" { errors = append(errors, result.error) } else if result.item != nil { items = append(items, *result.item) } } s.logger.Info("TRANSFORM completed", zap.Int("success", len(items)), zap.Int("failed", len(errors))) return items, errors } func (s *ETLService) transformSingleItem(record map[string]string) (*models.Item, error) { // Validate required fields required := []string{"name", "category_id", "location", "description", "date_found", "reporter_name", "reporter_contact"} for _, field := range required { if record[field] == "" { return nil, fmt.Errorf("missing required field: %s", field) } } // Parse category_id categoryID, err := strconv.ParseUint(record["category_id"], 10, 32) if err != nil { return nil, fmt.Errorf("invalid category_id: %s", record["category_id"]) } // Parse date_found dateFound, err := time.Parse("2006-01-02", record["date_found"]) if err != nil { return nil, fmt.Errorf("invalid date_found format: %s", record["date_found"]) } // Parse reporter_id reporterID, err := strconv.ParseUint(record["reporter_id"], 10, 32) if err != nil { return nil, fmt.Errorf("invalid reporter_id: %s", record["reporter_id"]) } // Create item item := &models.Item{ Name: record["name"], CategoryID: uint(categoryID), PhotoURL: record["photo_url"], Location: record["location"], Description: record["description"], DateFound: dateFound, Status: models.ItemStatusUnclaimed, ReporterID: uint(reporterID), ReporterName: record["reporter_name"], ReporterContact: record["reporter_contact"], } return item, nil } // ✅ LOAD - Load transformed data to database with TRANSACTION func (s *ETLService) LoadItems(items []models.Item) (*ETLResult, error) { s.logger.Info("Starting LOAD phase", zap.Int("items", len(items))) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() startTime := time.Now() result := &ETLResult{ TotalRecords: len(items), Errors: []string{}, } // Batch insert with transaction err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { for _, item := range items { if err := tx.Create(&item).Error; err != nil { result.FailedRecords++ result.Errors = append(result.Errors, fmt.Sprintf("Failed to insert %s: %v", item.Name, err)) s.logger.Warn("Failed to insert item", zap.String("name", item.Name), zap.Error(err)) } else { result.SuccessRecords++ } } // If more than 50% failed, rollback if result.FailedRecords > result.TotalRecords/2 { return fmt.Errorf("too many failures (%d/%d), rolling back transaction", result.FailedRecords, result.TotalRecords) } return nil }) result.Duration = time.Since(startTime) if err != nil { s.logger.Error("LOAD failed", zap.Error(err)) return result, err } s.logger.Info("LOAD completed", zap.Int("success", result.SuccessRecords), zap.Int("failed", result.FailedRecords), zap.Duration("duration", result.Duration)) return result, nil } // ✅ Full ETL Pipeline func (s *ETLService) RunETLPipeline(csvPath string) (*ETLResult, error) { s.logger.Info("Starting FULL ETL Pipeline", zap.String("source", csvPath)) // EXTRACT records, err := s.ExtractFromCSV(csvPath) if err != nil { return nil, fmt.Errorf("extract failed: %w", err) } // TRANSFORM items, transformErrors := s.TransformItemData(records) // LOAD result, err := s.LoadItems(items) if err != nil { return nil, fmt.Errorf("load failed: %w", err) } // Add transform errors to result result.Errors = append(result.Errors, transformErrors...) s.logger.Info("ETL Pipeline completed", zap.Int("total", result.TotalRecords), zap.Int("success", result.SuccessRecords), zap.Int("failed", result.FailedRecords)) return result, nil } // ✅ Export data (Reverse ETL) func (s *ETLService) ExportToCSV(filepath string, query string) error { s.logger.Info("Exporting data to CSV", zap.String("file", filepath)) var items []models.Item if err := s.db.Raw(query).Scan(&items).Error; err != nil { return fmt.Errorf("failed to query data: %w", err) } file, err := os.Create(filepath) if err != nil { return fmt.Errorf("failed to create file: %w", err) } defer file.Close() writer := csv.NewWriter(file) defer writer.Flush() // Write headers headers := []string{"id", "name", "category_id", "location", "description", "date_found", "status"} if err := writer.Write(headers); err != nil { return err } // Write data for _, item := range items { record := []string{ strconv.Itoa(int(item.ID)), item.Name, strconv.Itoa(int(item.CategoryID)), item.Location, item.Description, item.DateFound.Format("2006-01-02"), item.Status, } if err := writer.Write(record); err != nil { return err } } s.logger.Info("Export completed", zap.Int("records", len(items))) return nil } // ✅ Data Synchronization between databases func (s *ETLService) SyncToExternalDB(externalDB *gorm.DB) error { s.logger.Info("Starting database synchronization") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() // Get all items from source var items []models.Item if err := s.db.WithContext(ctx).Find(&items).Error; err != nil { return fmt.Errorf("failed to fetch items: %w", err) } // Sync to external DB with transaction return externalDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { for _, item := range items { // Upsert logic if err := tx.Save(&item).Error; err != nil { s.logger.Warn("Failed to sync item", zap.Uint("id", item.ID), zap.Error(err)) return err } } return nil }) }