Add telemetry data service and dashboard revamp
Introduce a telemetry data microservice under misc/data: add Dockerfile, entrypoint, migration tools, README, LICENSE and a .gitignore. Increase Docker CACHE_TTL_SECONDS to 300s. Implement extensive dashboard and analytics updates in dashboard.go: add total_all_time and sample_size, return total item counts from fetchRecords (with page/limit handling and a maxRecords guard), raise top-N limits, add a minimum-installs threshold for failed-apps, and numerous UI/style/layout improvements in the embedded DashboardHTML. Minor formatting tweak to misc/api.func.
This commit is contained in:
@@ -106,7 +106,7 @@ type TelemetryIn struct {
|
||||
RepoSource string `json:"repo_source,omitempty"` // "ProxmoxVE", "ProxmoxVED", or "external"
|
||||
}
|
||||
|
||||
// TelemetryOut is sent to PocketBase (matches _telemetry_data collection)
|
||||
// TelemetryOut is sent to PocketBase (matches telemetry collection)
|
||||
type TelemetryOut struct {
|
||||
RandomID string `json:"random_id"`
|
||||
Type string `json:"type"`
|
||||
@@ -309,7 +309,7 @@ func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, u
|
||||
}
|
||||
|
||||
// FetchRecordsPaginated retrieves records with pagination and optional filters.
|
||||
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType, sortField, repoSource string) ([]TelemetryRecord, int, error) {
|
||||
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType, typeFilter, sortField, repoSource string) ([]TelemetryRecord, int, error) {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
@@ -325,6 +325,9 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
||||
if osType != "" {
|
||||
filters = append(filters, fmt.Sprintf("os_type='%s'", osType))
|
||||
}
|
||||
if typeFilter != "" {
|
||||
filters = append(filters, fmt.Sprintf("type='%s'", typeFilter))
|
||||
}
|
||||
if repoSource != "" {
|
||||
filters = append(filters, fmt.Sprintf("repo_source='%s'", repoSource))
|
||||
}
|
||||
@@ -759,7 +762,7 @@ func main() {
|
||||
// Cache config
|
||||
RedisURL: env("REDIS_URL", ""),
|
||||
EnableRedis: envBool("ENABLE_REDIS", false),
|
||||
CacheTTL: time.Duration(envInt("CACHE_TTL_SECONDS", 60)) * time.Second,
|
||||
CacheTTL: time.Duration(envInt("CACHE_TTL_SECONDS", 300)) * time.Second,
|
||||
CacheEnabled: envBool("ENABLE_CACHE", true),
|
||||
|
||||
// Alert config
|
||||
@@ -883,14 +886,12 @@ func main() {
|
||||
|
||||
// Dashboard API endpoint (with caching)
|
||||
mux.HandleFunc("/api/dashboard", func(w http.ResponseWriter, r *http.Request) {
|
||||
days := 30
|
||||
days := 7 // Default: 7 days
|
||||
if d := r.URL.Query().Get("days"); d != "" {
|
||||
fmt.Sscanf(d, "%d", &days)
|
||||
if days < 1 {
|
||||
days = 1
|
||||
}
|
||||
if days > 365 {
|
||||
days = 365
|
||||
// days=0 means "all entries", negative values are invalid
|
||||
if days < 0 {
|
||||
days = 7
|
||||
}
|
||||
}
|
||||
|
||||
@@ -904,7 +905,8 @@ func main() {
|
||||
repoSource = ""
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
// Increase timeout for large datasets (dashboard aggregation takes time)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Try cache first
|
||||
@@ -941,6 +943,7 @@ func main() {
|
||||
status := r.URL.Query().Get("status")
|
||||
app := r.URL.Query().Get("app")
|
||||
osType := r.URL.Query().Get("os")
|
||||
typeFilter := r.URL.Query().Get("type")
|
||||
sort := r.URL.Query().Get("sort")
|
||||
repoSource := r.URL.Query().Get("repo")
|
||||
if repoSource == "" {
|
||||
@@ -966,7 +969,7 @@ func main() {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType, sort, repoSource)
|
||||
records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType, typeFilter, sort, repoSource)
|
||||
if err != nil {
|
||||
log.Printf("records fetch failed: %v", err)
|
||||
http.Error(w, "failed to fetch records", http.StatusInternalServerError)
|
||||
@@ -1114,6 +1117,22 @@ func main() {
|
||||
ReadHeaderTimeout: 3 * time.Second,
|
||||
}
|
||||
|
||||
// Background cache warmup job - pre-populates cache for common dashboard queries
|
||||
if cfg.CacheEnabled {
|
||||
go func() {
|
||||
// Initial warmup after startup
|
||||
time.Sleep(10 * time.Second)
|
||||
warmupDashboardCache(pb, cache, cfg)
|
||||
|
||||
// Periodic refresh (every 4 minutes, before 5-minute TTL expires)
|
||||
ticker := time.NewTicker(4 * time.Minute)
|
||||
for range ticker.C {
|
||||
warmupDashboardCache(pb, cache, cfg)
|
||||
}
|
||||
}()
|
||||
log.Println("background cache warmup enabled")
|
||||
}
|
||||
|
||||
log.Printf("telemetry-ingest listening on %s", cfg.ListenAddr)
|
||||
log.Fatal(srv.ListenAndServe())
|
||||
}
|
||||
@@ -1199,4 +1218,44 @@ func splitCSV(s string) []string {
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// warmupDashboardCache pre-populates the cache with common dashboard queries
|
||||
func warmupDashboardCache(pb *PBClient, cache *Cache, cfg Config) {
|
||||
log.Println("[CACHE] Starting dashboard cache warmup...")
|
||||
|
||||
// Common day ranges and repos to pre-cache
|
||||
dayRanges := []int{7, 30, 90}
|
||||
repos := []string{"ProxmoxVE", ""} // ProxmoxVE and "all"
|
||||
|
||||
warmed := 0
|
||||
for _, days := range dayRanges {
|
||||
for _, repo := range repos {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||
|
||||
cacheKey := fmt.Sprintf("dashboard:%d:%s", days, repo)
|
||||
|
||||
// Check if already cached
|
||||
var existing *DashboardData
|
||||
if cache.Get(ctx, cacheKey, &existing) {
|
||||
cancel()
|
||||
continue // Already cached, skip
|
||||
}
|
||||
|
||||
// Fetch and cache
|
||||
data, err := pb.FetchDashboardData(ctx, days, repo)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[CACHE] Warmup failed for days=%d repo=%s: %v", days, repo, err)
|
||||
continue
|
||||
}
|
||||
|
||||
_ = cache.Set(context.Background(), cacheKey, data, cfg.CacheTTL)
|
||||
warmed++
|
||||
log.Printf("[CACHE] Warmed cache for days=%d repo=%s (%d installs)", days, repo, data.TotalAllTime)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[CACHE] Dashboard cache warmup complete (%d entries)", warmed)
|
||||
}
|
||||
Reference in New Issue
Block a user