Subscribe and post RSS feeds to Bluesky
rss bluesky

feat: add per-feed HTTP/backoff options and retries

+580 -114
+49 -13
config.example.yaml
··· 5 5 accounts: 6 6 # First account - posts from multiple feeds 7 7 - handle: "user1.bsky.social" 8 - # Password can be plain text (not recommended) or environment variable reference 9 - # Use ${VAR_NAME} or $VAR_NAME format for environment variables 8 + # Password can be plain text or an environment variable reference. 9 + # Use ${VAR_NAME} or $VAR_NAME format for environment variables. 10 10 password: "${BSKY_PASSWORD_USER1}" 11 11 # Optional: Custom PDS server (defaults to https://bsky.social) 12 12 pds: "https://bsky.social" 13 - # Optional: Custom storage file for this account (defaults to global storage with account suffix) 13 + # Optional: Custom storage file for this account 14 14 # storage: "user1_posted.txt" 15 15 feeds: 16 + # Feeds can be plain URLs ... 16 17 - "https://example.com/feed.xml" 17 18 - "https://blog.example.com/rss" 18 - - "https://news.example.com/atom.xml" 19 + # ... or mappings with per-feed HTTP/pacing overrides. 20 + # Any field from the global "defaults" block can be overridden here. 21 + - url: "https://old.reddit.com/r/linux/top/.rss?t=week" 22 + # Stagger requests to avoid bursting Reddit's rate limits. 23 + min_delay: "3s" 24 + max_delay: "9s" 25 + base_backoff: "30s" 26 + max_backoff: "10m" 27 + honor_retry_after: true 19 28 20 - # Second account - posts from different feeds 29 + # Second account 21 30 - handle: "user2.bsky.social" 22 31 password: "${BSKY_PASSWORD_USER2}" 23 32 feeds: 24 33 - "https://another-blog.com/feed.xml" 25 - - "https://tech-news.com/rss" 34 + - url: "https://old.reddit.com/r/selfhosted/top/.rss?t=week" 35 + min_delay: "3s" 36 + max_delay: "9s" 37 + base_backoff: "30s" 38 + max_backoff: "10m" 39 + honor_retry_after: true 26 40 27 - # Third account - posts from a single feed 41 + # Third account - single feed, no overrides needed 28 42 - handle: "user3.bsky.social" 29 43 password: "${BSKY_PASSWORD_USER3}" 30 44 pds: "https://custom-pds.example.com" 31 45 feeds: 32 46 - "https://personal-blog.com/feed.xml" 33 47 34 - # Global settings (optional) 35 - # Poll interval for checking feeds (default: 15m) 36 - # Valid units: s, m, h (e.g., "30s", "5m", "1h") 48 + # Poll interval (default: 15m). Valid units: s, m, h 37 49 interval: "15m" 38 50 39 - # Default storage file for tracking posted items (default: posted_items.txt) 40 - # When multiple accounts are configured, separate files will be created automatically 41 - # (e.g., posted_items_user1_bsky_social.txt, posted_items_user2_bsky_social.txt) 51 + # Storage file for tracking posted items (default: posted_items.txt). 52 + # With multiple accounts, per-account files are created automatically 53 + # (e.g. posted_items_user1_bsky_social.txt). 42 54 storage: "posted_items.txt" 55 + 56 + # Global feed defaults - applied to every feed unless overridden per-feed above. 57 + # All fields are optional; hard-coded fallbacks are used when omitted. 58 + defaults: 59 + # A descriptive User-Agent reduces rate-limiting on providers like Reddit. 60 + user_agent: "bskyrss/1.0 (+https://pkg.rbrt.fr/bskyrss)" 61 + 62 + # Per-request HTTP timeout. 63 + timeout: "30s" 64 + 65 + # Retry settings for transient HTTP errors (429, 5xx ...). 66 + # Retries use exponential backoff: base_backoff * 2^attempt, capped at max_backoff. 67 + max_retries: 3 68 + base_backoff: "2s" 69 + max_backoff: "2m" 70 + 71 + # Honour the Retry-After response header when the server sends it (e.g. Reddit 429). 72 + honor_retry_after: true 73 + 74 + # Pre-fetch jitter: a random delay in [min_delay, max_delay] added before each 75 + # request. Leave both unset (or 0s) to disable. Prefer setting this per-feed 76 + # rather than globally so only rate-sensitive feeds are affected. 77 + # min_delay: "0s" 78 + # max_delay: "0s"
+1 -1
go.mod
··· 1 1 module pkg.rbrt.fr/bskyrss 2 2 3 - go 1.25.4 3 + go 1.26.0 4 4 5 5 require ( 6 6 github.com/bluesky-social/indigo v0.0.0-20260103083015-78a1c1894f36
+128 -17
internal/config/config.go
··· 14 14 Accounts []Account `yaml:"accounts"` 15 15 Interval time.Duration `yaml:"interval,omitempty"` 16 16 Storage string `yaml:"storage,omitempty"` 17 + Defaults FeedOptions `yaml:"defaults,omitempty"` 17 18 } 18 19 19 20 // Account represents a Bluesky account with its associated feeds 20 21 type Account struct { 21 - Handle string `yaml:"handle"` 22 - Password string `yaml:"password,omitempty"` 23 - PDS string `yaml:"pds,omitempty"` 24 - Feeds []string `yaml:"feeds"` 25 - Storage string `yaml:"storage,omitempty"` // Optional per-account storage file 22 + Handle string `yaml:"handle"` 23 + Password string `yaml:"password,omitempty"` 24 + PDS string `yaml:"pds,omitempty"` 25 + Feeds []FeedConfig `yaml:"feeds"` 26 + Storage string `yaml:"storage,omitempty"` 27 + } 28 + 29 + // FeedConfig holds a feed URL and optional per-feed HTTP/pacing options. 30 + // In YAML it can be written as a plain string (just the URL) or as a mapping: 31 + // 32 + // feeds: 33 + // - "https://example.com/rss" 34 + // - url: "https://old.reddit.com/r/linux/top/.rss?t=week" 35 + // pre_fetch_delay: "5s" 36 + // base_backoff: "30s" 37 + type FeedConfig struct { 38 + URL string `yaml:"url"` 39 + Options FeedOptions `yaml:",inline"` 40 + } 41 + 42 + // UnmarshalYAML allows FeedConfig to be written as a plain string URL in YAML. 43 + func (f *FeedConfig) UnmarshalYAML(value *yaml.Node) error { 44 + if value.Kind == yaml.ScalarNode { 45 + f.URL = value.Value 46 + return nil 47 + } 48 + 49 + // normal struct decoding 50 + type feedConfigAlias FeedConfig 51 + var alias feedConfigAlias 52 + if err := value.Decode(&alias); err != nil { 53 + return err 54 + } 55 + *f = FeedConfig(alias) 56 + return nil 57 + } 58 + 59 + // FeedOptions controls HTTP behaviour, pacing and retry for a feed. 60 + type FeedOptions struct { 61 + UserAgent string `yaml:"user_agent,omitempty"` 62 + Timeout time.Duration `yaml:"timeout,omitempty"` 63 + 64 + // Pre-fetch jitter. 65 + MinDelay time.Duration `yaml:"min_delay,omitempty"` 66 + MaxDelay time.Duration `yaml:"max_delay,omitempty"` 67 + 68 + // Retry / backoff on transient HTTP errors (5xx, 429 …) 69 + MaxRetries *int `yaml:"max_retries,omitempty"` 70 + BaseBackoff time.Duration `yaml:"base_backoff,omitempty"` 71 + MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` 72 + 73 + // When true (default), honour the Retry-After response header on HTTP 429. 74 + HonorRetryAfter *bool `yaml:"honor_retry_after,omitempty"` 75 + } 76 + 77 + // Resolved returns the effective FeedOptions for feed, with per-feed values 78 + // taking priority over the global defaults block, which in turn takes priority 79 + // over the hard-coded baseline. 80 + func (c *Config) Resolved(feed FeedConfig) FeedOptions { 81 + f := feed.Options 82 + g := c.Defaults 83 + 84 + return FeedOptions{ 85 + UserAgent: coalesce(f.UserAgent, g.UserAgent, "bskyrss/1.0 (+https://pkg.rbrt.fr/bskyrss)"), 86 + Timeout: coalesce(f.Timeout, g.Timeout, 30*time.Second), 87 + MinDelay: coalesce(f.MinDelay, g.MinDelay, 0), 88 + MaxDelay: coalesce(f.MaxDelay, g.MaxDelay, 0), 89 + MaxRetries: coalesce(f.MaxRetries, g.MaxRetries, new(3)), 90 + BaseBackoff: coalesce(f.BaseBackoff, g.BaseBackoff, 2*time.Second), 91 + MaxBackoff: coalesce(f.MaxBackoff, g.MaxBackoff, 2*time.Minute), 92 + HonorRetryAfter: coalesce(f.HonorRetryAfter, g.HonorRetryAfter, new(true)), 93 + } 26 94 } 27 95 28 - // LoadFromFile loads configuration from a YAML file 96 + // coalesce returns the first value that is not the zero value of T. 97 + func coalesce[T comparable](vals ...T) T { 98 + var zero T 99 + for _, v := range vals { 100 + if v != zero { 101 + return v 102 + } 103 + } 104 + return zero 105 + } 106 + 107 + // LoadFromFile loads configuration from a YAML file. 29 108 func LoadFromFile(path string) (*Config, error) { 30 109 data, err := os.ReadFile(path) 31 110 if err != nil { ··· 41 120 return nil, fmt.Errorf("invalid configuration: %w", err) 42 121 } 43 122 44 - // Process environment variables in passwords 123 + // Expand environment variables in passwords 45 124 for i := range cfg.Accounts { 46 125 cfg.Accounts[i].Password = expandEnvVar(cfg.Accounts[i].Password) 47 126 } 48 127 49 - // Set defaults 128 + // Global defaults 50 129 if cfg.Interval == 0 { 51 130 cfg.Interval = 15 * time.Minute 52 131 } ··· 57 136 return &cfg, nil 58 137 } 59 138 60 - // Validate checks if the configuration is valid 139 + // Validate checks the configuration for obvious errors. 61 140 func (c *Config) Validate() error { 62 141 if len(c.Accounts) == 0 { 63 142 return fmt.Errorf("at least one account is required") ··· 66 145 for i, account := range c.Accounts { 67 146 if account.Handle == "" { 68 147 return fmt.Errorf("account %d: handle is required", i) 148 + } 149 + if account.Password == "" { 150 + return fmt.Errorf("account %d (%s): password is required", i, account.Handle) 69 151 } 70 152 if len(account.Feeds) == 0 { 71 153 return fmt.Errorf("account %d (%s): at least one feed is required", i, account.Handle) 72 154 } 73 - if account.Password == "" { 74 - return fmt.Errorf("account %d (%s): password is required", i, account.Handle) 155 + for j, feed := range account.Feeds { 156 + if feed.URL == "" { 157 + return fmt.Errorf("account %d (%s), feed %d: url is required", i, account.Handle, j) 158 + } 75 159 } 76 160 } 77 161 162 + if err := validateFeedOptions("defaults", c.Defaults); err != nil { 163 + return err 164 + } 165 + 78 166 return nil 79 167 } 80 168 81 - // expandEnvVar expands environment variable references in the format ${VAR_NAME} or $VAR_NAME 169 + func validateFeedOptions(prefix string, o FeedOptions) error { 170 + if o.Timeout < 0 { 171 + return fmt.Errorf("%s.timeout must be >= 0", prefix) 172 + } 173 + if o.MinDelay < 0 { 174 + return fmt.Errorf("%s.min_delay must be >= 0", prefix) 175 + } 176 + if o.MaxDelay < 0 { 177 + return fmt.Errorf("%s.max_delay must be >= 0", prefix) 178 + } 179 + if o.MinDelay > 0 && o.MaxDelay > 0 && o.MinDelay > o.MaxDelay { 180 + return fmt.Errorf("%s.min_delay must be <= max_delay", prefix) 181 + } 182 + if o.BaseBackoff < 0 { 183 + return fmt.Errorf("%s.base_backoff must be >= 0", prefix) 184 + } 185 + if o.MaxBackoff < 0 { 186 + return fmt.Errorf("%s.max_backoff must be >= 0", prefix) 187 + } 188 + if o.BaseBackoff > 0 && o.MaxBackoff > 0 && o.BaseBackoff > o.MaxBackoff { 189 + return fmt.Errorf("%s.base_backoff must be <= max_backoff", prefix) 190 + } 191 + if o.MaxRetries != nil && *o.MaxRetries < 0 { 192 + return fmt.Errorf("%s.max_retries must be >= 0", prefix) 193 + } 194 + return nil 195 + } 196 + 197 + // expandEnvVar expands ${VAR} or $VAR references. 82 198 func expandEnvVar(value string) string { 83 199 if value == "" { 84 200 return value 85 201 } 86 - 87 - // Handle ${VAR_NAME} format 88 202 if strings.HasPrefix(value, "${") && strings.HasSuffix(value, "}") { 89 203 varName := value[2 : len(value)-1] 90 204 if envVal := os.Getenv(varName); envVal != "" { ··· 92 206 } 93 207 return value 94 208 } 95 - 96 - // Handle $VAR_NAME format 97 209 if strings.HasPrefix(value, "$") { 98 210 varName := value[1:] 99 211 if envVal := os.Getenv(varName); envVal != "" { ··· 101 213 } 102 214 return value 103 215 } 104 - 105 216 return value 106 217 }
+162 -9
internal/config/config_test.go
··· 8 8 ) 9 9 10 10 func TestLoadFromFile(t *testing.T) { 11 - // Create a temporary config file 12 11 tmpDir := t.TempDir() 13 12 configPath := filepath.Join(tmpDir, "config.yaml") 14 13 ··· 38 37 t.Fatalf("LoadFromFile failed: %v", err) 39 38 } 40 39 41 - // Verify accounts 42 40 if len(cfg.Accounts) != 2 { 43 41 t.Errorf("Expected 2 accounts, got %d", len(cfg.Accounts)) 44 42 } 45 43 46 - // Verify first account 47 44 if cfg.Accounts[0].Handle != "user1.bsky.social" { 48 45 t.Errorf("Expected handle 'user1.bsky.social', got '%s'", cfg.Accounts[0].Handle) 49 46 } ··· 53 50 if len(cfg.Accounts[0].Feeds) != 2 { 54 51 t.Errorf("Expected 2 feeds for account 1, got %d", len(cfg.Accounts[0].Feeds)) 55 52 } 53 + if cfg.Accounts[0].Feeds[0].URL != "https://feed1.com/rss" { 54 + t.Errorf("Expected feed URL 'https://feed1.com/rss', got '%s'", cfg.Accounts[0].Feeds[0].URL) 55 + } 56 + if cfg.Accounts[0].Feeds[1].URL != "https://feed2.com/atom" { 57 + t.Errorf("Expected feed URL 'https://feed2.com/atom', got '%s'", cfg.Accounts[0].Feeds[1].URL) 58 + } 56 59 57 - // Verify second account 58 60 if cfg.Accounts[1].Handle != "user2.bsky.social" { 59 61 t.Errorf("Expected handle 'user2.bsky.social', got '%s'", cfg.Accounts[1].Handle) 60 62 } ··· 62 64 t.Errorf("Expected 1 feed for account 2, got %d", len(cfg.Accounts[1].Feeds)) 63 65 } 64 66 65 - // Verify global settings 66 67 if cfg.Interval != 10*time.Minute { 67 68 t.Errorf("Expected interval 10m, got %v", cfg.Interval) 68 69 } ··· 71 72 } 72 73 } 73 74 75 + func TestLoadFromFile_FeedConfigMapping(t *testing.T) { 76 + tmpDir := t.TempDir() 77 + configPath := filepath.Join(tmpDir, "config.yaml") 78 + 79 + configContent := ` 80 + accounts: 81 + - handle: "user1.bsky.social" 82 + password: "password1" 83 + feeds: 84 + - "https://plain-url.com/rss" 85 + - url: "https://mapping-url.com/rss" 86 + user_agent: "custom-agent/1.0" 87 + min_delay: "2s" 88 + max_delay: "6s" 89 + base_backoff: "10s" 90 + max_backoff: "5m" 91 + honor_retry_after: false 92 + ` 93 + 94 + if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil { 95 + t.Fatalf("Failed to write config file: %v", err) 96 + } 97 + 98 + cfg, err := LoadFromFile(configPath) 99 + if err != nil { 100 + t.Fatalf("LoadFromFile failed: %v", err) 101 + } 102 + 103 + feeds := cfg.Accounts[0].Feeds 104 + if len(feeds) != 2 { 105 + t.Fatalf("Expected 2 feeds, got %d", len(feeds)) 106 + } 107 + 108 + // Plain string feed 109 + if feeds[0].URL != "https://plain-url.com/rss" { 110 + t.Errorf("Expected URL 'https://plain-url.com/rss', got '%s'", feeds[0].URL) 111 + } 112 + if feeds[0].Options.UserAgent != "" { 113 + t.Errorf("Expected empty UserAgent for plain feed, got '%s'", feeds[0].Options.UserAgent) 114 + } 115 + 116 + // Mapping feed with overrides 117 + if feeds[1].URL != "https://mapping-url.com/rss" { 118 + t.Errorf("Expected URL 'https://mapping-url.com/rss', got '%s'", feeds[1].URL) 119 + } 120 + if feeds[1].Options.UserAgent != "custom-agent/1.0" { 121 + t.Errorf("Expected UserAgent 'custom-agent/1.0', got '%s'", feeds[1].Options.UserAgent) 122 + } 123 + if feeds[1].Options.MinDelay != 2*time.Second { 124 + t.Errorf("Expected MinDelay 2s, got %v", feeds[1].Options.MinDelay) 125 + } 126 + if feeds[1].Options.MaxDelay != 6*time.Second { 127 + t.Errorf("Expected MaxDelay 6s, got %v", feeds[1].Options.MaxDelay) 128 + } 129 + if feeds[1].Options.BaseBackoff != 10*time.Second { 130 + t.Errorf("Expected BaseBackoff 10s, got %v", feeds[1].Options.BaseBackoff) 131 + } 132 + if feeds[1].Options.MaxBackoff != 5*time.Minute { 133 + t.Errorf("Expected MaxBackoff 5m, got %v", feeds[1].Options.MaxBackoff) 134 + } 135 + if feeds[1].Options.HonorRetryAfter == nil || *feeds[1].Options.HonorRetryAfter != false { 136 + t.Errorf("Expected HonorRetryAfter false, got %v", feeds[1].Options.HonorRetryAfter) 137 + } 138 + } 139 + 140 + func TestLoadFromFile_Resolved(t *testing.T) { 141 + tmpDir := t.TempDir() 142 + configPath := filepath.Join(tmpDir, "config.yaml") 143 + 144 + configContent := ` 145 + accounts: 146 + - handle: "user1.bsky.social" 147 + password: "password1" 148 + feeds: 149 + - "https://feed1.com/rss" 150 + - url: "https://feed2.com/rss" 151 + user_agent: "per-feed-agent/1.0" 152 + 153 + defaults: 154 + user_agent: "global-agent/1.0" 155 + min_delay: "1s" 156 + max_delay: "4s" 157 + ` 158 + 159 + if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil { 160 + t.Fatalf("Failed to write config file: %v", err) 161 + } 162 + 163 + cfg, err := LoadFromFile(configPath) 164 + if err != nil { 165 + t.Fatalf("LoadFromFile failed: %v", err) 166 + } 167 + 168 + // Plain feed: should inherit global defaults 169 + plain := cfg.Resolved(cfg.Accounts[0].Feeds[0]) 170 + if plain.UserAgent != "global-agent/1.0" { 171 + t.Errorf("Expected global UserAgent for plain feed, got '%s'", plain.UserAgent) 172 + } 173 + if plain.MinDelay != 1*time.Second { 174 + t.Errorf("Expected MinDelay 1s from global defaults, got %v", plain.MinDelay) 175 + } 176 + // Timeout should fall back to hard-coded default 177 + if plain.Timeout != 30*time.Second { 178 + t.Errorf("Expected default Timeout 30s, got %v", plain.Timeout) 179 + } 180 + 181 + // Per-feed override wins over global default 182 + overridden := cfg.Resolved(cfg.Accounts[0].Feeds[1]) 183 + if overridden.UserAgent != "per-feed-agent/1.0" { 184 + t.Errorf("Expected per-feed UserAgent, got '%s'", overridden.UserAgent) 185 + } 186 + // Global delay still inherited 187 + if overridden.MinDelay != 1*time.Second { 188 + t.Errorf("Expected MinDelay 1s inherited from global, got %v", overridden.MinDelay) 189 + } 190 + } 191 + 74 192 func TestLoadFromFileWithEnvVars(t *testing.T) { 75 - // Set environment variable 76 193 os.Setenv("TEST_PASSWORD", "env-password") 77 194 defer os.Unsetenv("TEST_PASSWORD") 78 195 ··· 105 222 tmpDir := t.TempDir() 106 223 configPath := filepath.Join(tmpDir, "config.yaml") 107 224 108 - // Minimal config with only required fields 109 225 configContent := ` 110 226 accounts: 111 227 - handle: "user1.bsky.social" ··· 123 239 t.Fatalf("LoadFromFile failed: %v", err) 124 240 } 125 241 126 - // Check defaults 127 242 if cfg.Interval != 15*time.Minute { 128 243 t.Errorf("Expected default interval 15m, got %v", cfg.Interval) 129 244 } 130 245 if cfg.Storage != "posted_items.txt" { 131 246 t.Errorf("Expected default storage 'posted_items.txt', got '%s'", cfg.Storage) 132 247 } 133 - 134 248 } 135 249 136 250 func TestLoadFromFileInvalid(t *testing.T) { ··· 173 287 - handle: "user1.bsky.social" 174 288 password: "password1" 175 289 feeds: [] 290 + `, 291 + wantErr: true, 292 + }, 293 + { 294 + name: "feed with empty url in mapping", 295 + content: ` 296 + accounts: 297 + - handle: "user1.bsky.social" 298 + password: "password1" 299 + feeds: 300 + - url: "" 301 + `, 302 + wantErr: true, 303 + }, 304 + { 305 + name: "defaults min_delay > max_delay", 306 + content: ` 307 + accounts: 308 + - handle: "user1.bsky.social" 309 + password: "password1" 310 + feeds: 311 + - "https://feed1.com/rss" 312 + defaults: 313 + min_delay: "10s" 314 + max_delay: "5s" 315 + `, 316 + wantErr: true, 317 + }, 318 + { 319 + name: "defaults base_backoff > max_backoff", 320 + content: ` 321 + accounts: 322 + - handle: "user1.bsky.social" 323 + password: "password1" 324 + feeds: 325 + - "https://feed1.com/rss" 326 + defaults: 327 + base_backoff: "10m" 328 + max_backoff: "1m" 176 329 `, 177 330 wantErr: true, 178 331 },
+128 -16
internal/rss/feed.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 5 6 "fmt" 7 + "math/rand" 8 + "net/http" 9 + "strconv" 10 + "strings" 6 11 "time" 7 12 8 13 "github.com/mmcdole/gofeed" 14 + "pkg.rbrt.fr/bskyrss/internal/config" 9 15 ) 10 16 11 - // FeedItem represents a single RSS feed item 17 + // FeedItem represents a single RSS feed item. 12 18 type FeedItem struct { 13 19 GUID string 14 20 Title string ··· 17 23 Published time.Time 18 24 } 19 25 20 - // Checker handles RSS feed checking operations 21 - type Checker struct { 22 - parser *gofeed.Parser 26 + // retryableStatuses are HTTP status codes that warrant a retry with backoff. 27 + var retryableStatuses = map[int]struct{}{ 28 + http.StatusTooManyRequests: {}, 29 + http.StatusInternalServerError: {}, 30 + http.StatusBadGateway: {}, 31 + http.StatusServiceUnavailable: {}, 32 + http.StatusGatewayTimeout: {}, 33 + http.StatusRequestTimeout: {}, 23 34 } 24 35 25 - // NewChecker creates a new RSS feed checker 26 - func NewChecker() *Checker { 27 - return &Checker{ 28 - parser: gofeed.NewParser(), 36 + const maxJitter = 750 * time.Millisecond 37 + 38 + // capturingTransport stores the last response so we can read Retry-After, 39 + // which gofeed.HTTPError does not expose. 40 + type capturingTransport struct { 41 + inner http.RoundTripper 42 + lastResp *http.Response 43 + } 44 + 45 + func (t *capturingTransport) RoundTrip(req *http.Request) (*http.Response, error) { 46 + resp, err := t.inner.RoundTrip(req) 47 + if resp != nil { 48 + t.lastResp = resp 29 49 } 50 + return resp, err 30 51 } 31 52 32 - // FetchLatestItems fetches the latest items from an RSS feed 33 - func (c *Checker) FetchLatestItems(ctx context.Context, feedURL string) ([]*FeedItem, error) { 34 - feed, err := c.parser.ParseURLWithContext(feedURL, ctx) 35 - if err != nil { 36 - return nil, fmt.Errorf("failed to parse RSS feed: %w", err) 53 + // Checker fetches RSS feeds. It is stateless; all behaviour is driven by the 54 + // FeedOptions passed to FetchLatestItems. 55 + type Checker struct{} 56 + 57 + func NewChecker() *Checker { return &Checker{} } 58 + 59 + // FetchLatestItems fetches items from feedURL using opts. 60 + // opts is expected to be fully resolved (via config.Config.Resolved). 61 + func (c *Checker) FetchLatestItems(ctx context.Context, feedURL string, opts config.FeedOptions) ([]*FeedItem, error) { 62 + if opts.MaxDelay > 0 { 63 + lo := opts.MinDelay 64 + if lo > opts.MaxDelay { 65 + lo = 0 66 + } 67 + select { 68 + case <-ctx.Done(): 69 + return nil, fmt.Errorf("context cancelled before fetch: %w", ctx.Err()) 70 + case <-time.After(lo + randomDuration(opts.MaxDelay-lo)): 71 + } 72 + } 73 + 74 + transport := &capturingTransport{inner: http.DefaultTransport} 75 + parser := gofeed.NewParser() 76 + parser.Client = &http.Client{Timeout: opts.Timeout, Transport: transport} 77 + parser.UserAgent = opts.UserAgent 78 + 79 + honorRetryAfter := opts.HonorRetryAfter != nil && *opts.HonorRetryAfter 80 + maxRetries := 0 81 + if opts.MaxRetries != nil { 82 + maxRetries = *opts.MaxRetries 37 83 } 38 84 85 + var lastErr error 86 + for attempt := 0; attempt <= maxRetries; attempt++ { 87 + feed, err := parser.ParseURLWithContext(feedURL, ctx) 88 + if err == nil { 89 + return itemsFromFeed(feed), nil 90 + } 91 + lastErr = err 92 + if attempt == maxRetries { 93 + break 94 + } 95 + wait, shouldRetry := retryWait(err, attempt, opts.BaseBackoff, opts.MaxBackoff, honorRetryAfter, transport.lastResp) 96 + if !shouldRetry { 97 + break 98 + } 99 + select { 100 + case <-ctx.Done(): 101 + return nil, fmt.Errorf("context cancelled during retry: %w", ctx.Err()) 102 + case <-time.After(wait): 103 + } 104 + } 105 + 106 + return nil, fmt.Errorf("failed to parse RSS feed: %w", lastErr) 107 + } 108 + 109 + func itemsFromFeed(feed *gofeed.Feed) []*FeedItem { 39 110 if feed == nil || len(feed.Items) == 0 { 40 - return []*FeedItem{}, nil 111 + return []*FeedItem{} 41 112 } 42 113 43 - // Limit the number of items to return 44 114 items := make([]*FeedItem, 0, len(feed.Items)) 45 115 for _, item := range feed.Items { 46 116 published := time.Now() ··· 65 135 }) 66 136 } 67 137 68 - return items, nil 138 + return items 139 + } 140 + 141 + // retryWait returns the wait duration and whether the request should be retried. 142 + func retryWait(err error, attempt int, base, max time.Duration, honorRetryAfter bool, lastResp *http.Response) (time.Duration, bool) { 143 + var httpErr gofeed.HTTPError 144 + if !errors.As(err, &httpErr) { 145 + return 0, false 146 + } 147 + if _, ok := retryableStatuses[httpErr.StatusCode]; !ok { 148 + return 0, false 149 + } 150 + if honorRetryAfter && httpErr.StatusCode == http.StatusTooManyRequests { 151 + if d, ok := parseRetryAfter(lastResp, max); ok { 152 + return d + randomDuration(maxJitter), true 153 + } 154 + } 155 + return min(base*time.Duration(1<<attempt), max) + randomDuration(maxJitter), true 156 + } 157 + 158 + // parseRetryAfter parses the Retry-After header, capped at cap. 159 + func parseRetryAfter(resp *http.Response, cap time.Duration) (time.Duration, bool) { 160 + if resp == nil { 161 + return 0, false 162 + } 163 + raw := strings.TrimSpace(resp.Header.Get("Retry-After")) 164 + if raw == "" { 165 + return 0, false 166 + } 167 + if secs, err := strconv.Atoi(raw); err == nil && secs >= 0 { 168 + return min(time.Duration(secs)*time.Second, cap), true 169 + } 170 + if t, err := http.ParseTime(raw); err == nil { 171 + return min(max(time.Until(t), 0), cap), true 172 + } 173 + return 0, false 174 + } 175 + 176 + func randomDuration(max time.Duration) time.Duration { 177 + if max <= 0 { 178 + return 0 179 + } 180 + return time.Duration(rand.Int63n(int64(max) + 1)) 69 181 }
+83 -17
internal/rss/feed_test.go
··· 6 6 "net/http/httptest" 7 7 "testing" 8 8 "time" 9 + 10 + "pkg.rbrt.fr/bskyrss/internal/config" 9 11 ) 10 12 13 + var defaultOpts = config.FeedOptions{} 14 + 11 15 func TestFetchLatestItems(t *testing.T) { 12 - // Create a test RSS feed 13 16 testFeed := `<?xml version="1.0" encoding="UTF-8"?> 14 17 <rss version="2.0"> 15 18 <channel> ··· 40 43 </channel> 41 44 </rss>` 42 45 43 - // Create a test server 44 46 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 45 47 w.Header().Set("Content-Type", "application/rss+xml") 46 48 w.WriteHeader(http.StatusOK) ··· 48 50 })) 49 51 defer server.Close() 50 52 51 - // Create checker 52 53 checker := NewChecker() 53 54 54 - // Test fetching all items 55 55 t.Run("FetchAllItems", func(t *testing.T) { 56 - items, err := checker.FetchLatestItems(context.Background(), server.URL) 56 + items, err := checker.FetchLatestItems(context.Background(), server.URL, defaultOpts) 57 57 if err != nil { 58 58 t.Fatalf("Failed to fetch items: %v", err) 59 59 } ··· 62 62 t.Errorf("Expected 3 items, got %d", len(items)) 63 63 } 64 64 65 - // Check first item 66 65 if items[0].Title != "Test Item 1" { 67 66 t.Errorf("Expected title 'Test Item 1', got '%s'", items[0].Title) 68 67 } ··· 74 73 } 75 74 }) 76 75 77 - // Test with context timeout 78 76 t.Run("ContextTimeout", func(t *testing.T) { 79 77 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) 80 78 defer cancel() 81 79 82 - time.Sleep(2 * time.Millisecond) // Ensure context is expired 80 + time.Sleep(2 * time.Millisecond) 83 81 84 - _, err := checker.FetchLatestItems(ctx, server.URL) 82 + _, err := checker.FetchLatestItems(ctx, server.URL, defaultOpts) 85 83 if err == nil { 86 84 t.Error("Expected error with expired context, got nil") 87 85 } 88 86 }) 87 + 88 + t.Run("CustomUserAgent", func(t *testing.T) { 89 + var gotUA string 90 + uaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 91 + gotUA = r.Header.Get("User-Agent") 92 + w.Header().Set("Content-Type", "application/rss+xml") 93 + w.WriteHeader(http.StatusOK) 94 + w.Write([]byte(testFeed)) 95 + })) 96 + defer uaServer.Close() 97 + 98 + opts := config.FeedOptions{UserAgent: "my-custom-agent/2.0"} 99 + _, err := checker.FetchLatestItems(context.Background(), uaServer.URL, opts) 100 + if err != nil { 101 + t.Fatalf("Unexpected error: %v", err) 102 + } 103 + if gotUA != "my-custom-agent/2.0" { 104 + t.Errorf("Expected User-Agent 'my-custom-agent/2.0', got '%s'", gotUA) 105 + } 106 + }) 89 107 } 90 108 91 109 func TestFetchLatestItems_InvalidURL(t *testing.T) { 92 110 checker := NewChecker() 93 111 94 - _, err := checker.FetchLatestItems(context.Background(), "not-a-valid-url") 112 + _, err := checker.FetchLatestItems(context.Background(), "not-a-valid-url", defaultOpts) 95 113 if err == nil { 96 114 t.Error("Expected error with invalid URL, got nil") 97 115 } ··· 115 133 defer server.Close() 116 134 117 135 checker := NewChecker() 118 - items, err := checker.FetchLatestItems(context.Background(), server.URL) 136 + items, err := checker.FetchLatestItems(context.Background(), server.URL, defaultOpts) 119 137 120 138 if err != nil { 121 139 t.Fatalf("Expected no error with empty feed, got: %v", err) 122 140 } 123 - 124 141 if len(items) != 0 { 125 142 t.Errorf("Expected 0 items, got %d", len(items)) 126 143 } 127 144 } 128 145 129 146 func TestFeedItem_GUIDFallback(t *testing.T) { 130 - // Feed with no GUID, should use link instead 131 147 feedNoGUID := `<?xml version="1.0" encoding="UTF-8"?> 132 148 <rss version="2.0"> 133 149 <channel> ··· 151 167 defer server.Close() 152 168 153 169 checker := NewChecker() 154 - items, err := checker.FetchLatestItems(context.Background(), server.URL) 170 + items, err := checker.FetchLatestItems(context.Background(), server.URL, defaultOpts) 155 171 156 172 if err != nil { 157 173 t.Fatalf("Failed to fetch items: %v", err) 158 174 } 159 - 160 175 if len(items) != 1 { 161 176 t.Fatalf("Expected 1 item, got %d", len(items)) 162 177 } 163 - 164 - // GUID should fall back to link 165 178 if items[0].GUID != "https://example.com/item-no-guid" { 166 179 t.Errorf("Expected GUID to fallback to link, got '%s'", items[0].GUID) 167 180 } 168 181 } 182 + 183 + func TestFetchLatestItems_RetryOn429(t *testing.T) { 184 + attempts := 0 185 + feed := `<?xml version="1.0" encoding="UTF-8"?> 186 + <rss version="2.0"> 187 + <channel> 188 + <title>Feed</title> 189 + <link>https://example.com</link> 190 + <description>Feed</description> 191 + <item> 192 + <title>Item</title> 193 + <link>https://example.com/item</link> 194 + <guid>item-1</guid> 195 + </item> 196 + </channel> 197 + </rss>` 198 + 199 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 200 + attempts++ 201 + if attempts < 3 { 202 + w.Header().Set("Retry-After", "0") 203 + w.WriteHeader(http.StatusTooManyRequests) 204 + return 205 + } 206 + w.Header().Set("Content-Type", "application/rss+xml") 207 + w.WriteHeader(http.StatusOK) 208 + w.Write([]byte(feed)) 209 + })) 210 + defer server.Close() 211 + 212 + honorTrue := true 213 + zero := time.Duration(0) 214 + opts := config.FeedOptions{ 215 + MaxRetries: new(3), 216 + BaseBackoff: zero + 1, 217 + HonorRetryAfter: &honorTrue, 218 + } 219 + 220 + checker := NewChecker() 221 + items, err := checker.FetchLatestItems(context.Background(), server.URL, opts) 222 + if err != nil { 223 + t.Fatalf("Expected success after retries, got: %v", err) 224 + } 225 + if len(items) != 1 { 226 + t.Errorf("Expected 1 item, got %d", len(items)) 227 + } 228 + if attempts != 3 { 229 + t.Errorf("Expected 3 attempts, got %d", attempts) 230 + } 231 + } 232 + 233 + //go:fix inline 234 + func intPtr(i int) *int { return new(i) }
+3 -3
internal/storage/storage_test.go
··· 205 205 } 206 206 207 207 // Add items and check count 208 - for i := 0; i < 5; i++ { 208 + for i := range 5 { 209 209 store.MarkPosted(string(rune('a' + i))) 210 210 expectedCount := i + 1 211 211 if store.Count() != expectedCount { ··· 225 225 226 226 // Simulate concurrent access 227 227 done := make(chan bool) 228 - for i := 0; i < 10; i++ { 228 + for i := range 10 { 229 229 go func(id int) { 230 230 guid := string(rune('a' + id)) 231 231 store.MarkPosted(guid) ··· 236 236 } 237 237 238 238 // Wait for all goroutines 239 - for i := 0; i < 10; i++ { 239 + for range 10 { 240 240 <-done 241 241 } 242 242
+26 -38
main.go
··· 30 30 dryRun := flag.Bool("dry-run", false, "Don't actually post to Bluesky, just show what would be posted") 31 31 flag.Parse() 32 32 33 - // Load configuration from file 34 33 if *configFile == "" { 35 34 log.Fatal("Error: -config flag is required") 36 35 } ··· 52 51 } 53 52 log.Printf("Monitoring %d feed(s) across %d account(s)", totalFeeds, len(cfg.Accounts)) 54 53 55 - // Setup signal handling for graceful shutdown 56 54 ctx, cancel := context.WithCancel(context.Background()) 57 55 defer cancel() 58 56 ··· 68 66 // Initialize managers for each account 69 67 managers := make([]*AccountManager, 0, len(cfg.Accounts)) 70 68 for i, account := range cfg.Accounts { 71 - // Determine storage file for this account 72 69 storageFilePath := cfg.Storage 73 70 if account.Storage != "" { 74 71 storageFilePath = account.Storage ··· 79 76 storageFilePath = fmt.Sprintf("%s_%s%s", base, sanitizeHandle(account.Handle), ext) 80 77 } 81 78 82 - manager, err := NewAccountManager(ctx, account, storageFilePath, *dryRun) 79 + manager, err := NewAccountManager(ctx, cfg, account, storageFilePath, *dryRun) 83 80 if err != nil { 84 81 log.Fatalf("Failed to initialize account %d (%s): %v", i+1, account.Handle, err) 85 82 } ··· 99 96 } 100 97 } 101 98 102 - // Continue checking on interval 103 99 for { 104 100 select { 105 101 case <-ctx.Done(): ··· 114 110 } 115 111 } 116 112 117 - // AccountManager manages RSS checking and posting for a single Bluesky account 113 + // AccountManager manages RSS checking and posting for a single Bluesky account. 118 114 type AccountManager struct { 115 + cfg *config.Config 119 116 account config.Account 120 117 bskyClient *bluesky.Client 121 118 rssChecker *rss.Checker ··· 123 120 dryRun bool 124 121 } 125 122 126 - // NewAccountManager creates a new account manager 127 - func NewAccountManager(ctx context.Context, account config.Account, storageFile string, dryRun bool) (*AccountManager, error) { 123 + // NewAccountManager creates a new AccountManager. 124 + func NewAccountManager(ctx context.Context, cfg *config.Config, account config.Account, storageFile string, dryRun bool) (*AccountManager, error) { 128 125 store, err := storage.New(storageFile) 129 126 if err != nil { 130 127 return nil, fmt.Errorf("failed to initialize storage: %w", err) ··· 153 150 } 154 151 155 152 return &AccountManager{ 153 + cfg: cfg, 156 154 account: account, 157 155 bskyClient: bskyClient, 158 156 rssChecker: rssChecker, ··· 161 159 }, nil 162 160 } 163 161 164 - // CheckAndPost checks all feeds for this account and posts new items 162 + // CheckAndPost checks all feeds for this account and posts new items. 165 163 func (m *AccountManager) CheckAndPost(ctx context.Context) error { 166 - for _, feedURL := range m.account.Feeds { 167 - if err := m.checkAndPostFeed(ctx, feedURL); err != nil { 168 - log.Printf("[@%s] Error checking feed %s: %v", m.account.Handle, feedURL, err) 169 - // Continue with other feeds even if one fails 164 + for _, feed := range m.account.Feeds { 165 + if err := m.checkAndPostFeed(ctx, feed); err != nil { 166 + log.Printf("[@%s] Error checking feed %s: %v", m.account.Handle, feed.URL, err) 167 + // Continue with other feeds even if one fails. 170 168 } 171 169 } 172 170 return nil 173 171 } 174 172 175 - // checkAndPostFeed checks a single feed and posts new items 176 - func (m *AccountManager) checkAndPostFeed(ctx context.Context, feedURL string) error { 177 - log.Printf("[@%s] Checking RSS feed: %s", m.account.Handle, feedURL) 173 + // checkAndPostFeed checks a single feed and posts new items. 174 + func (m *AccountManager) checkAndPostFeed(ctx context.Context, feed config.FeedConfig) error { 175 + log.Printf("[@%s] Checking RSS feed: %s", m.account.Handle, feed.URL) 178 176 179 - items, err := m.rssChecker.FetchLatestItems(ctx, feedURL) 177 + opts := m.cfg.Resolved(feed) 178 + 179 + items, err := m.rssChecker.FetchLatestItems(ctx, feed.URL, opts) 180 180 if err != nil { 181 181 return fmt.Errorf("failed to fetch RSS items: %w", err) 182 182 } 183 183 184 184 log.Printf("[@%s] Found %d items in feed", m.account.Handle, len(items)) 185 185 186 - // Check if this is the first time seeing this feed (no items from it in storage) 186 + // Check if this is the first time seeing this feed (no items in storage). 187 187 hasSeenFeedBefore := false 188 188 for _, item := range items { 189 189 if m.store.IsPosted(item.GUID) { ··· 192 192 } 193 193 } 194 194 195 - // Process items in reverse order (oldest first) 195 + // Process items in reverse order (oldest first). 196 196 newItemCount := 0 197 197 postedCount := 0 198 198 199 199 for i := len(items) - 1; i >= 0; i-- { 200 200 item := items[i] 201 201 202 - // Skip if already posted 203 202 if m.store.IsPosted(item.GUID) { 204 203 continue 205 204 } 206 205 207 206 newItemCount++ 208 207 209 - // If this is first time seeing this feed, mark items as seen without posting 208 + // First time seeing this feed: mark items as seen without posting. 210 209 if !hasSeenFeedBefore { 211 210 if err := m.store.MarkPosted(item.GUID); err != nil { 212 211 log.Printf("[@%s] Failed to mark item as seen: %v", m.account.Handle, err) ··· 216 215 217 216 log.Printf("[@%s] New item found: %s", m.account.Handle, item.Title) 218 217 219 - // Create post text 220 218 postText := formatPost(item) 221 219 222 220 if m.dryRun { 223 221 log.Printf("[@%s] [DRY-RUN] Would post:\n%s\n", m.account.Handle, postText) 224 222 postedCount++ 225 223 } else { 226 - // Post to Bluesky 227 224 postCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 228 225 err := m.bskyClient.Post(postCtx, postText) 229 226 cancel() ··· 237 234 postedCount++ 238 235 } 239 236 240 - // Mark as posted 241 237 if err := m.store.MarkPosted(item.GUID); err != nil { 242 238 log.Printf("[@%s] Failed to mark item as posted: %v", m.account.Handle, err) 243 239 } 244 240 245 - // Rate limiting ourselves to not get rate limited. 241 + // Small self-imposed delay between posts to avoid Bluesky rate limits. 246 242 if postedCount > 0 && !m.dryRun { 247 243 time.Sleep(2 * time.Second) 248 244 } ··· 250 246 251 247 if !hasSeenFeedBefore { 252 248 if newItemCount > 0 { 253 - log.Printf("[@%s] New feed detected: marked %d items as seen from %s (not posted)", m.account.Handle, newItemCount, feedURL) 249 + log.Printf("[@%s] New feed detected: marked %d items as seen from %s (not posted)", m.account.Handle, newItemCount, feed.URL) 254 250 } 255 251 } else { 256 252 if newItemCount == 0 { 257 - log.Printf("[@%s] No new items in feed %s", m.account.Handle, feedURL) 253 + log.Printf("[@%s] No new items in feed %s", m.account.Handle, feed.URL) 258 254 } else { 259 - log.Printf("[@%s] Processed %d new items from feed %s (%d posted)", m.account.Handle, newItemCount, feedURL, postedCount) 255 + log.Printf("[@%s] Processed %d new items from feed %s (%d posted)", m.account.Handle, newItemCount, feed.URL, postedCount) 260 256 } 261 257 } 262 258 ··· 264 260 } 265 261 266 262 func formatPost(item *rss.FeedItem) string { 267 - // Collect all unique URLs 268 263 urls := []string{} 269 264 if item.Link != "" { 270 265 urls = append(urls, item.Link) 271 266 } 272 267 273 - // Add GUID if it's a URL and different from link (e.g., HN comment links) 268 + // Add GUID if it's a URL and different from link (e.g. HN comment links). 274 269 if item.GUID != "" && item.GUID != item.Link { 275 270 if u, err := url.Parse(item.GUID); err == nil && (u.Scheme == "http" || u.Scheme == "https") { 276 271 urls = append(urls, item.GUID) 277 272 } 278 273 } 279 274 280 - // Build post: title + links 281 275 text := item.Title 282 276 if len(urls) > 0 { 283 277 text += "\n" + strings.Join(urls, "\n") 284 278 } 285 279 286 - // Truncate if too long 287 280 if len(text) > maxPostLength { 288 281 linkText := "" 289 282 if len(urls) > 0 { ··· 294 287 if availableForTitle > 20 { 295 288 text = truncateText(item.Title, availableForTitle) + "..." + linkText 296 289 } else { 297 - // Title too long even truncated, use just first URL or truncated title 298 290 if len(urls) > 0 { 299 291 text = urls[0] 300 292 } else { ··· 310 302 if len(text) <= maxLen { 311 303 return text 312 304 } 313 - 314 - // Try to truncate at word boundary 315 305 truncated := text[:maxLen] 316 306 lastSpace := strings.LastIndex(truncated, " ") 317 307 if lastSpace > maxLen/2 { 318 308 return text[:lastSpace] 319 309 } 320 - 321 310 return truncated 322 311 } 323 312 324 - // sanitizeHandle removes special characters from handle for use in filenames 313 + // sanitizeHandle replaces characters unsuitable for filenames. 325 314 func sanitizeHandle(handle string) string { 326 - // Replace dots and @ with underscores 327 315 sanitized := strings.ReplaceAll(handle, ".", "_") 328 316 sanitized = strings.ReplaceAll(sanitized, "@", "") 329 317 return sanitized