Add weekly reports, cleanup, and dashboard UI
Introduce weekly summary reports and a cleanup job, enhance dashboard UI, and adjust telemetry/build settings. - Add REPO_SOURCE to misc/api.func and include repo_source in telemetry payloads. - Implement weekly report generation/scheduling in alerts.go: new data types, HTML/plain templates, scheduler, SendWeeklyReport/TestWeeklyReport, and email/HTML helpers. - Add Cleaner (misc/data/cleanup.go) to detect and mark stuck installations as 'unknown' with scheduling and manual trigger APIs. - Enhance dashboard backend/frontend (misc/data/dashboard.go): optional days filter (allow 'All'), increase fetch page size, simplify fetchRecords, add quick filter buttons, detail & health modals, improved styles and chart options, and client-side record detail view. - Update Dockerfile (misc/data/Dockerfile): rename binaries to telemetry-service and build migrate from ./migration/migrate.go; copy adjusted in final image. - Add migration tooling (misc/data/migration/migrate.sh and migration.go) and other small service changes. These changes add operational reporting and cleanup capabilities, improve observability and UX of the dashboard, and align build and telemetry identifiers for the service.
This commit is contained in:
@@ -27,7 +27,8 @@ type Config struct {
|
||||
PBAuthCollection string // "_dev_telemetry_service"
|
||||
PBIdentity string // email
|
||||
PBPassword string
|
||||
PBTargetColl string // "_dev_telemetry_data"
|
||||
PBTargetColl string // "_dev_telemetry_data" (dev default)
|
||||
PBLiveTargetColl string // "_live_telemetry_data" (production)
|
||||
|
||||
// Limits
|
||||
MaxBodyBytes int64
|
||||
@@ -101,6 +102,9 @@ type TelemetryIn struct {
|
||||
|
||||
// Error categorization
|
||||
ErrorCategory string `json:"error_category,omitempty"` // "network", "storage", "dependency", "permission", "timeout", "unknown"
|
||||
|
||||
// Repository source for collection routing
|
||||
RepoSource string `json:"repo_source,omitempty"` // "community-scripts/ProxmoxVE" or "community-scripts/ProxmoxVED"
|
||||
}
|
||||
|
||||
// TelemetryOut is sent to PocketBase (matches _dev_telemetry_data collection)
|
||||
@@ -146,12 +150,19 @@ type TelemetryStatusUpdate struct {
|
||||
RAMSpeed string `json:"ram_speed,omitempty"`
|
||||
}
|
||||
|
||||
// Allowed values for 'repo_source' field — controls collection routing
|
||||
var allowedRepoSource = map[string]bool{
|
||||
"community-scripts/ProxmoxVE": true,
|
||||
"community-scripts/ProxmoxVED": true,
|
||||
}
|
||||
|
||||
type PBClient struct {
|
||||
baseURL string
|
||||
authCollection string
|
||||
identity string
|
||||
password string
|
||||
targetColl string
|
||||
devColl string // "_dev_telemetry_data"
|
||||
liveColl string // "_live_telemetry_data"
|
||||
|
||||
mu sync.Mutex
|
||||
token string
|
||||
@@ -165,13 +176,25 @@ func NewPBClient(cfg Config) *PBClient {
|
||||
authCollection: cfg.PBAuthCollection,
|
||||
identity: cfg.PBIdentity,
|
||||
password: cfg.PBPassword,
|
||||
targetColl: cfg.PBTargetColl,
|
||||
devColl: cfg.PBTargetColl,
|
||||
liveColl: cfg.PBLiveTargetColl,
|
||||
http: &http.Client{
|
||||
Timeout: cfg.RequestTimeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// resolveCollection maps a repo_source value to the correct PocketBase collection.
|
||||
// - "community-scripts/ProxmoxVE" → live collection
|
||||
// - "community-scripts/ProxmoxVED" → dev collection
|
||||
// - empty / unknown → dev collection (safe default)
|
||||
func (p *PBClient) resolveCollection(repoSource string) string {
|
||||
if repoSource == "community-scripts/ProxmoxVE" && p.liveColl != "" {
|
||||
return p.liveColl
|
||||
}
|
||||
return p.devColl
|
||||
}
|
||||
|
||||
func (p *PBClient) ensureAuth(ctx context.Context) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
@@ -223,8 +246,8 @@ func (p *PBClient) ensureAuth(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindRecordByRandomID searches for an existing record by random_id
|
||||
func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (string, error) {
|
||||
// FindRecordByRandomID searches for an existing record by random_id in the given collection
|
||||
func (p *PBClient) FindRecordByRandomID(ctx context.Context, coll, randomID string) (string, error) {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -233,7 +256,7 @@ func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (s
|
||||
filter := fmt.Sprintf("random_id='%s'", randomID)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||
fmt.Sprintf("%s/api/collections/%s/records?filter=%s&fields=id&perPage=1",
|
||||
p.baseURL, p.targetColl, filter),
|
||||
p.baseURL, coll, filter),
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -267,14 +290,14 @@ func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (s
|
||||
}
|
||||
|
||||
// UpdateTelemetryStatus updates only status, error, and exit_code of an existing record
|
||||
func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, update TelemetryStatusUpdate) error {
|
||||
func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, coll, recordID string, update TelemetryStatusUpdate) error {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(update)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPatch,
|
||||
fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, p.targetColl, recordID),
|
||||
fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, coll, recordID),
|
||||
bytes.NewReader(b),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -295,7 +318,8 @@ func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, u
|
||||
return nil
|
||||
}
|
||||
|
||||
// FetchRecordsPaginated retrieves records with pagination and optional filters
|
||||
// FetchRecordsPaginated retrieves records with pagination and optional filters.
|
||||
// Uses devColl by default (dashboard shows dev data); for live data, use separate endpoint if needed.
|
||||
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType, sortField string) ([]TelemetryRecord, int, error) {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return nil, 0, err
|
||||
@@ -337,7 +361,7 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
||||
}
|
||||
|
||||
reqURL := fmt.Sprintf("%s/api/collections/%s/records?sort=%s&page=%d&perPage=%d%s",
|
||||
p.baseURL, p.targetColl, sort, page, limit, filterStr)
|
||||
p.baseURL, p.devColl, sort, page, limit, filterStr)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
||||
if err != nil {
|
||||
@@ -366,17 +390,23 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
||||
return result.Items, result.TotalItems, nil
|
||||
}
|
||||
|
||||
// UpsertTelemetry handles both creation and updates intelligently
|
||||
// - status="installing": Always creates a new record
|
||||
// - status!="installing": Updates existing record (found by random_id) with status/error/exit_code only
|
||||
func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) error {
|
||||
// UpsertTelemetry handles both creation and updates intelligently.
|
||||
// Routes to the correct PocketBase collection based on repoSource:
|
||||
// - "community-scripts/ProxmoxVE" → _live_telemetry_data
|
||||
// - "community-scripts/ProxmoxVED" → _dev_telemetry_data
|
||||
//
|
||||
// For status="installing": always creates a new record.
|
||||
// For status!="installing": updates existing record (found by random_id).
|
||||
func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut, repoSource string) error {
|
||||
coll := p.resolveCollection(repoSource)
|
||||
|
||||
// For "installing" status, always create new record
|
||||
if payload.Status == "installing" {
|
||||
return p.CreateTelemetry(ctx, payload)
|
||||
return p.CreateTelemetry(ctx, coll, payload)
|
||||
}
|
||||
|
||||
// For status updates (success/failed/unknown), find and update existing record
|
||||
recordID, err := p.FindRecordByRandomID(ctx, payload.RandomID)
|
||||
recordID, err := p.FindRecordByRandomID(ctx, coll, payload.RandomID)
|
||||
if err != nil {
|
||||
// Search failed, log and return error
|
||||
return fmt.Errorf("cannot find record to update: %w", err)
|
||||
@@ -385,7 +415,7 @@ func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) er
|
||||
if recordID == "" {
|
||||
// Record not found - this shouldn't happen normally
|
||||
// Create a full record as fallback
|
||||
return p.CreateTelemetry(ctx, payload)
|
||||
return p.CreateTelemetry(ctx, coll, payload)
|
||||
}
|
||||
|
||||
// Update only status, error, exit_code, and new metrics fields
|
||||
@@ -402,17 +432,17 @@ func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) er
|
||||
CPUModel: payload.CPUModel,
|
||||
RAMSpeed: payload.RAMSpeed,
|
||||
}
|
||||
return p.UpdateTelemetryStatus(ctx, recordID, update)
|
||||
return p.UpdateTelemetryStatus(ctx, coll, recordID, update)
|
||||
}
|
||||
|
||||
func (p *PBClient) CreateTelemetry(ctx context.Context, payload TelemetryOut) error {
|
||||
func (p *PBClient) CreateTelemetry(ctx context.Context, coll string, payload TelemetryOut) error {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(payload)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
||||
fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.targetColl),
|
||||
fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, coll),
|
||||
bytes.NewReader(b),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -617,6 +647,9 @@ func validate(in *TelemetryIn) error {
|
||||
in.RAMSpeed = sanitizeShort(in.RAMSpeed, 16)
|
||||
in.ErrorCategory = strings.ToLower(sanitizeShort(in.ErrorCategory, 32))
|
||||
|
||||
// Sanitize repo_source (routing field)
|
||||
in.RepoSource = sanitizeShort(in.RepoSource, 64)
|
||||
|
||||
// Default empty values to "unknown" for consistency
|
||||
if in.GPUVendor == "" {
|
||||
in.GPUVendor = "unknown"
|
||||
@@ -697,6 +730,11 @@ func validate(in *TelemetryIn) error {
|
||||
return errors.New("invalid install_duration (max 24h)")
|
||||
}
|
||||
|
||||
// Validate repo_source: must be an allowed repository or empty
|
||||
if in.RepoSource != "" && !allowedRepoSource[in.RepoSource] {
|
||||
return errors.New("invalid repo_source (must be 'community-scripts/ProxmoxVE' or 'community-scripts/ProxmoxVED')")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -721,6 +759,7 @@ func main() {
|
||||
PBIdentity: mustEnv("PB_IDENTITY"),
|
||||
PBPassword: mustEnv("PB_PASSWORD"),
|
||||
PBTargetColl: env("PB_TARGET_COLLECTION", "_dev_telemetry_data"),
|
||||
PBLiveTargetColl: env("PB_LIVE_TARGET_COLLECTION", "_live_telemetry_data"),
|
||||
|
||||
MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024),
|
||||
RateLimitRPM: envInt("RATE_LIMIT_RPM", 60),
|
||||
@@ -1048,7 +1087,8 @@ func main() {
|
||||
defer cancel()
|
||||
|
||||
// Upsert: Creates new record if random_id doesn't exist, updates if it does
|
||||
if err := pb.UpsertTelemetry(ctx, out); err != nil {
|
||||
// Routes to correct collection based on repo_source
|
||||
if err := pb.UpsertTelemetry(ctx, out, in.RepoSource); err != nil {
|
||||
// GDPR: don't log raw payload, don't log IPs; log only generic error
|
||||
log.Printf("pocketbase write failed: %v", err)
|
||||
http.Error(w, "upstream error", http.StatusBadGateway)
|
||||
|
||||
Reference in New Issue
Block a user