A batteries included HTTP/1.1 client in OCaml

Add connection-lifetime switches and HTTP/2 true multiplexing

Conpool changes:
- Add connection-lifetime switch passed to protocol init_state, enabling
long-running fibers (e.g., HTTP/2 background reader)
- Add on_acquire/on_release protocol hooks for lazy fiber initialization
- Enforce max_idle_time via pc_last_used tracking
- Enforce max_connection_uses via pc_use_count tracking
- Track idle count (connections with no active users)
- Track error count (protocol failures vs normal lifecycle closes)
- Distinguish Unhealthy_error from Unhealthy_lifecycle in health checks

HTTP/2 changes:
- Enable true multiplexing: access_mode now returns Shared
- Start background reader fiber on first acquire (lazy init)
- Add on_goaway callback to start_reader for GOAWAY notifications
- Use concurrent request path instead of synchronous

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+81 -27
+1 -1
lib/h2/dune
··· 2 2 (name requests_h2) 3 3 (public_name requests.h2) 4 4 (wrapped false) 5 - (libraries requests_core requests_h2_internal requests_h1 eio cstruct optint bigstringaf tls ca-certs)) 5 + (libraries requests_core requests_h2_internal requests_h1 eio cstruct optint bigstringaf tls ca-certs conpool logs))
+13 -6
lib/h2/h2_client.ml
··· 292 292 Eio.Stream.add handler.events (Connection_error ("GOAWAY: " ^ debug)) 293 293 ) t.handlers 294 294 ); 295 - `Goaway debug 295 + `Goaway (last_stream_id, error_code, debug) 296 296 297 297 | _ -> 298 298 Log.debug (fun m -> m "Ignoring connection-level frame: %a" ··· 363 363 ============================================================ *) 364 364 365 365 (** Start the background frame reader. 366 - This runs in a fiber and dispatches frames to stream handlers. *) 367 - let start_reader ~sw flow t = 366 + This runs in a fiber and dispatches frames to stream handlers. 367 + 368 + @param sw Switch to spawn the reader fiber on (should be connection-lifetime) 369 + @param flow The underlying connection flow 370 + @param t Client state 371 + @param on_goaway Optional callback invoked when GOAWAY is received *) 372 + let start_reader ~sw flow t ~on_goaway = 368 373 if t.reader_running then 369 374 () (* Already running *) 370 375 else begin ··· 389 394 | Some frame -> 390 395 match dispatch_frame t flow frame with 391 396 | `Continue -> read_loop () 392 - | `Goaway _ -> 397 + | `Goaway (last_stream_id, error_code, debug) -> 398 + (* Call the GOAWAY callback if provided *) 399 + on_goaway ~last_stream_id ~error_code ~debug; 393 400 (* Continue reading to drain any remaining frames *) 394 401 read_loop () 395 402 | `Error msg -> ··· 434 441 Log.info (fun m -> m "Sending HTTP/2 request: %s %s" 435 442 req.meth (Uri.to_string req.uri)); 436 443 437 - (* Ensure reader is running *) 438 - start_reader ~sw flow t; 444 + (* Ensure reader is running - use no-op callback for ad-hoc usage *) 445 + start_reader ~sw flow t ~on_goaway:(fun ~last_stream_id:_ ~error_code:_ ~debug:_ -> ()); 439 446 440 447 (* Create a new stream *) 441 448 match H2_connection.create_stream t.conn with
+19
lib/h2/h2_client.mli
··· 116 116 (** [close flow t] gracefully closes the connection. 117 117 Sends GOAWAY and marks connection as closed. *) 118 118 119 + (** {1 Background Reader} *) 120 + 121 + val start_reader : 122 + sw:Eio.Switch.t -> 123 + [> Eio.Flow.two_way_ty] Eio.Resource.t -> 124 + t -> 125 + on_goaway:(last_stream_id:int32 -> error_code:H2_frame.error_code -> debug:string -> unit) -> 126 + unit 127 + (** [start_reader ~sw flow t ~on_goaway] starts the background frame reader fiber. 128 + 129 + The reader fiber dispatches incoming frames to stream handlers and must be 130 + running for concurrent requests to work. The [sw] parameter should be a 131 + connection-lifetime switch. 132 + 133 + @param sw Switch for the reader fiber (connection-lifetime) 134 + @param flow The underlying connection 135 + @param t Client state 136 + @param on_goaway Callback invoked when GOAWAY is received from peer *) 137 + 119 138 (** {1 Making Requests} *) 120 139 121 140 val request : sw:Eio.Switch.t ->
+40 -12
lib/h2/h2_conpool_handler.ml
··· 26 26 (** The underlying HTTP/2 client. *) 27 27 flow : Conpool.Config.connection_flow; 28 28 (** The connection flow (needed for H2 operations). *) 29 + sw : Eio.Switch.t; 30 + (** Connection-lifetime switch for the reader fiber. *) 31 + mutable reader_started : bool; 32 + (** Whether the background reader fiber has been started. *) 29 33 mutable goaway_received : bool; 30 34 (** Whether GOAWAY has been received from peer. *) 35 + mutable last_goaway_stream : int32; 36 + (** Last stream ID from GOAWAY (streams > this may be retried). *) 31 37 mutable max_concurrent_streams : int; 32 38 (** Cached max_concurrent_streams from peer settings. *) 33 39 } ··· 35 41 (** {1 Protocol Configuration} *) 36 42 37 43 (** Initialize HTTP/2 state for a new connection. 38 - Performs the HTTP/2 handshake and extracts peer settings. *) 39 - let init_state ~flow ~tls_epoch:_ = 44 + Performs the HTTP/2 handshake and extracts peer settings. 45 + The [sw] parameter is a connection-lifetime switch that will be used 46 + to spawn the background reader fiber when on_acquire is called. *) 47 + let init_state ~sw ~flow ~tls_epoch:_ = 40 48 Log.info (fun m -> m "Initializing HTTP/2 connection state"); 41 49 42 50 let client = H2_client.create () in ··· 60 68 { 61 69 client; 62 70 flow; 71 + sw; 72 + reader_started = false; 63 73 goaway_received = false; 74 + last_goaway_stream = Int32.max_int; 64 75 max_concurrent_streams = max_streams; 65 76 } 66 77 ··· 68 79 Log.err (fun m -> m "HTTP/2 handshake failed: %s" msg); 69 80 failwith ("HTTP/2 handshake failed: " ^ msg) 70 81 82 + (** Called when a connection is acquired from the pool. 83 + Starts the background reader fiber on first acquisition. *) 84 + let on_acquire state = 85 + if not state.reader_started then begin 86 + Log.info (fun m -> m "Starting HTTP/2 background reader fiber"); 87 + H2_client.start_reader ~sw:state.sw state.flow state.client 88 + ~on_goaway:(fun ~last_stream_id ~error_code:_ ~debug -> 89 + Log.info (fun m -> m "GOAWAY received: last_stream_id=%ld, debug=%s" 90 + last_stream_id debug); 91 + state.goaway_received <- true; 92 + state.last_goaway_stream <- last_stream_id); 93 + state.reader_started <- true 94 + end 95 + 96 + (** Called when a connection is released back to the pool. 97 + For HTTP/2, this is a no-op since the reader keeps running. *) 98 + let on_release _state = () 99 + 71 100 (** Check if the HTTP/2 connection is still healthy. *) 72 101 let is_healthy state = 73 102 if state.goaway_received then begin ··· 88 117 end 89 118 90 119 (** Get access mode for this connection. 91 - Currently using Exclusive mode since the synchronous request path 92 - doesn't support true multiplexing. The max_concurrent_streams is 93 - still tracked for future multiplexing support. *) 94 - let access_mode _state = 95 - Conpool.Config.Exclusive 120 + HTTP/2 supports multiplexing: multiple concurrent streams per connection. *) 121 + let access_mode state = 122 + Conpool.Config.Shared state.max_concurrent_streams 96 123 97 124 (** The protocol configuration for HTTP/2 connections. *) 98 125 let h2_protocol : h2_state Conpool.Config.protocol_config = { 99 126 Conpool.Config.init_state; 127 + on_acquire; 128 + on_release; 100 129 is_healthy; 101 130 on_close; 102 131 access_mode; ··· 106 135 107 136 (** Make an HTTP/2 request using the pooled connection state. 108 137 109 - Uses synchronous I/O to avoid background fiber lifecycle issues 110 - with connection pooling. True multiplexing would require the 111 - connection pool to manage the reader fiber lifetime. 138 + Uses the concurrent request path with the connection-lifetime reader fiber. 139 + Multiple requests can be made concurrently on the same connection. 112 140 113 141 @param state The HTTP/2 state from Conpool 114 142 @param uri Request URI ··· 146 174 Log.debug (fun m -> m "Making HTTP/2 request on pooled connection: %s %s" 147 175 meth (Uri.to_string uri)); 148 176 149 - (* Use synchronous request to avoid fiber lifecycle issues *) 150 - match H2_client.request_sync state.flow state.client 177 + (* Use concurrent request path - reader fiber is already running *) 178 + match H2_client.request ~sw:state.sw state.flow state.client 151 179 { H2_protocol.meth; uri; headers = h2_headers; body = h2_body } with 152 180 | Ok resp -> 153 181 (* Check if GOAWAY was received during request *)
+2 -2
lib/requests.mli
··· 237 237 238 238 val create : 239 239 sw:Eio.Switch.t -> 240 - ?http_pool:Conpool.t -> 241 - ?https_pool:Conpool.t -> 240 + ?http_pool:unit Conpool.t -> 241 + ?https_pool:unit Conpool.t -> 242 242 ?cookie_jar:Cookeio_jar.t -> 243 243 ?default_headers:Headers.t -> 244 244 ?auth:Auth.t ->
+3 -3
test/test_localhost.ml
··· 142 142 (* Run all requests concurrently with fiber limit *) 143 143 Eio.Fiber.List.iter ~max_fibers:200 (fun (endpoint, req_id) -> 144 144 try 145 - Conpool.with_connection pool endpoint (fun flow -> 145 + Conpool.with_connection pool endpoint (fun conn -> 146 146 let test_msg = Printf.sprintf "Request %d" req_id in 147 - Eio.Flow.copy_string (test_msg ^ "\n") flow; 147 + Eio.Flow.copy_string (test_msg ^ "\n") conn.Conpool.flow; 148 148 149 - let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in 149 + let buf = Eio.Buf_read.of_flow conn.Conpool.flow ~max_size:1024 in 150 150 let _response = Eio.Buf_read.line buf in 151 151 let count = Atomic.fetch_and_add success_count 1 + 1 in 152 152
+3 -3
test/test_simple.ml
··· 48 48 traceln "Testing connection"; 49 49 let endpoint = Conpool.Endpoint.make ~host:"127.0.0.1" ~port:9000 in 50 50 51 - let response = Conpool.with_connection pool endpoint (fun flow -> 51 + let response = Conpool.with_connection pool endpoint (fun conn -> 52 52 traceln "Client: sending message"; 53 - Eio.Flow.copy_string "test message\n" flow; 54 - let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in 53 + Eio.Flow.copy_string "test message\n" conn.Conpool.flow; 54 + let buf = Eio.Buf_read.of_flow conn.Conpool.flow ~max_size:1024 in 55 55 let resp = Eio.Buf_read.line buf in 56 56 traceln "Client: received: %s" resp; 57 57 resp