package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"time"

	"github.com/redis/go-redis/v9"
)

// Redis connection settings.
//
// Each value can be overridden through the environment (IC3_REDIS_ADDR,
// IC3_REDIS_USER, IC3_REDIS_PASSWORD). The literals are fallbacks that keep
// the previous behaviour when the variables are not set.
//
// SECURITY(review): the fallback password is a credential committed to source
// control. Rotate it and drop the fallback once deployments provide
// IC3_REDIS_PASSWORD.
var (
	redisAddr = envOrDefault("IC3_REDIS_ADDR", "13.237.154.232:6379")
	redisUser = envOrDefault("IC3_REDIS_USER", "ic3-integrator")
	redisPwd  = envOrDefault("IC3_REDIS_PASSWORD", "03589eaa050e329e1bf4306efb7aa9e5eb15989ef6e0e0bbe8f33a9ab0da4d66")
)

// envOrDefault returns the value of the environment variable key, or fallback
// when the variable is unset or empty.
func envOrDefault(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// TelemetryEnvelope matches the IC³ Redis data format (ic3:latest:D##-S###)
// and maps to the full v3 PostgreSQL schema.
//
// A value of this type is decoded from the JSON string stored under each
// ic3:latest:* key and is re-encoded as the "data" member of the broadcast
// message, so field order and JSON tags define the wire format in both
// directions.
type TelemetryEnvelope struct {
	// Core fields present in every Redis record
	// (always emitted on marshal, even when empty — no omitempty).
	SystemID   string         `json:"system_id"`
	Domain     string         `json:"domain"`       // → domain_code in DB
	Protocol   string         `json:"protocol"`
	RecordID   string         `json:"record_id"`
	// OccurredAt is kept as the raw string from the record; scanAndBroadcast
	// fills it with the current UTC time in RFC 3339 form when it is empty.
	OccurredAt string         `json:"occurred_at"`  // → timestamp_utc in DB
	Values     map[string]any `json:"values"`        // → payload JSONB in DB
	Quality    string         `json:"quality"`       // → quality_code in DB
	Alarm      string         `json:"alarm"`         // → alarm_state in DB

	// Optional location / metadata fields (present in enriched payloads)
	// All carry omitempty, so zero values are dropped when re-marshaled.
	// NOTE(review): this also drops a legitimate 0 for SequenceNo, Latitude,
	// Longitude and ElevationM — confirm consumers treat "absent" as zero.
	SourceSystem    string  `json:"source_system,omitempty"`
	CollectorID     string  `json:"collector_id,omitempty"`
	SequenceNo      int     `json:"sequence_no,omitempty"`
	LocationPath    string  `json:"location_path,omitempty"`
	CountryCode     string  `json:"country_code,omitempty"`
	StateCode       string  `json:"state_code,omitempty"`
	DistrictCode    string  `json:"district_code,omitempty"`
	TalukCode       string  `json:"taluk_code,omitempty"`
	CityCode        string  `json:"city_code,omitempty"`
	WardCode        string  `json:"ward_code,omitempty"`
	ZoneID          string  `json:"zone_id,omitempty"`
	DMAID           string  `json:"dma_id,omitempty"`
	PressureZoneID  string  `json:"pressure_zone_id,omitempty"`
	SiteID          string  `json:"site_id,omitempty"`
	Latitude        float64 `json:"latitude,omitempty"`
	Longitude       float64 `json:"longitude,omitempty"`
	ElevationM      float64 `json:"elevation_m,omitempty"`
	StreamTopic     string  `json:"stream_topic,omitempty"`
	Priority        string  `json:"priority,omitempty"`

	// Derived from Redis key (ic3:latest:<AssetID>)
	// scanAndBroadcast overwrites any asset_id found in the payload with the
	// key suffix.
	AssetID string `json:"asset_id,omitempty"`
}

// newRedisClient returns a go-redis client configured with the package-level
// address and credentials, a 5s dial timeout and 3s read/write timeouts.
func newRedisClient() *redis.Client {
	const (
		dialTimeout = 5 * time.Second
		ioTimeout   = 3 * time.Second
	)

	opts := redis.Options{
		Addr:     redisAddr,
		Username: redisUser,
		Password: redisPwd,

		DialTimeout:  dialTimeout,
		ReadTimeout:  ioTimeout,
		WriteTimeout: ioTimeout,
	}
	return redis.NewClient(&opts)
}

// pollRedis runs the polling loop until ctx is cancelled.
//
// It first pings Redis. If the ping fails it retries every 10 seconds until a
// ping succeeds or ctx is cancelled (in which case it returns without
// polling). Once connected it calls scanAndBroadcast every 2 seconds.
//
// db and hub are passed through to scanAndBroadcast unchanged.
func pollRedis(ctx context.Context, rdb *redis.Client, db *DB, hub *Hub) {
	if err := rdb.Ping(ctx).Err(); err != nil {
		log.Printf("Redis connect error: %v — retrying every 10s", err)
		if !waitForRedis(ctx, rdb, 10*time.Second) {
			return
		}
		log.Println("Redis reconnected")
	} else {
		log.Printf("Redis connected to %s", redisAddr)
	}

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			scanAndBroadcast(ctx, rdb, db, hub)
		}
	}
}

// waitForRedis pings rdb once per interval until a ping succeeds (returns
// true) or ctx is cancelled (returns false). The retry ticker is stopped as
// soon as the function returns, so it does not stay armed while polling runs.
func waitForRedis(ctx context.Context, rdb *redis.Client, interval time.Duration) bool {
	retry := time.NewTicker(interval)
	defer retry.Stop()
	for {
		select {
		case <-ctx.Done():
			return false
		case <-retry.C:
			if rdb.Ping(ctx).Err() == nil {
				return true
			}
		}
	}
}

// scanAndBroadcast performs one polling pass over the ic3:latest:* keys.
//
// It collects all matching keys with SCAN, fetches their values with a single
// MGET, decodes each value into a TelemetryEnvelope, sends every decoded
// record to hub.broadcast as {"type":"telemetry","data":<envelope>} and, when
// db is non-nil and at least one record decoded, passes the whole batch to
// db.saveBatch.
//
// Errors from SCAN or MGET are logged and abort the pass. Values that are
// missing, not strings, or not valid JSON are skipped (decode failures are
// logged). Broadcasts are best-effort: a message is dropped when the hub
// channel cannot accept it immediately.
func scanAndBroadcast(ctx context.Context, rdb *redis.Client, db *DB, hub *Hub) {
	const keyPrefix = "ic3:latest:"

	var (
		cursor uint64
		keys   []string
		// SCAN may return the same key more than once within one full
		// iteration; track what we have seen so each asset is fetched,
		// broadcast and stored only once per pass.
		seen = make(map[string]struct{})
	)
	for {
		page, next, err := rdb.Scan(ctx, cursor, keyPrefix+"*", 200).Result()
		if err != nil {
			log.Printf("Redis scan: %v", err)
			return
		}
		for _, k := range page {
			if _, dup := seen[k]; dup {
				continue
			}
			seen[k] = struct{}{}
			keys = append(keys, k)
		}
		cursor = next
		if cursor == 0 {
			break
		}
	}
	if len(keys) == 0 {
		return
	}

	vals, err := rdb.MGet(ctx, keys...).Result()
	if err != nil {
		log.Printf("Redis mget: %v", err)
		return
	}

	batch := make([]TelemetryEnvelope, 0, len(vals))
	for i, v := range vals {
		// MGET yields nil for keys that vanished since the scan or that do
		// not hold a string value.
		raw, ok := v.(string)
		if !ok {
			continue
		}
		var env TelemetryEnvelope
		if err := json.Unmarshal([]byte(raw), &env); err != nil {
			log.Printf("Redis decode %s: %v", keys[i], err)
			continue
		}
		// Use key suffix as AssetID. Every key matched keyPrefix+"*", so the
		// slice is always in range.
		env.AssetID = keys[i][len(keyPrefix):]
		if env.OccurredAt == "" {
			env.OccurredAt = time.Now().UTC().Format(time.RFC3339)
		}
		batch = append(batch, env)

		msg, err := json.Marshal(map[string]any{"type": "telemetry", "data": env})
		if err != nil {
			// Never push a nil/partial message to subscribers.
			log.Printf("Redis encode %s: %v", keys[i], err)
			continue
		}
		select {
		case hub.broadcast <- msg:
		default:
			// Hub is busy; drop this update rather than stall the poller.
		}
	}

	if db != nil && len(batch) > 0 {
		db.saveBatch(ctx, batch)
	}
}
