Basdat/internal/services/etl_service.go
2025-12-20 00:01:08 +07:00

352 lines
9.1 KiB
Go

// internal/services/etl_service.go
package services
import (
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
	"gorm.io/gorm"

	"lost-and-found/internal/models"
)
// ✅ KRITERIA BASDAT: ETL Implementation (15%)
//
// ETLService runs Extract/Transform/Load pipelines for lost-and-found
// items: CSV extraction, concurrent transformation, transactional load,
// CSV export, and cross-database synchronization.
type ETLService struct {
db *gorm.DB // primary application database handle
logger *zap.Logger // structured logger for phase progress and warnings
}
// NewETLService constructs an ETLService bound to the given database
// handle and logger.
func NewETLService(db *gorm.DB, logger *zap.Logger) *ETLService {
	svc := &ETLService{db: db, logger: logger}
	return svc
}
// ETLResult represents ETL operation result
type ETLResult struct {
TotalRecords int `json:"total_records"` // number of records the operation attempted
SuccessRecords int `json:"success_records"` // records persisted successfully
FailedRecords int `json:"failed_records"` // records rejected or failed to persist
Duration time.Duration `json:"duration"` // wall-clock time of the operation
Errors []string `json:"errors"` // human-readable per-record error messages
TransformedData map[string]interface{} `json:"transformed_data"` // optional extra payload; not populated by the current pipeline
}
// ✅ EXTRACT - Extract data from CSV
//
// ExtractFromCSV reads the CSV file at filepath and returns one
// header-keyed map per data row. Header names and cell values are
// whitespace-trimmed; cells beyond the header count are ignored.
// Rows that fail CSV parsing are logged with their 1-based file line
// number and skipped rather than aborting the whole extract.
func (s *ETLService) ExtractFromCSV(filepath string) ([]map[string]string, error) {
	s.logger.Info("Starting EXTRACT phase", zap.String("file", filepath))
	file, err := os.Open(filepath)
	if err != nil {
		s.logger.Error("Failed to open CSV file", zap.Error(err))
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()
	reader := csv.NewReader(file)
	headers, err := reader.Read()
	if err != nil {
		return nil, fmt.Errorf("failed to read headers: %w", err)
	}
	// BUG FIX: trim header names as well as values — stray spaces in the
	// header row previously produced keys like " name", silently failing
	// every lookup downstream.
	for i := range headers {
		headers[i] = strings.TrimSpace(headers[i])
	}
	var records []map[string]string
	// BUG FIX: the header occupied line 1, so the first data row is
	// line 2; the counter previously started at 1 and every "Skipping
	// invalid line" log pointed one line too early.
	lineNumber := 2
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			s.logger.Warn("Skipping invalid line", zap.Int("line", lineNumber), zap.Error(err))
			lineNumber++
			continue
		}
		// Pre-size: each row has at most one entry per header.
		data := make(map[string]string, len(headers))
		for i, value := range record {
			if i < len(headers) {
				data[headers[i]] = strings.TrimSpace(value)
			}
		}
		records = append(records, data)
		lineNumber++
	}
	s.logger.Info("EXTRACT completed", zap.Int("records", len(records)))
	return records, nil
}
// ✅ TRANSFORM - Transform and validate data
//
// TransformItemData converts raw CSV row maps into validated
// models.Item values using a fixed-size worker pool. It returns the
// successfully transformed items alongside one error string per
// rejected row. Result ordering is not guaranteed to match the input
// order (workers finish asynchronously).
func (s *ETLService) TransformItemData(records []map[string]string) ([]models.Item, []string) {
	s.logger.Info("Starting TRANSFORM phase", zap.Int("records", len(records)))

	// Outcome of transforming one row: exactly one of item / errMsg is set.
	type outcome struct {
		item   *models.Item
		errMsg string
	}

	// Worker Pool Pattern for concurrent transformation.
	const workerCount = 5
	jobs := make(chan map[string]string, len(records))
	results := make(chan outcome, len(records))

	var wg sync.WaitGroup
	wg.Add(workerCount)
	for w := 0; w < workerCount; w++ {
		go func() {
			defer wg.Done()
			for row := range jobs {
				item, err := s.transformSingleItem(row)
				if err != nil {
					results <- outcome{errMsg: err.Error()}
					continue
				}
				results <- outcome{item: item}
			}
		}()
	}

	// Feed every row to the pool, then signal that no more work is coming.
	go func() {
		for _, row := range records {
			jobs <- row
		}
		close(jobs)
	}()

	// Close the results channel once all workers have drained the jobs.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect until the results channel is closed.
	var items []models.Item
	var errors []string
	for res := range results {
		if res.errMsg != "" {
			errors = append(errors, res.errMsg)
		} else if res.item != nil {
			items = append(items, *res.item)
		}
	}

	s.logger.Info("TRANSFORM completed",
		zap.Int("success", len(items)),
		zap.Int("failed", len(errors)))
	return items, errors
}
// transformSingleItem validates one CSV row and converts it into an Item.
//
// Required columns: name, category_id, location, description,
// date_found, reporter_id, reporter_name, reporter_contact; photo_url
// is optional. date_found must use the YYYY-MM-DD format. Returns a
// descriptive error naming the first offending field, or the built
// item with status set to models.ItemStatusUnclaimed.
func (s *ETLService) transformSingleItem(record map[string]string) (*models.Item, error) {
	// Validate required fields up front so callers get a clear
	// "missing" message rather than a confusing parse failure.
	// BUG FIX: reporter_id is parsed below but was absent from this
	// list, so an empty value surfaced as `invalid reporter_id: `
	// instead of "missing required field: reporter_id".
	required := []string{"name", "category_id", "location", "description", "date_found", "reporter_id", "reporter_name", "reporter_contact"}
	for _, field := range required {
		if record[field] == "" {
			return nil, fmt.Errorf("missing required field: %s", field)
		}
	}
	// Parse category_id (unsigned, fits uint32).
	categoryID, err := strconv.ParseUint(record["category_id"], 10, 32)
	if err != nil {
		return nil, fmt.Errorf("invalid category_id: %s", record["category_id"])
	}
	// Parse date_found (reference layout = YYYY-MM-DD).
	dateFound, err := time.Parse("2006-01-02", record["date_found"])
	if err != nil {
		return nil, fmt.Errorf("invalid date_found format: %s", record["date_found"])
	}
	// Parse reporter_id (unsigned, fits uint32).
	reporterID, err := strconv.ParseUint(record["reporter_id"], 10, 32)
	if err != nil {
		return nil, fmt.Errorf("invalid reporter_id: %s", record["reporter_id"])
	}
	// Assemble the item; imported rows always start unclaimed.
	item := &models.Item{
		Name:            record["name"],
		CategoryID:      uint(categoryID),
		PhotoURL:        record["photo_url"],
		Location:        record["location"],
		Description:     record["description"],
		DateFound:       dateFound,
		Status:          models.ItemStatusUnclaimed,
		ReporterID:      uint(reporterID),
		ReporterName:    record["reporter_name"],
		ReporterContact: record["reporter_contact"],
	}
	return item, nil
}
// ✅ LOAD - Load transformed data to database with TRANSACTION
//
// LoadItems inserts the items one by one inside a single transaction
// bounded by a 5-minute timeout. Individual insert failures are
// recorded in the result and do not abort the batch, but if more than
// half of the rows fail the whole transaction is rolled back and an
// error is returned alongside the (zeroed-success) result.
func (s *ETLService) LoadItems(items []models.Item) (*ETLResult, error) {
	s.logger.Info("Starting LOAD phase", zap.Int("items", len(items)))
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	startTime := time.Now()
	result := &ETLResult{
		TotalRecords: len(items),
		Errors:       []string{},
	}
	// Row-by-row insert inside one transaction so a >50% failure rate
	// can roll everything back atomically.
	err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		for i := range items {
			// BUG FIX: insert via &items[i] instead of a loop-variable
			// copy, so GORM-assigned primary keys land in the caller's
			// slice rather than a discarded copy.
			if err := tx.Create(&items[i]).Error; err != nil {
				result.FailedRecords++
				result.Errors = append(result.Errors, fmt.Sprintf("Failed to insert %s: %v", items[i].Name, err))
				s.logger.Warn("Failed to insert item",
					zap.String("name", items[i].Name),
					zap.Error(err))
			} else {
				result.SuccessRecords++
			}
		}
		// If more than 50% failed, rollback
		if result.FailedRecords > result.TotalRecords/2 {
			return fmt.Errorf("too many failures (%d/%d), rolling back transaction",
				result.FailedRecords, result.TotalRecords)
		}
		return nil
	})
	result.Duration = time.Since(startTime)
	if err != nil {
		// BUG FIX: on rollback nothing was committed, but the interim
		// counters still claimed successes for rows that no longer
		// exist. Report the true post-rollback state instead.
		result.SuccessRecords = 0
		result.FailedRecords = result.TotalRecords
		s.logger.Error("LOAD failed", zap.Error(err))
		return result, err
	}
	s.logger.Info("LOAD completed",
		zap.Int("success", result.SuccessRecords),
		zap.Int("failed", result.FailedRecords),
		zap.Duration("duration", result.Duration))
	return result, nil
}
// ✅ Full ETL Pipeline
//
// RunETLPipeline executes Extract → Transform → Load for the CSV file
// at csvPath and returns the combined result. Extract or load errors
// abort the pipeline; transform-phase rejections are folded into the
// result's failure counters and error list instead.
func (s *ETLService) RunETLPipeline(csvPath string) (*ETLResult, error) {
	s.logger.Info("Starting FULL ETL Pipeline", zap.String("source", csvPath))
	// EXTRACT
	records, err := s.ExtractFromCSV(csvPath)
	if err != nil {
		return nil, fmt.Errorf("extract failed: %w", err)
	}
	// TRANSFORM
	items, transformErrors := s.TransformItemData(records)
	// LOAD
	result, err := s.LoadItems(items)
	if err != nil {
		return nil, fmt.Errorf("load failed: %w", err)
	}
	// BUG FIX: fold transform-phase rejections into the totals. The
	// result previously counted only rows that survived the transform,
	// so TotalRecords/FailedRecords silently undercounted failures.
	result.TotalRecords = len(records)
	result.FailedRecords += len(transformErrors)
	result.Errors = append(result.Errors, transformErrors...)
	s.logger.Info("ETL Pipeline completed",
		zap.Int("total", result.TotalRecords),
		zap.Int("success", result.SuccessRecords),
		zap.Int("failed", result.FailedRecords))
	return result, nil
}
// ✅ Export data (Reverse ETL)
//
// ExportToCSV runs the given SQL query, scans the rows into Item
// values, and writes a fixed set of columns to a new CSV file at
// filepath (any existing file is truncated).
//
// SECURITY NOTE(review): query is executed verbatim via Raw() — an
// injection surface if callers ever interpolate untrusted input into
// it. Restrict callers to constant/whitelisted queries, or switch to
// parameterized queries.
func (s *ETLService) ExportToCSV(filepath string, query string) error {
	s.logger.Info("Exporting data to CSV", zap.String("file", filepath))
	var items []models.Item
	if err := s.db.Raw(query).Scan(&items).Error; err != nil {
		return fmt.Errorf("failed to query data: %w", err)
	}
	file, err := os.Create(filepath)
	if err != nil {
		return fmt.Errorf("failed to create file: %w", err)
	}
	defer file.Close()
	writer := csv.NewWriter(file)
	// Write headers
	headers := []string{"id", "name", "category_id", "location", "description", "date_found", "status"}
	if err := writer.Write(headers); err != nil {
		return err
	}
	// Write data rows in the same column order as the header.
	for _, item := range items {
		record := []string{
			strconv.Itoa(int(item.ID)),
			item.Name,
			strconv.Itoa(int(item.CategoryID)),
			item.Location,
			item.Description,
			item.DateFound.Format("2006-01-02"),
			item.Status,
		}
		if err := writer.Write(record); err != nil {
			return err
		}
	}
	// BUG FIX: the original `defer writer.Flush()` discarded the flush
	// error, so a full disk or failed write yielded a silently
	// truncated file while reporting success. Flush explicitly and
	// surface any buffered write error.
	writer.Flush()
	if err := writer.Error(); err != nil {
		return fmt.Errorf("failed to flush CSV output: %w", err)
	}
	s.logger.Info("Export completed", zap.Int("records", len(items)))
	return nil
}
// ✅ Data Synchronization between databases
//
// SyncToExternalDB copies every item from the primary database into
// externalDB inside a single transaction, upserting each row by
// primary key via Save. The first failure aborts and rolls back the
// whole sync. The operation is bounded by a 10-minute timeout.
func (s *ETLService) SyncToExternalDB(externalDB *gorm.DB) error {
	s.logger.Info("Starting database synchronization")
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Load the full source table into memory.
	var items []models.Item
	if err := s.db.WithContext(ctx).Find(&items).Error; err != nil {
		return fmt.Errorf("failed to fetch items: %w", err)
	}

	// Replay every row into the external database; Save acts as an
	// upsert keyed on the primary key. All-or-nothing via transaction.
	return externalDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		for i := range items {
			if err := tx.Save(&items[i]).Error; err != nil {
				s.logger.Warn("Failed to sync item",
					zap.Uint("id", items[i].ID),
					zap.Error(err))
				return err
			}
		}
		return nil
	})
}