// internal/services/etl_service.go
|
|
package services
|
|
|
|
import (
|
|
"context"
|
|
"encoding/csv"
|
|
"fmt"
|
|
"io"
|
|
"lost-and-found/internal/models"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// ✅ KRITERIA BASDAT: ETL Implementation (15%)
//
// ETLService implements the Extract-Transform-Load pipeline for item data:
// CSV extraction, concurrent row transformation, and transactional loading
// into the database. Construct it with NewETLService.
type ETLService struct {
	db     *gorm.DB    // primary application database (load target and sync source)
	logger *zap.Logger // structured logger used by every pipeline phase
}
|
|
|
func NewETLService(db *gorm.DB, logger *zap.Logger) *ETLService {
|
|
return &ETLService{
|
|
db: db,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// ETLResult represents ETL operation result.
//
// Counters are maintained by the LOAD phase; transform-phase error strings
// are merged into Errors by RunETLPipeline.
type ETLResult struct {
	TotalRecords   int           `json:"total_records"`   // records offered to the load phase
	SuccessRecords int           `json:"success_records"` // rows inserted without error
	FailedRecords  int           `json:"failed_records"`  // rows that failed to insert
	Duration       time.Duration `json:"duration"`        // wall-clock time of the load phase
	Errors         []string      `json:"errors"`          // per-row failure messages
	// TransformedData is not populated by any code in this file —
	// NOTE(review): presumably reserved for callers/handlers; confirm usage.
	TransformedData map[string]interface{} `json:"transformed_data"`
}
|
|
|
|
// ✅ EXTRACT - Extract data from CSV
|
|
func (s *ETLService) ExtractFromCSV(filepath string) ([]map[string]string, error) {
|
|
s.logger.Info("Starting EXTRACT phase", zap.String("file", filepath))
|
|
|
|
file, err := os.Open(filepath)
|
|
if err != nil {
|
|
s.logger.Error("Failed to open CSV file", zap.Error(err))
|
|
return nil, fmt.Errorf("failed to open file: %w", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
reader := csv.NewReader(file)
|
|
headers, err := reader.Read()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read headers: %w", err)
|
|
}
|
|
|
|
var records []map[string]string
|
|
lineNumber := 1
|
|
|
|
for {
|
|
record, err := reader.Read()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
s.logger.Warn("Skipping invalid line", zap.Int("line", lineNumber), zap.Error(err))
|
|
lineNumber++
|
|
continue
|
|
}
|
|
|
|
data := make(map[string]string)
|
|
for i, value := range record {
|
|
if i < len(headers) {
|
|
data[headers[i]] = strings.TrimSpace(value)
|
|
}
|
|
}
|
|
records = append(records, data)
|
|
lineNumber++
|
|
}
|
|
|
|
s.logger.Info("EXTRACT completed", zap.Int("records", len(records)))
|
|
return records, nil
|
|
}
|
|
|
|
// ✅ TRANSFORM - Transform and validate data
|
|
func (s *ETLService) TransformItemData(records []map[string]string) ([]models.Item, []string) {
|
|
s.logger.Info("Starting TRANSFORM phase", zap.Int("records", len(records)))
|
|
|
|
var items []models.Item
|
|
var errors []string
|
|
|
|
// Worker Pool Pattern for concurrent transformation
|
|
const numWorkers = 5
|
|
recordsChan := make(chan map[string]string, len(records))
|
|
resultsChan := make(chan struct {
|
|
item *models.Item
|
|
error string
|
|
}, len(records))
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
// Start workers
|
|
for i := 0; i < numWorkers; i++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
for record := range recordsChan {
|
|
item, err := s.transformSingleItem(record)
|
|
if err != nil {
|
|
resultsChan <- struct {
|
|
item *models.Item
|
|
error string
|
|
}{nil, err.Error()}
|
|
} else {
|
|
resultsChan <- struct {
|
|
item *models.Item
|
|
error string
|
|
}{item, ""}
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Send records to workers
|
|
go func() {
|
|
for _, record := range records {
|
|
recordsChan <- record
|
|
}
|
|
close(recordsChan)
|
|
}()
|
|
|
|
// Wait for workers to finish
|
|
go func() {
|
|
wg.Wait()
|
|
close(resultsChan)
|
|
}()
|
|
|
|
// Collect results
|
|
for result := range resultsChan {
|
|
if result.error != "" {
|
|
errors = append(errors, result.error)
|
|
} else if result.item != nil {
|
|
items = append(items, *result.item)
|
|
}
|
|
}
|
|
|
|
s.logger.Info("TRANSFORM completed",
|
|
zap.Int("success", len(items)),
|
|
zap.Int("failed", len(errors)))
|
|
|
|
return items, errors
|
|
}
|
|
|
|
func (s *ETLService) transformSingleItem(record map[string]string) (*models.Item, error) {
|
|
// Validate required fields
|
|
required := []string{"name", "category_id", "location", "description", "date_found", "reporter_name", "reporter_contact"}
|
|
for _, field := range required {
|
|
if record[field] == "" {
|
|
return nil, fmt.Errorf("missing required field: %s", field)
|
|
}
|
|
}
|
|
|
|
// Parse category_id
|
|
categoryID, err := strconv.ParseUint(record["category_id"], 10, 32)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid category_id: %s", record["category_id"])
|
|
}
|
|
|
|
// Parse date_found
|
|
dateFound, err := time.Parse("2006-01-02", record["date_found"])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid date_found format: %s", record["date_found"])
|
|
}
|
|
|
|
// Parse reporter_id
|
|
reporterID, err := strconv.ParseUint(record["reporter_id"], 10, 32)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid reporter_id: %s", record["reporter_id"])
|
|
}
|
|
|
|
// Create item
|
|
item := &models.Item{
|
|
Name: record["name"],
|
|
CategoryID: uint(categoryID),
|
|
PhotoURL: record["photo_url"],
|
|
Location: record["location"],
|
|
Description: record["description"],
|
|
DateFound: dateFound,
|
|
Status: models.ItemStatusUnclaimed,
|
|
ReporterID: uint(reporterID),
|
|
ReporterName: record["reporter_name"],
|
|
ReporterContact: record["reporter_contact"],
|
|
}
|
|
|
|
return item, nil
|
|
}
|
|
|
|
// ✅ LOAD - Load transformed data to database with TRANSACTION
|
|
func (s *ETLService) LoadItems(items []models.Item) (*ETLResult, error) {
|
|
s.logger.Info("Starting LOAD phase", zap.Int("items", len(items)))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer cancel()
|
|
|
|
startTime := time.Now()
|
|
result := &ETLResult{
|
|
TotalRecords: len(items),
|
|
Errors: []string{},
|
|
}
|
|
|
|
// Batch insert with transaction
|
|
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
for _, item := range items {
|
|
if err := tx.Create(&item).Error; err != nil {
|
|
result.FailedRecords++
|
|
result.Errors = append(result.Errors, fmt.Sprintf("Failed to insert %s: %v", item.Name, err))
|
|
s.logger.Warn("Failed to insert item",
|
|
zap.String("name", item.Name),
|
|
zap.Error(err))
|
|
} else {
|
|
result.SuccessRecords++
|
|
}
|
|
}
|
|
|
|
// If more than 50% failed, rollback
|
|
if result.FailedRecords > result.TotalRecords/2 {
|
|
return fmt.Errorf("too many failures (%d/%d), rolling back transaction",
|
|
result.FailedRecords, result.TotalRecords)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
result.Duration = time.Since(startTime)
|
|
|
|
if err != nil {
|
|
s.logger.Error("LOAD failed", zap.Error(err))
|
|
return result, err
|
|
}
|
|
|
|
s.logger.Info("LOAD completed",
|
|
zap.Int("success", result.SuccessRecords),
|
|
zap.Int("failed", result.FailedRecords),
|
|
zap.Duration("duration", result.Duration))
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ✅ Full ETL Pipeline
|
|
func (s *ETLService) RunETLPipeline(csvPath string) (*ETLResult, error) {
|
|
s.logger.Info("Starting FULL ETL Pipeline", zap.String("source", csvPath))
|
|
|
|
// EXTRACT
|
|
records, err := s.ExtractFromCSV(csvPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("extract failed: %w", err)
|
|
}
|
|
|
|
// TRANSFORM
|
|
items, transformErrors := s.TransformItemData(records)
|
|
|
|
// LOAD
|
|
result, err := s.LoadItems(items)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("load failed: %w", err)
|
|
}
|
|
|
|
// Add transform errors to result
|
|
result.Errors = append(result.Errors, transformErrors...)
|
|
|
|
s.logger.Info("ETL Pipeline completed",
|
|
zap.Int("total", result.TotalRecords),
|
|
zap.Int("success", result.SuccessRecords),
|
|
zap.Int("failed", result.FailedRecords))
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ✅ Export data (Reverse ETL)
|
|
func (s *ETLService) ExportToCSV(filepath string, query string) error {
|
|
s.logger.Info("Exporting data to CSV", zap.String("file", filepath))
|
|
|
|
var items []models.Item
|
|
if err := s.db.Raw(query).Scan(&items).Error; err != nil {
|
|
return fmt.Errorf("failed to query data: %w", err)
|
|
}
|
|
|
|
file, err := os.Create(filepath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create file: %w", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
writer := csv.NewWriter(file)
|
|
defer writer.Flush()
|
|
|
|
// Write headers
|
|
headers := []string{"id", "name", "category_id", "location", "description", "date_found", "status"}
|
|
if err := writer.Write(headers); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Write data
|
|
for _, item := range items {
|
|
record := []string{
|
|
strconv.Itoa(int(item.ID)),
|
|
item.Name,
|
|
strconv.Itoa(int(item.CategoryID)),
|
|
item.Location,
|
|
item.Description,
|
|
item.DateFound.Format("2006-01-02"),
|
|
item.Status,
|
|
}
|
|
if err := writer.Write(record); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.logger.Info("Export completed", zap.Int("records", len(items)))
|
|
return nil
|
|
}
|
|
|
|
// ✅ Data Synchronization between databases
|
|
func (s *ETLService) SyncToExternalDB(externalDB *gorm.DB) error {
|
|
s.logger.Info("Starting database synchronization")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
|
defer cancel()
|
|
|
|
// Get all items from source
|
|
var items []models.Item
|
|
if err := s.db.WithContext(ctx).Find(&items).Error; err != nil {
|
|
return fmt.Errorf("failed to fetch items: %w", err)
|
|
}
|
|
|
|
// Sync to external DB with transaction
|
|
return externalDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
for _, item := range items {
|
|
// Upsert logic
|
|
if err := tx.Save(&item).Error; err != nil {
|
|
s.logger.Warn("Failed to sync item",
|
|
zap.Uint("id", item.ID),
|
|
zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
} |