Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cmd/msgvault/cmd/build_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,30 @@ func exportToCSV(db *sql.DB, query string, dest string) error {
return rows.Err()
}

// rebuildCacheAfterWrite rebuilds the analytics cache after a write
// operation. Uses the staleness check to determine whether a full
// rebuild (deletions/mutations) or incremental export (new messages
// only) is needed. Logs a warning on failure — the data is safe in
// SQLite.
func rebuildCacheAfterWrite(dbPath string) {
analyticsDir := cfg.AnalyticsDir()
fullRebuild := false
if staleness := cacheNeedsBuild(dbPath, analyticsDir); staleness.FullRebuild {
fullRebuild = true
}
result, err := buildCache(dbPath, analyticsDir, fullRebuild)
if err != nil {
fmt.Fprintf(os.Stderr,
"Warning: cache rebuild failed: %v\n", err)
fmt.Fprintf(os.Stderr,
"Run 'msgvault build-cache' to retry.\n")
return
}
if !result.Skipped {
logger.Info("cache rebuilt", "exported", result.ExportedCount)
}
}

func init() {
rootCmd.AddCommand(buildCacheCmd)
rootCmd.AddCommand(cacheStatsCmd)
Expand Down
13 changes: 2 additions & 11 deletions cmd/msgvault/cmd/deletions.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,17 +495,8 @@ Examples:

fmt.Println("\nDeletion complete!")

// Refresh analytics cache to reflect deleted messages
analyticsDir := cfg.AnalyticsDir()
if _, err := os.Stat(analyticsDir); err == nil {
fmt.Println("\nRefreshing analytics cache...")
if result, err := buildCache(dbPath, analyticsDir, true); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to refresh cache: %v\n", err)
fmt.Fprintf(os.Stderr, "Run 'msgvault build-cache --full-rebuild' manually to update.\n")
} else if !result.Skipped {
fmt.Printf("Cache refreshed (%d messages).\n", result.ExportedCount)
}
}
// Refresh analytics cache to reflect deleted messages.
rebuildCacheAfterWrite(dbPath)

return nil
},
Expand Down
2 changes: 2 additions & 0 deletions cmd/msgvault/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func runWhatsAppImport(cmd *cobra.Command, sourcePath string) error {
if err != nil {
if ctx.Err() != nil {
fmt.Println("\nImport interrupted. Run again to continue.")
rebuildCacheAfterWrite(dbPath)
return nil
}
return fmt.Errorf("import failed: %w", err)
Expand Down Expand Up @@ -152,6 +153,7 @@ func runWhatsAppImport(cmd *cobra.Command, sourcePath string) error {
fmt.Printf(" Rate: %.0f messages/sec\n", rate)
}

rebuildCacheAfterWrite(dbPath)
return nil
}

Expand Down
10 changes: 7 additions & 3 deletions cmd/msgvault/cmd/import_emlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,17 @@ Examples:
attachmentsDir = ""
}

var importErr error
if identifier != "" {
// Manual fallback: single import with explicit identifier.
return importSingleAccount(ctx, cmd, st, mailDir, identifier, attachmentsDir)
importErr = importSingleAccount(ctx, cmd, st, mailDir, identifier, attachmentsDir)
} else {
// Auto mode: discover accounts from V10 layout + Accounts4.sqlite.
importErr = importAutoAccounts(ctx, cmd, st, mailDir, attachmentsDir)
}

// Auto mode: discover accounts from V10 layout + Accounts4.sqlite.
return importAutoAccounts(ctx, cmd, st, mailDir, attachmentsDir)
rebuildCacheAfterWrite(dbPath)
return importErr
},
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/msgvault/cmd/import_gvoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ func runImportGvoice(cmd *cobra.Command, args []string) error {
if ctx.Err() != nil {
fmt.Println("\nImport interrupted.")
printGvoiceSummary(summary, startTime)
rebuildCacheAfterWrite(cfg.DatabaseDSN())
return nil
}
return fmt.Errorf("import failed: %w", err)
}

printGvoiceSummary(summary, startTime)
rebuildCacheAfterWrite(cfg.DatabaseDSN())
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/msgvault/cmd/import_imessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ func runImportImessage(cmd *cobra.Command, _ []string) error {
if ctx.Err() != nil {
fmt.Println("\nImport interrupted.")
printImessageSummary(summary, startTime)
rebuildCacheAfterWrite(cfg.DatabaseDSN())
return nil
}
return fmt.Errorf("import failed: %w", err)
}

printImessageSummary(summary, startTime)
rebuildCacheAfterWrite(cfg.DatabaseDSN())
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/msgvault/cmd/import_mbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ Examples:
_, _ = fmt.Fprintf(out, " Errors: %d\n", totalErrors)
_, _ = fmt.Fprintf(out, " Bytes: %.2f MB\n", float64(totalBytes)/(1024*1024))

rebuildCacheAfterWrite(dbPath)

if ctx.Err() == nil && hadHardErrors {
return fmt.Errorf("import completed with %d errors", totalErrors)
}
Expand Down
16 changes: 14 additions & 2 deletions cmd/msgvault/cmd/repair_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,20 @@ charset detection issues in the MIME parser.`,
return fmt.Errorf("init schema: %w", err)
}

return repairEncoding(s)
if err := repairEncoding(s); err != nil {
return err
}

analyticsDir := cfg.AnalyticsDir()
if _, err := buildCache(dbPath, analyticsDir, true); err != nil {
fmt.Fprintf(os.Stderr,
"Warning: cache rebuild failed: %v\n", err)
fmt.Fprintf(os.Stderr,
"Run 'msgvault build-cache --full-rebuild' to retry.\n")
} else {
fmt.Println("\nAnalytics cache rebuilt.")
}
return nil
},
}

Expand Down Expand Up @@ -133,7 +146,6 @@ func repairEncoding(s *store.Store) error {
fmt.Printf(" Skipped rows: %d (scan errors)\n", stats.skippedRows)
}
fmt.Printf(" Total fields: %d\n", total)
fmt.Println("\nRun 'msgvault build-cache --full-rebuild' to update the analytics cache.")
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions cmd/msgvault/cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ Examples:
}
}

// Rebuild analytics cache.
rebuildCacheAfterWrite(dbPath)

if len(syncErrors) > 0 {
fmt.Println()
fmt.Println("Errors:")
Expand Down
3 changes: 3 additions & 0 deletions cmd/msgvault/cmd/syncfull.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ Examples:
}
}

// Rebuild analytics cache.
rebuildCacheAfterWrite(dbPath)

if len(syncErrors) > 0 {
fmt.Println()
fmt.Println("Errors:")
Expand Down
1 change: 1 addition & 0 deletions internal/sync/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (s *Syncer) Incremental(ctx context.Context, source *store.Source) (summary
} else {
for i, raw := range rawMessages {
if raw == nil {
s.logger.Warn("failed to fetch message (nil response)", "id", newMsgIDs[i])
checkpoint.ErrorsCount++
continue
}
Expand Down
1 change: 1 addition & 0 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (s *Syncer) processBatch(ctx context.Context, sourceID int64, listResp *gma

for i, raw := range rawMessages {
if raw == nil {
s.logger.Warn("failed to fetch message (nil response)", "id", newIDs[i])
checkpoint.ErrorsCount++
continue
}
Expand Down