Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee

fix: seed legacy feed-registry DIDs into registered_dids instead of known_dids

pdewey.com 1798130b 56d28489

verified
+97 -34
+13 -17
cmd/server/main.go
··· 130 130 log.Fatal().Err(err).Msg("Failed to initialize OAuth") 131 131 } 132 132 133 - // Initialize feed registry (in-memory; populated from SQLite after feedIndex opens) 134 - feedRegistry := feed.NewRegistry() 135 - feedService := feed.NewService(feedRegistry) 133 + // feedRegistry and feedService are initialised after feedIndex opens below 136 134 137 135 // Setup context for graceful shutdown 138 136 ctx, cancel := context.WithCancel(context.Background()) ··· 176 174 177 175 log.Info().Str("path", feedIndexPath).Msg("Feed index opened") 178 176 179 - // One-time seed: copy any DIDs from the legacy BoltDB feed_registry bucket into 180 - // SQLite known_dids. INSERT OR IGNORE makes this a no-op once DIDs are present. 177 + // One-time seed: copy DIDs from the legacy BoltDB feed_registry bucket into 178 + // SQLite registered_dids. INSERT OR IGNORE makes this a no-op after first run. 181 179 if legacyDIDs := store.LegacyFeedDIDs(); len(legacyDIDs) > 0 { 182 - added, err := feedIndex.SeedKnownDIDs(legacyDIDs) 180 + added, err := feedIndex.SeedRegisteredDIDs(legacyDIDs) 183 181 if err != nil { 184 - log.Warn().Err(err).Msg("Failed to seed known DIDs from legacy feed registry") 182 + log.Warn().Err(err).Msg("Failed to seed registered DIDs from legacy feed registry") 185 183 } else if added > 0 { 186 - log.Info().Int("seeded", added).Msg("Seeded known_dids from legacy BoltDB feed registry") 184 + log.Info().Int("seeded", added).Msg("Seeded registered_dids from legacy BoltDB feed registry") 187 185 } 188 186 } 189 187 190 - // Populate feed registry from SQLite known_dids (replaces BoltDB feed registry) 191 - if knownDIDs, err := feedIndex.GetKnownDIDs(); err == nil { 192 - for _, did := range knownDIDs { 193 - feedRegistry.Register(did) 194 - } 195 - log.Info().Int("registered_users", feedRegistry.Count()).Msg("Feed registry populated from index") 196 - } else { 197 - log.Warn().Err(err).Msg("Failed to load known DIDs into feed registry") 198 - } 188 + // 
Initialise feed registry backed by SQLite registered_dids. 189 + // This preserves the distinction between registered users (explicit logins) 190 + // and known_dids (any user whose records have been indexed via firehose). 191 + feedRegistry := feed.NewPersistentRegistry(feedIndex) 192 + feedService := feed.NewService(feedRegistry) 193 + 194 + log.Info().Int("registered_users", feedRegistry.Count()).Msg("Feed service initialised") 199 195 200 196 // Create and start consumer 201 197 firehoseConsumer := firehose.NewConsumer(firehoseConfig, feedIndex)
+4 -17
internal/firehose/index.go
··· 106 106 ); 107 107 108 108 CREATE TABLE IF NOT EXISTS known_dids (did TEXT PRIMARY KEY); 109 + CREATE TABLE IF NOT EXISTS registered_dids ( 110 + did TEXT PRIMARY KEY, 111 + registered_at TEXT NOT NULL 112 + ); 109 113 CREATE TABLE IF NOT EXISTS backfilled (did TEXT PRIMARY KEY, backfilled_at TEXT NOT NULL); 110 114 111 115 CREATE TABLE IF NOT EXISTS profiles ( ··· 245 249 return idx.db 246 250 } 247 251 248 - // SeedKnownDIDs inserts DIDs into known_dids, ignoring any that already exist. 249 - // Used for one-time migration from legacy storage. 250 - func (idx *FeedIndex) SeedKnownDIDs(dids []string) (int, error) { 251 - if len(dids) == 0 { 252 - return 0, nil 253 - } 254 - var added int 255 - for _, did := range dids { 256 - res, err := idx.db.Exec(`INSERT OR IGNORE INTO known_dids (did) VALUES (?)`, did) 257 - if err != nil { 258 - return added, fmt.Errorf("seed known DID %s: %w", did, err) 259 - } 260 - n, _ := res.RowsAffected() 261 - added += int(n) 262 - } 263 - return added, nil 264 - } 265 252 266 253 // Close closes the index database 267 254 func (idx *FeedIndex) Close() error {
+80
internal/firehose/registry.go
··· 1 + package firehose 2 + 3 + import ( 4 + "time" 5 + 6 + "arabica/internal/feed" 7 + ) 8 + 9 + // Ensure FeedIndex implements feed.PersistentStore at compile time. 10 + var _ feed.PersistentStore = (*FeedIndex)(nil) 11 + 12 + // Register adds a DID to the registered_dids table. 13 + // This records users who have explicitly logged into Arabica. 14 + func (idx *FeedIndex) Register(did string) error { 15 + _, err := idx.db.Exec( 16 + `INSERT OR IGNORE INTO registered_dids (did, registered_at) VALUES (?, ?)`, 17 + did, time.Now().Format(time.RFC3339), 18 + ) 19 + return err 20 + } 21 + 22 + // Unregister removes a DID from the registered_dids table. 23 + func (idx *FeedIndex) Unregister(did string) error { 24 + _, err := idx.db.Exec(`DELETE FROM registered_dids WHERE did = ?`, did) 25 + return err 26 + } 27 + 28 + // IsRegistered reports whether a DID is in the registered_dids table. 29 + func (idx *FeedIndex) IsRegistered(did string) bool { 30 + var exists int 31 + _ = idx.db.QueryRow(`SELECT 1 FROM registered_dids WHERE did = ?`, did).Scan(&exists) 32 + return exists == 1 33 + } 34 + 35 + // List returns all registered DIDs. 36 + func (idx *FeedIndex) List() []string { 37 + rows, err := idx.db.Query(`SELECT did FROM registered_dids`) 38 + if err != nil { 39 + return nil 40 + } 41 + defer rows.Close() 42 + 43 + var dids []string 44 + for rows.Next() { 45 + var did string 46 + if err := rows.Scan(&did); err != nil { 47 + continue 48 + } 49 + dids = append(dids, did) 50 + } 51 + return dids 52 + } 53 + 54 + // Count returns the number of registered users. 55 + func (idx *FeedIndex) Count() int { 56 + var count int 57 + _ = idx.db.QueryRow(`SELECT COUNT(*) FROM registered_dids`).Scan(&count) 58 + return count 59 + } 60 + 61 + // SeedRegisteredDIDs inserts DIDs into registered_dids, ignoring duplicates. 62 + // Used for one-time migration from the legacy BoltDB feed_registry. 
63 + func (idx *FeedIndex) SeedRegisteredDIDs(dids []string) (int, error) { 64 + if len(dids) == 0 { 65 + return 0, nil 66 + } 67 + var added int 68 + for _, did := range dids { 69 + res, err := idx.db.Exec( 70 + `INSERT OR IGNORE INTO registered_dids (did, registered_at) VALUES (?, ?)`, 71 + did, time.Now().Format(time.RFC3339), 72 + ) 73 + if err != nil { 74 + return added, err 75 + } 76 + n, _ := res.RowsAffected() 77 + added += int(n) 78 + } 79 + return added, nil 80 + }