TCP/TLS connection pooling for Eio

tls-epoch

+114 -4
+2
lib/connection.ml
··· 13 13 14 14 type t = { 15 15 flow : [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t; 16 + tls_flow : Tls_eio.t option; 16 17 created_at : float; 17 18 mutable last_used : float; 18 19 mutable use_count : int; ··· 21 22 } 22 23 23 24 let flow t = t.flow 25 + let tls_flow t = t.tls_flow 24 26 let endpoint t = t.endpoint 25 27 let created_at t = t.created_at 26 28 let last_used t = t.last_used
+75 -4
lib/conpool.ml
··· 57 57 type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] 58 58 type connection = connection_ty Eio.Resource.t 59 59 60 + type connection_with_info = { 61 + flow : connection; 62 + tls_epoch : Tls.Core.epoch_data option; 63 + } 64 + 60 65 type endp_stats = { 61 66 mutable active : int; 62 67 mutable idle : int; ··· 161 166 Log.debug (fun m -> 162 167 m "TCP connection established to %a" Endpoint.pp endpoint); 163 168 164 - let flow = 169 + let flow, tls_flow = 165 170 match pool.tls with 166 171 | None -> 167 - (socket :> connection) 172 + ((socket :> connection), None) 168 173 | Some tls_config -> 169 174 try 170 175 Log.debug (fun m -> ··· 172 177 let host = 173 178 Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 174 179 in 175 - let tls_flow = Tls_eio.client_of_flow ~host tls_config socket in 180 + let tls = Tls_eio.client_of_flow ~host tls_config socket in 176 181 Log.info (fun m -> 177 182 m "TLS connection established to %a" Endpoint.pp endpoint); 178 - (tls_flow :> connection) 183 + ((tls :> connection), Some tls) 179 184 with Eio.Io _ as ex -> 180 185 let bt = Printexc.get_raw_backtrace () in 181 186 Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint ··· 185 190 Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint); 186 191 { 187 192 Connection.flow; 193 + tls_flow; 188 194 created_at = now; 189 195 last_used = now; 190 196 use_count = 0; ··· 507 513 Eio.Promise.await conn_promise 508 514 509 515 let connection ~sw t endpoint = connection_internal ~sw t endpoint 516 + 517 + let connection_with_info_internal ~sw (T pool) endpoint = 518 + Log.debug (fun m -> m "Acquiring connection with TLS info to %a" Endpoint.pp endpoint); 519 + let ep_pool = get_or_create_endpoint_pool pool endpoint in 520 + 521 + (* Create promises for connection handoff and cleanup signal *) 522 + let conn_promise, conn_resolver = Eio.Promise.create () in 523 + let done_promise, done_resolver = Eio.Promise.create () in 524 + 525 + (* Increment active count *) 526 + Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 527 + ep_pool.stats.active <- ep_pool.stats.active + 1); 528 + 529 + (* Fork a daemon fiber to manage the connection lifecycle *) 530 + Eio.Fiber.fork_daemon ~sw:pool.sw (fun () -> 531 + Fun.protect 532 + ~finally:(fun () -> 533 + Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 534 + ep_pool.stats.active <- ep_pool.stats.active - 1); 535 + Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)) 536 + (fun () -> 537 + Eio.Pool.use ep_pool.pool (fun conn -> 538 + Log.debug (fun m -> 539 + m "Using connection to %a (uses=%d)" Endpoint.pp endpoint 540 + (Connection.use_count conn)); 541 + 542 + Connection.update_usage conn ~now:(get_time pool); 543 + 544 + Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 545 + ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 546 + 547 + (* Get TLS epoch if available *) 548 + let tls_epoch = 549 + match Connection.tls_flow conn with 550 + | Some tls_flow -> ( 551 + match Tls_eio.epoch tls_flow with 552 + | Ok epoch -> Some epoch 553 + | Error () -> None) 554 + | None -> None 555 + in 556 + 557 + (* Hand off connection with TLS info to caller *) 558 + Eio.Promise.resolve conn_resolver { flow = conn.flow; tls_epoch }; 559 + 560 + try 561 + Eio.Promise.await done_promise; 562 + 563 + Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 564 + ep_pool.stats.idle <- ep_pool.stats.idle + 1); 565 + 566 + `Stop_daemon 567 + with e -> 568 + close_internal pool conn; 569 + 570 + Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 571 + ep_pool.stats.errors <- ep_pool.stats.errors + 1); 572 + 573 + raise e))); 574 + 575 + Eio.Switch.on_release sw (fun () -> 576 + Eio.Promise.resolve done_resolver ()); 577 + 578 + Eio.Promise.await conn_promise 579 + 580 + let connection_with_info ~sw t endpoint = connection_with_info_internal ~sw t endpoint 510 581 511 582 let with_connection t endpoint f = 512 583 Eio.Switch.run (fun sw -> f (connection ~sw t endpoint))
+37
lib/conpool.mli
··· 98 98 type connection = connection_ty Eio.Resource.t 99 99 (** A connection resource from the pool. *) 100 100 101 + type connection_with_info = { 102 + flow : connection; 103 + tls_epoch : Tls.Core.epoch_data option; 104 + } 105 + (** A connection with additional TLS information. 106 + 107 + The [tls_epoch] field contains the TLS session data if this connection 108 + uses TLS, or [None] for plaintext connections. This is needed for 109 + protocols like ACE-MQTT (RFC 9431) that require access to TLS exporter 110 + material for proof-of-possession. *) 111 + 101 112 (** {1 Connection Pool} *) 102 113 103 114 type t ··· 146 157 Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn; 147 158 let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in 148 159 Eio.Buf_read.take_all buf) 160 + ]} *) 161 + 162 + val connection_with_info : sw:Eio.Switch.t -> t -> Endpoint.t -> connection_with_info 163 + (** [connection_with_info ~sw pool endpoint] acquires a connection with TLS info. 164 + 165 + Like {!connection}, but returns a record containing both the connection flow 166 + and TLS epoch data (if the connection uses TLS). 167 + 168 + This is useful for protocols that need access to TLS session information, 169 + such as ACE-MQTT (RFC 9431) which uses TLS exporter material for 170 + proof-of-possession authentication. 171 + 172 + Example: 173 + {[ 174 + let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in 175 + Eio.Switch.run (fun sw -> 176 + let info = Conpool.connection_with_info ~sw pool endpoint in 177 + (* Access TLS epoch for PoP authentication *) 178 + match info.tls_epoch with 179 + | Some epoch -> 180 + let challenge = Tls.Engine.export_key_material epoch label 32 in 181 + (* Use challenge for authentication *) 182 + | None -> 183 + failwith "TLS required for ACE authentication"; 184 + (* Use info.flow for MQTT communication *) 185 + Eio.Flow.copy_string data info.flow) 149 186 ]} *) 150 187 151 188 val with_connection : t -> Endpoint.t -> (connection -> 'a) -> 'a