A batteries included HTTP/1.1 client in OCaml

Fix HTTP/2 connection window overflow by removing stale caching

The H2_adapter was caching HTTP/2 client state (including flow control
windows) independently from Conpool's TCP connection management. When
Conpool provided a new TCP connection, the cached H2_client would have
stale flow control windows from the previous connection. If the server
sent WINDOW_UPDATE frames, the accumulated window could exceed 2^31-1,
causing "Connection window overflow" errors.

The fix removes the caching layer from H2_adapter entirely. Each request
now creates a fresh H2_client and performs a new handshake. For proper
HTTP/2 multiplexing with connection reuse, the connection management
needs to be integrated at the Conpool level rather than in H2_adapter.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+35 -132
+22 -106
lib/h2/h2_adapter.ml
··· 6 6 (** HTTP/2 Adapter for Requests Library. 7 7 8 8 This module provides integration between the H2 implementation and 9 - the Requests library, handling: 10 - - Automatic response decompression 11 - - HTTP/2 connection state management for reuse *) 9 + the Requests library, handling automatic response decompression. 10 + 11 + NOTE: Connection state caching was removed because it was incompatible 12 + with how Conpool manages TCP connections - the cached H2_client state 13 + would become stale when Conpool provides a new TCP connection, leading 14 + to flow control window overflow errors. For proper HTTP/2 multiplexing 15 + with connection reuse, the connection management needs to be integrated 16 + at the Conpool level, not here. *) 12 17 13 18 let src = Logs.Src.create "requests.h2_adapter" ~doc:"HTTP/2 Adapter" 14 19 module Log = (val Logs.src_log src : Logs.LOG) ··· 43 48 Http_client.decompress_body ~limits ~content_encoding:encoding body_str 44 49 | None -> body_str 45 50 46 - (** {1 Connection State for Reuse} *) 47 - 48 - (** HTTP/2 connection state that can be cached for reuse. *) 49 - type connection_state = { 50 - client : H2_client.t; 51 - mutable last_used : float; 52 - } 53 - 54 - (** Check if a cached connection is stale (not used recently). *) 55 - let is_stale state ~max_idle_seconds = 56 - Unix.gettimeofday () -. state.last_used > max_idle_seconds 57 - 58 - (** Cache of HTTP/2 connection states, keyed by endpoint. *) 59 - let connection_cache : (string, connection_state) Hashtbl.t = Hashtbl.create 16 60 - let cache_mutex = Mutex.create () 61 - 62 - (** Exception-safe mutex wrapper. Ensures unlock even if [f] raises. *) 63 - let with_mutex f = 64 - Mutex.lock cache_mutex; 65 - Fun.protect ~finally:(fun () -> Mutex.unlock cache_mutex) f 66 - 67 - let cache_key ~host ~port = Printf.sprintf "%s:%d" host port 68 - 69 - let get_cached_connection ~host ~port = 70 - with_mutex @@ fun () -> 71 - Hashtbl.find_opt connection_cache (cache_key ~host ~port) 72 - 73 - let cache_connection ~host ~port client = 74 - let key = cache_key ~host ~port in 75 - with_mutex (fun () -> 76 - Hashtbl.replace connection_cache key { client; last_used = Unix.gettimeofday () }); 77 - Log.debug (fun m -> m "Cached HTTP/2 connection for %s" key) 78 - 79 - let remove_cached_connection ~host ~port = 80 - let key = cache_key ~host ~port in 81 - with_mutex (fun () -> Hashtbl.remove connection_cache key); 82 - Log.debug (fun m -> m "Removed cached HTTP/2 connection for %s" key) 83 - 84 51 (** {1 Response Construction} *) 85 52 86 53 (** Convert H2 protocol response to adapter response with decompression. *) ··· 89 56 let body = decompress_body ~auto_decompress ~headers h2_resp.H2_protocol.body in 90 57 { status = h2_resp.H2_protocol.status; headers; body } 91 58 92 - (** Maximum idle time for cached connections (5 minutes). *) 93 - let max_idle_seconds = 300.0 94 - 95 - (** Determine whether to reuse a cached connection or create a new one. *) 96 - let get_or_create_client ~host ~port = 97 - match get_cached_connection ~host ~port with 98 - | Some state when H2_client.is_open state.client && not (is_stale state ~max_idle_seconds) -> 99 - Log.debug (fun m -> m "Reusing cached HTTP/2 connection for %s:%d" host port); 100 - state.last_used <- Unix.gettimeofday (); 101 - (state.client, false) 102 - | Some _ -> 103 - remove_cached_connection ~host ~port; 104 - Log.debug (fun m -> m "Creating new HTTP/2 connection for %s:%d (old one unusable)" host port); 105 - (H2_client.create (), true) 106 - | None -> 107 - Log.debug (fun m -> m "Creating new HTTP/2 connection for %s:%d" host port); 108 - (H2_client.create (), true) 59 + (** {1 Request Functions} *) 109 60 110 - (** {1 Request Functions} *) 61 + (** Make an HTTP/2 request. 111 62 112 - (** Make an HTTP/2 request, reusing connection if available. 63 + This function creates a fresh H2_client for each request and performs 64 + a new handshake. For proper HTTP/2 multiplexing with connection reuse, 65 + the connection management needs to be integrated at the Conpool level. 113 66 114 67 @param flow The underlying TLS connection 115 68 @param uri Request URI ··· 133 86 Error (Format.asprintf "Invalid HTTP/2 request headers: %a" 134 87 Headers.pp_h2_validation_error e) 135 88 | Ok () -> 136 - let host = Uri.host uri |> Option.value ~default:"" in 137 - let port = Uri.port uri |> Option.value ~default:443 in 138 - let h2_headers = Headers.to_list headers in 139 - let h2_body = Option.bind body body_to_string_opt in 140 - let client, needs_handshake = get_or_create_client ~host ~port in 141 - let handshake_result = 142 - if needs_handshake then 143 - H2_client.handshake flow client |> Result.map (fun () -> 144 - cache_connection ~host ~port client) 145 - else 146 - Ok () 147 - in 148 - Result.bind handshake_result @@ fun () -> 149 - let meth = Method.to_string method_ in 150 - let req = H2_protocol.make_request ~meth ~uri ~headers:h2_headers ?body:h2_body () in 151 - H2_client.request flow client req 152 - |> Result.map_error (fun msg -> 153 - remove_cached_connection ~host ~port; 154 - msg) 155 - |> Result.map (make_response ~auto_decompress) 156 - 157 - (** Make a one-shot HTTP/2 request (no connection reuse). *) 158 - let one_request 159 - ~(flow : [> Eio.Flow.two_way_ty] Eio.Resource.t) 160 - ~(uri : Uri.t) 161 - ~(headers : Headers.t) 162 - ?(body : Body.t option) 163 - ~(method_ : Method.t) 164 - ~auto_decompress 165 - () 166 - : (response, string) result = 167 - (* Validate HTTP/2 header constraints per RFC 9113 Section 8.2.2 and 8.3 *) 168 - match Headers.validate_h2_user_headers headers with 169 - | Error e -> 170 - Error (Format.asprintf "Invalid HTTP/2 request headers: %a" 171 - Headers.pp_h2_validation_error e) 172 - | Ok () -> 173 89 let h2_headers = Headers.to_list headers in 174 90 let h2_body = Option.bind body body_to_string_opt in 175 91 let meth = Method.to_string method_ in 176 92 H2_client.one_request flow ~meth ~uri ~headers:h2_headers ?body:h2_body () 177 93 |> Result.map (make_response ~auto_decompress) 178 94 95 + (** Make a one-shot HTTP/2 request (same as {!request}). *) 96 + let one_request = request 97 + 179 98 (** {1 Connection Management} *) 180 99 181 - (** Clear all cached HTTP/2 connections. *) 100 + (** Clear all cached HTTP/2 connections. 101 + NOTE: This is now a no-op since caching was removed. *) 182 102 let clear_connection_cache () = 183 - with_mutex (fun () -> Hashtbl.clear connection_cache); 184 - Log.info (fun m -> m "Cleared HTTP/2 connection cache") 103 + Log.debug (fun m -> m "clear_connection_cache called (caching disabled)") 185 104 186 - (** Get statistics about cached connections. *) 187 - let connection_cache_stats () = 188 - with_mutex @@ fun () -> 189 - let count = Hashtbl.length connection_cache in 190 - let endpoints = Hashtbl.fold (fun k _ acc -> k :: acc) connection_cache [] in 191 - (count, endpoints) 105 + (** Get statistics about cached connections. 106 + NOTE: Always returns (0, []) since caching was removed. *) 107 + let connection_cache_stats () = (0, [])
+13 -26
lib/h2/h2_adapter.mli
··· 6 6 (** HTTP/2 Adapter for Requests Library. 7 7 8 8 This module provides integration between the H2 implementation and 9 - the Requests library, handling: 10 - - Automatic response decompression 11 - - HTTP/2 connection state management for reuse 9 + the Requests library, handling automatic response decompression. 10 + 11 + NOTE: Connection state caching was removed because it was incompatible 12 + with how Conpool manages TCP connections. For proper HTTP/2 multiplexing 13 + with connection reuse, the connection management needs to be integrated 14 + at the Conpool level. 12 15 13 16 {2 Usage} 14 17 15 - For session-based requests with connection reuse: 16 18 {[ 17 19 match H2_adapter.request ~flow ~uri ~headers ~method_:`GET 18 20 ~auto_decompress:true () with 19 21 | Ok resp -> Printf.printf "Status: %d\n" resp.status 20 - | Error msg -> Printf.printf "Error: %s\n" msg 21 - ]} 22 - 23 - For one-shot requests: 24 - {[ 25 - match H2_adapter.one_request ~flow ~uri ~headers ~method_:`GET 26 - ~auto_decompress:true () with 27 - | Ok resp -> Printf.printf "Got %d bytes\n" (String.length resp.body) 28 22 | Error msg -> Printf.printf "Error: %s\n" msg 29 23 ]} *) 30 24 ··· 55 49 unit -> 56 50 (response, string) result 57 51 (** [request ~flow ~uri ~headers ?body ~method_ ~auto_decompress ()] 58 - makes an HTTP/2 request, reusing cached connection state if available. 52 + makes an HTTP/2 request. 59 53 60 - The connection handshake is cached per host:port endpoint, allowing 61 - multiple requests to reuse the same HTTP/2 connection without 62 - repeating the handshake. 54 + This function creates a fresh H2_client for each request and performs 55 + a new handshake. For proper HTTP/2 multiplexing with connection reuse, 56 + the connection management needs to be integrated at the Conpool level. 63 57 64 58 @param flow The underlying TLS connection 65 59 @param uri Request URI ··· 78 72 auto_decompress:bool -> 79 73 unit -> 80 74 (response, string) result 81 - (** [one_request ~flow ~uri ~headers ?body ~method_ ~auto_decompress ()] 82 - makes a single HTTP/2 request without connection reuse. 83 - 84 - Use this for one-off requests. For multiple requests to the same 85 - endpoint, use {!request} which caches connection state. *) 75 + (** [one_request] is an alias for {!request}. *) 86 76 87 77 (** {1 Connection Management} *) 88 78 89 79 val clear_connection_cache : unit -> unit 90 - (** [clear_connection_cache ()] removes all cached HTTP/2 connection states. 91 - Call this when connections need to be reset (e.g., after network changes). *) 80 + (** [clear_connection_cache ()] is a no-op (caching was removed). *) 92 81 93 82 val connection_cache_stats : unit -> int * string list 94 - (** [connection_cache_stats ()] returns [(count, endpoints)] where [count] 95 - is the number of cached connections and [endpoints] is a list of 96 - "host:port" strings for each cached connection. *) 83 + (** [connection_cache_stats ()] always returns [(0, [])] (caching was removed). *)