Webhook-to-SSE gateway with hierarchical topic routing and signature verification

Smart payload encoding, HTTP method on events, and verification logging

Text payloads (text/*, form-urlencoded, XML) are now stored as plain
strings instead of being base64-encoded, which makes them readable on
the subscriber side. Binary payloads still get base64. Events also
carry the HTTP method now, and verification failures log the request
headers to help debug signature mismatches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+215 -4
+1
event.go
··· 5 5 type Event struct { 6 6 ID string `json:"id"` 7 7 Timestamp time.Time `json:"timestamp"` 8 + Method string `json:"method"` 8 9 Path string `json:"path"` 9 10 Headers map[string]string `json:"headers"` 10 11 Payload any `json:"payload"`
+189
payload_test.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/base64" 6 + "encoding/json" 7 + "net/http" 8 + "net/http/httptest" 9 + "strings" 10 + "testing" 11 + "time" 12 + ) 13 + 14 + func postAndReceive(t *testing.T, ts *httptest.Server, broker *Broker, contentType string, body string) *Event { 15 + t.Helper() 16 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 17 + defer cancel() 18 + 19 + ch, unsub := broker.Subscribe("test", "") 20 + defer unsub() 21 + 22 + req, _ := http.NewRequestWithContext(ctx, "POST", ts.URL+"/test", strings.NewReader(body)) 23 + req.Header.Set("Content-Type", contentType) 24 + resp, err := http.DefaultClient.Do(req) 25 + if err != nil { 26 + t.Fatalf("POST failed: %v", err) 27 + } 28 + defer resp.Body.Close() 29 + if resp.StatusCode != http.StatusAccepted { 30 + t.Fatalf("expected 202, got %d", resp.StatusCode) 31 + } 32 + 33 + select { 34 + case event := <-ch: 35 + return event 36 + case <-ctx.Done(): 37 + t.Fatal("timed out waiting for event") 38 + return nil 39 + } 40 + } 41 + 42 + func TestServer_postFormDataStoredAsText(t *testing.T) { 43 + ts, broker := newTestServer(nil) 44 + defer ts.Close() 45 + 46 + event := postAndReceive(t, ts, broker, "application/x-www-form-urlencoded", "foo=bar&baz=qux") 47 + 48 + s, ok := event.Payload.(string) 49 + if !ok { 50 + t.Fatalf("expected string payload, got %T", event.Payload) 51 + } 52 + if s != "foo=bar&baz=qux" { 53 + t.Errorf("expected raw form data, got %s", s) 54 + } 55 + } 56 + 57 + func TestServer_postPlainTextStoredAsText(t *testing.T) { 58 + ts, broker := newTestServer(nil) 59 + defer ts.Close() 60 + 61 + event := postAndReceive(t, ts, broker, "text/plain", "hello world") 62 + 63 + s, ok := event.Payload.(string) 64 + if !ok { 65 + t.Fatalf("expected string payload, got %T", event.Payload) 66 + } 67 + if s != "hello world" { 68 + t.Errorf("expected raw text, got %s", s) 69 + } 70 + } 71 + 72 + func TestServer_postBinaryBase64Encoded(t *testing.T) { 73 + ts, broker := newTestServer(nil) 74 + defer ts.Close() 75 + 76 + event := postAndReceive(t, ts, broker, "application/octet-stream", "\x00\x01\x02\x03") 77 + 78 + s, ok := event.Payload.(string) 79 + if !ok { 80 + t.Fatalf("expected string payload, got %T", event.Payload) 81 + } 82 + expected := base64.StdEncoding.EncodeToString([]byte("\x00\x01\x02\x03")) 83 + if s != expected { 84 + t.Errorf("expected base64 %s, got %s", expected, s) 85 + } 86 + } 87 + 88 + func TestServer_postIncludesMethod(t *testing.T) { 89 + ts, broker := newTestServer(nil) 90 + defer ts.Close() 91 + 92 + event := postAndReceive(t, ts, broker, "application/json", `{"ok":true}`) 93 + 94 + if event.Method != "POST" { 95 + t.Errorf("expected method POST, got %s", event.Method) 96 + } 97 + } 98 + 99 + func TestServer_postMalformedContentTypeFallsBackToBase64(t *testing.T) { 100 + ts, broker := newTestServer(nil) 101 + defer ts.Close() 102 + 103 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 104 + defer cancel() 105 + 106 + ch, unsub := broker.Subscribe("test", "") 107 + defer unsub() 108 + 109 + req, _ := http.NewRequestWithContext(ctx, "POST", ts.URL+"/test", strings.NewReader("some data")) 110 + req.Header.Set("Content-Type", ";;;malformed") 111 + resp, err := http.DefaultClient.Do(req) 112 + if err != nil { 113 + t.Fatalf("POST failed: %v", err) 114 + } 115 + defer resp.Body.Close() 116 + if resp.StatusCode != http.StatusAccepted { 117 + t.Fatalf("expected 202, got %d", resp.StatusCode) 118 + } 119 + 120 + select { 121 + case event := <-ch: 122 + s, ok := event.Payload.(string) 123 + if !ok { 124 + t.Fatalf("expected string payload, got %T", event.Payload) 125 + } 126 + expected := base64.StdEncoding.EncodeToString([]byte("some data")) 127 + if s != expected { 128 + t.Errorf("expected base64 %s, got %s", expected, s) 129 + } 130 + case <-ctx.Done(): 131 + t.Fatal("timed out waiting for event") 132 + } 133 + } 134 + 135 + func TestIsTextContent(t *testing.T) { 136 + tests := []struct { 137 + name string 138 + mediaType string 139 + params map[string]string 140 + want bool 141 + }{ 142 + {"text/plain", "text/plain", nil, true}, 143 + {"text/html", "text/html", nil, true}, 144 + {"text/xml", "text/xml", nil, true}, 145 + {"form urlencoded", "application/x-www-form-urlencoded", nil, true}, 146 + {"application/xml", "application/xml", nil, true}, 147 + {"application/xhtml+xml", "application/xhtml+xml", nil, true}, 148 + {"charset param on non-text", "application/octet-stream", map[string]string{"charset": "utf-8"}, true}, 149 + {"application/json", "application/json", nil, false}, 150 + {"application/octet-stream", "application/octet-stream", nil, false}, 151 + {"image/png", "image/png", nil, false}, 152 + } 153 + for _, tt := range tests { 154 + t.Run(tt.name, func(t *testing.T) { 155 + got := isTextContent(tt.mediaType, tt.params) 156 + if got != tt.want { 157 + t.Errorf("isTextContent(%q, %v) = %v, want %v", tt.mediaType, tt.params, got, tt.want) 158 + } 159 + }) 160 + } 161 + } 162 + 163 + func TestServer_postJSONViaSSERoundTrip(t *testing.T) { 164 + ts, _ := newTestServer(nil) 165 + defer ts.Close() 166 + 167 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 168 + defer cancel() 169 + 170 + events := sseSubscribe(ctx, ts.URL+"/test", nil) 171 + time.Sleep(50 * time.Millisecond) 172 + 173 + http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{"key":"value"}`)) 174 + 175 + select { 176 + case event := <-events: 177 + if event.Method != "POST" { 178 + t.Errorf("expected method POST in SSE event, got %s", event.Method) 179 + } 180 + data, _ := json.Marshal(event.Payload) 181 + var m map[string]string 182 + json.Unmarshal(data, &m) 183 + if m["key"] != "value" { 184 + t.Errorf("expected key=value, got %v", m) 185 + } 186 + case <-ctx.Done(): 187 + t.Fatal("timed out waiting for SSE event") 188 + } 189 + }
+22 -1
server.go
··· 6 6 "fmt" 7 7 "io" 8 8 "log" 9 + "mime" 9 10 "net/http" 10 11 "strings" 11 12 "sync/atomic" ··· 71 72 return 72 73 } 73 74 if err := verifier.Verify(body, r.Header, pc.Secret, pc.SignatureHeader); err != nil { 75 + log.Printf("verification failed for %s: %v, headers: %v", path, err, r.Header) 74 76 http.Error(w, "forbidden", http.StatusForbidden) 75 77 return 76 78 } 77 79 } 78 80 79 81 var payload any 80 - if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") { 82 + mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) 83 + if err == nil && mediaType == "application/json" { 81 84 if err := json.Unmarshal(body, &payload); err != nil { 82 85 http.Error(w, "invalid JSON", http.StatusBadRequest) 83 86 return 84 87 } 88 + } else if err == nil && isTextContent(mediaType, params) { 89 + payload = string(body) 85 90 } else { 86 91 payload = base64.StdEncoding.EncodeToString(body) 87 92 } ··· 91 96 event := &Event{ 92 97 ID: uuid.New().String(), 93 98 Timestamp: time.Now().UTC(), 99 + Method: r.Method, 94 100 Path: path, 95 101 Headers: headers, 96 102 Payload: payload, ··· 176 182 } 177 183 return headers 178 184 } 185 + 186 + func isTextContent(mediaType string, params map[string]string) bool { 187 + if strings.HasPrefix(mediaType, "text/") { 188 + return true 189 + } 190 + if mediaType == "application/x-www-form-urlencoded" || 191 + mediaType == "application/xml" || 192 + mediaType == "application/xhtml+xml" { 193 + return true 194 + } 195 + if _, ok := params["charset"]; ok { 196 + return true 197 + } 198 + return false 199 + }
+3 -3
sse_test.go
··· 322 322 } 323 323 } 324 324 325 - func TestServer_nonJSONPayloadBase64(t *testing.T) { 325 + func TestServer_textPayloadStoredAsString(t *testing.T) { 326 326 ts, _ := newTestServer(nil) 327 327 defer ts.Close() 328 328 ··· 362 362 363 363 select { 364 364 case payload := <-done: 365 - if payload != "aGVsbG8gd29ybGQ=" { 366 - t.Errorf("expected base64 encoded payload, got %s", payload) 365 + if payload != "hello world" { 366 + t.Errorf("expected plain text payload, got %s", payload) 367 367 } 368 368 case <-time.After(2 * time.Second): 369 369 t.Fatal("timed out waiting for event")