diff --git a/cintegration/harness.c b/cintegration/harness.c index 9235a39..1972963 100644 --- a/cintegration/harness.c +++ b/cintegration/harness.c @@ -1056,6 +1056,137 @@ static void test_mock_disconnect(void) { mock_close(h); } +// PilotRecvFrom — exercises the datagram-receive happy path. +// +// The mock daemon reflects every cmdSendTo back to the same client as +// a cmdRecvFrom (loopback semantics). We send a datagram, then call +// PilotRecvFrom and verify the JSON envelope contains our payload and +// the synthetic src_port=0xDEAD that the mock injects. +static void test_mock_recv_from(void) { + uint64_t h = mock_connect_or_fail("mock_recv_from"); + if (!h) return; + + char addr[] = "1:0001.0002.0003:80"; + char data[] = "loopback-dg"; + char *send_err = PilotSendTo(h, addr, data, (int)strlen(data)); + if (send_err != NULL) { + FAIL("mock_recv_from", send_err); + free_c_string(send_err); + mock_close(h); + return; + } + + // The reflected cmdRecvFrom is server-pushed; it lands on the driver's + // dgCh asynchronously. PilotRecvFrom blocks on that channel. + char *res = PilotRecvFrom(h); + if (!has_no_error(res)) { + FAIL("mock_recv_from", res ? res : "null"); + if (res) free_c_string(res); + mock_close(h); + return; + } + + // Payload bytes are base64 over JSON; rather than decoding, just check + // for fields we know the mock fills in. src_port=0xDEAD=57005. + if (strstr(res, "\"src_port\":57005") == NULL) { + FAIL("mock_recv_from", "expected src_port=57005 (0xDEAD)"); + printf(" got: %s\n", res); + free_c_string(res); + mock_close(h); + return; + } + if (strstr(res, "\"dst_port\":80") == NULL) { + FAIL("mock_recv_from", "expected dst_port=80 echoed back"); + printf(" got: %s\n", res); + free_c_string(res); + mock_close(h); + return; + } + PASS("mock_recv_from"); + free_c_string(res); + mock_close(h); +} + +// PilotListenerAccept — exercises the listener-accept happy path with +// TWO concurrent client handles against the same mock daemon. +// +// Handle A: PilotListen(port=42017). The mock records the bind in its +// process-wide listenerRegistry. +// Handle B: PilotDial("...:42017"). The mock replies cmdDialOK to B AND +// pushes a cmdAccept frame onto A's socket. +// Handle A: PilotListenerAccept returns a new Conn — the accepted side. +// +// This is the only path that actually flows a cmdAccept through the +// driver's readLoop into a Listener's per-port channel — every other +// PilotListenerAccept callsite in this harness hits the error branch. +static void test_mock_listener_accept(void) { + const uint16_t port = 42017; + + uint64_t a = mock_connect_or_fail("mock_listener_accept_listen"); + if (!a) return; + uint64_t b = mock_connect_or_fail("mock_listener_accept_dial"); + if (!b) { + mock_close(a); + return; + } + + struct PilotListen_return ln = PilotListen(a, port); + if (ln.r0 == 0 || (ln.r1 && has_error(ln.r1))) { + FAIL("mock_listener_accept", ln.r1 ? ln.r1 : "listen failed"); + if (ln.r1) free_c_string(ln.r1); + mock_close(a); + mock_close(b); + return; + } + if (ln.r1) free_c_string(ln.r1); + + // Build the dial addr: "1:0001.0002.0003:42017". The mock parses the + // 6-byte addr + 2-byte port from the wire — the text form is only + // used by libpilot to construct the bytes. + char dial_addr[64]; + snprintf(dial_addr, sizeof(dial_addr), "1:0001.0002.0003:%u", + (unsigned)port); + struct PilotDial_return d = PilotDial(b, dial_addr); + if (d.r0 == 0 || (d.r1 && has_error(d.r1))) { + FAIL("mock_listener_accept", d.r1 ? d.r1 : "dial failed"); + if (d.r1) free_c_string(d.r1); + char *lc = PilotListenerClose(ln.r0); + if (lc) free_c_string(lc); + mock_close(a); + mock_close(b); + return; + } + if (d.r1) free_c_string(d.r1); + + // The dial triggers the mock to push cmdAccept onto handle A's socket. + // PilotListenerAccept blocks until that frame lands. + struct PilotListenerAccept_return acc = PilotListenerAccept(ln.r0); + if (acc.r0 == 0 || (acc.r1 && has_error(acc.r1))) { + FAIL("mock_listener_accept", acc.r1 ? acc.r1 : "accept failed"); + if (acc.r1) free_c_string(acc.r1); + char *cc = PilotConnClose(d.r0); + if (cc) free_c_string(cc); + char *lc = PilotListenerClose(ln.r0); + if (lc) free_c_string(lc); + mock_close(a); + mock_close(b); + return; + } + if (acc.r1) free_c_string(acc.r1); + PASS("mock_listener_accept"); + + // Tear down the accepted conn + the dialer conn + the listener. + char *ac = PilotConnClose(acc.r0); + if (ac) free_c_string(ac); + char *dc = PilotConnClose(d.r0); + if (dc) free_c_string(dc); + char *lc = PilotListenerClose(ln.r0); + if (lc) free_c_string(lc); + + mock_close(a); + mock_close(b); +} + // --------------------------------------------------------------------------- // Run all // --------------------------------------------------------------------------- @@ -1138,6 +1269,8 @@ int main(void) { test_mock_network_invite_polls(); test_mock_policy(); test_mock_member_tags(); + test_mock_recv_from(); + test_mock_listener_accept(); stop_mock_daemon(); } else { diff --git a/cintegration/mockdaemon/main.go b/cintegration/mockdaemon/main.go index 098cd34..1014239 100644 --- a/cintegration/mockdaemon/main.go +++ b/cintegration/mockdaemon/main.go @@ -18,9 +18,17 @@ // // Commands we respond to: // 0x01 Bind → 0x02 BindOK [port(2)] +// (also records the bound port in a per-client +// set + the process-wide listenerRegistry so +// a peer dial can drive cmdAccept) // 0x03 Dial → 0x04 DialOK [connID(4)] +// (if the dialed port has any listener in +// listenerRegistry, also pushes cmdAccept to +// that listener — covers PilotListenerAccept) // 0x06 Send → server-pushed 0x07 Recv echo // 0x08 Close → fire-and-forget; we silently drop +// 0x0B SendTo → server-pushed 0x0C RecvFrom loopback echo +// (covers PilotRecvFrom) // 0x0D Info → 0x0E InfoOK [JSON] // 0x0F Handshake → 0x10 HandshakeOK [JSON] (sub-cmd dispatched) // 0x11 ResolveHostname → 0x12 ResolveHostnameOK [JSON] @@ -34,7 +42,6 @@ // 0x23 Managed → 0x24 ManagedOK [JSON] // 0x25 RotateKey → 0x26 RotateKeyOK [JSON] // 0x29 Broadcast → 0x2A BroadcastOK [] -// 0x0B SendTo → fire-and-forget; we silently drop package main @@ -48,6 +55,7 @@ import ( "net" "os" "os/signal" + "sync" "sync/atomic" "syscall" ) @@ -97,9 +105,37 @@ const ( // MaxMessageSize matches internal/ipcutil/ipcutil.go. const maxMessageSize = 1 << 20 +// addrSize matches pkg/protocol.AddrSize — 6 bytes on the wire. +const addrSize = 6 + // connIDCounter doles out fake stream-conn IDs for cmdDial replies. var connIDCounter atomic.Uint32 +// listenerRegistry tracks bound listener ports across ALL connected +// clients. When client A calls Bind(port=N) and client B later calls +// Dial(addr:N), the mock looks up port N here and pushes cmdAccept on +// client A's socket. This is what exercises PilotListenerAccept end- +// to-end: bind on one handle, dial on a second handle, accept on the +// first. +// +// Concurrency: handleConn goroutines both read and write this map. A +// dial handler holds the listener's write mutex while pushing +// cmdAccept; the listener handler does the same while pushing +// CmdRecv/CmdCloseOK on its own conn — so we never interleave frames +// on a single socket. +var ( + listenerMu sync.Mutex + listenerRegistry = map[uint16]*clientConn{} +) + +// clientConn wraps a net.Conn with a write mutex so server-pushed +// frames from peer goroutines (a dial handler on a different connection) +// can't interleave with the conn's own reply frames. +type clientConn struct { + c net.Conn + writeMu sync.Mutex +} + func readFrame(r io.Reader) ([]byte, error) { var lenBuf [4]byte if _, err := io.ReadFull(r, lenBuf[:]); err != nil { @@ -116,35 +152,52 @@ func readFrame(r io.Reader) ([]byte, error) { return buf, nil } -func writeFrame(w io.Writer, cmd byte, payload []byte) error { +func (cc *clientConn) writeFrame(cmd byte, payload []byte) error { frame := make([]byte, 1+len(payload)) frame[0] = cmd copy(frame[1:], payload) var lenBuf [4]byte binary.BigEndian.PutUint32(lenBuf[:], uint32(len(frame))) - if _, err := w.Write(lenBuf[:]); err != nil { + + cc.writeMu.Lock() + defer cc.writeMu.Unlock() + if _, err := cc.c.Write(lenBuf[:]); err != nil { return err } - _, err := w.Write(frame) + _, err := cc.c.Write(frame) return err } -func writeJSON(w io.Writer, cmd byte, v interface{}) error { +func (cc *clientConn) writeJSON(cmd byte, v interface{}) error { data, err := json.Marshal(v) if err != nil { return err } - return writeFrame(w, cmd, data) + return cc.writeFrame(cmd, data) } // handleConn services one driver connection. -func handleConn(c net.Conn) { - defer c.Close() - log.Printf("mock: client connected from %s", c.RemoteAddr()) +func handleConn(raw net.Conn) { + cc := &clientConn{c: raw} + defer raw.Close() + log.Printf("mock: client connected from %s", raw.RemoteAddr()) + + // Ports this connection has bound. Used to clean up listenerRegistry + // on disconnect so a stale closed conn never gets pushed to. + boundPorts := map[uint16]struct{}{} + defer func() { + listenerMu.Lock() + for p := range boundPorts { + if existing, ok := listenerRegistry[p]; ok && existing == cc { + delete(listenerRegistry, p) + } + } + listenerMu.Unlock() + }() for { - msg, err := readFrame(c) + msg, err := readFrame(raw) if err != nil { if err != io.EOF { log.Printf("mock: read frame: %v", err) @@ -160,7 +213,7 @@ func handleConn(c net.Conn) { switch cmd { case cmdInfo: - _ = writeJSON(c, cmdInfoOK, map[string]interface{}{ + _ = cc.writeJSON(cmdInfoOK, map[string]interface{}{ "node_id": uint32(0x12345678), "hostname": "mock-daemon", "version": "mock-0.1.0", @@ -168,31 +221,67 @@ func handleConn(c net.Conn) { }) case cmdHealth: - _ = writeJSON(c, cmdHealthOK, map[string]interface{}{ + _ = cc.writeJSON(cmdHealthOK, map[string]interface{}{ "ok": true, "uptime_secs": 0, }) case cmdBind: if len(payload) < 2 { - sendError(c, "bind: missing port") + sendError(cc, "bind: missing port") continue } port := binary.BigEndian.Uint16(payload[0:2]) respBody := make([]byte, 2) binary.BigEndian.PutUint16(respBody[0:2], port) - _ = writeFrame(c, cmdBindOK, respBody) + _ = cc.writeFrame(cmdBindOK, respBody) + + // Record this port as bound by this client. Drives the + // cmdAccept path when a peer dials the port. Last-bind- + // wins on collisions (harness shouldn't collide). + listenerMu.Lock() + listenerRegistry[port] = cc + listenerMu.Unlock() + boundPorts[port] = struct{}{} case cmdDial: // payload: [Addr(6)][port(2)] - if len(payload) < 8 { - sendError(c, "dial: missing address/port") + if len(payload) < addrSize+2 { + sendError(cc, "dial: missing address/port") continue } + dstPort := binary.BigEndian.Uint16(payload[addrSize : addrSize+2]) connID := connIDCounter.Add(1) + respBody := make([]byte, 4) binary.BigEndian.PutUint32(respBody[0:4], connID) - _ = writeFrame(c, cmdDialOK, respBody) + _ = cc.writeFrame(cmdDialOK, respBody) + + // If anyone has bound dstPort, push a cmdAccept frame on + // that listener's socket. Format: + // [port(2)][connID(4)][remoteAddr(6)][remotePort(2)] + // remoteAddr/remotePort identify the dialer; for the mock + // we use a canned synthetic peer address (the listener's + // driver code only parses the bytes, doesn't validate them + // against any directory). connID is the SAME one we just + // handed to the dialer — in a real daemon, the listener's + // accepted Conn and the dialer's Conn share an end-to-end + // stream identifier; reusing it here keeps the harness + // simple. The peer driver code keeps its own handle table + // keyed by connID so there's no collision in practice. + listenerMu.Lock() + listener, ok := listenerRegistry[dstPort] + listenerMu.Unlock() + if ok && listener != nil { + acceptBody := make([]byte, 2+4+addrSize+2) + binary.BigEndian.PutUint16(acceptBody[0:2], dstPort) + binary.BigEndian.PutUint32(acceptBody[2:6], connID) + // Synthetic remote addr 1:0001.AAAA.BBBB + binary.BigEndian.PutUint16(acceptBody[6:8], 0x0001) + binary.BigEndian.PutUint32(acceptBody[8:12], 0xAAAABBBB) + binary.BigEndian.PutUint16(acceptBody[12:14], 0xDEAD) + _ = listener.writeFrame(cmdAccept, acceptBody) + } case cmdSend: // payload: [connID(4)][data...] @@ -206,7 +295,7 @@ func handleConn(c net.Conn) { recvBody := make([]byte, 4+len(data)) binary.BigEndian.PutUint32(recvBody[0:4], connID) copy(recvBody[4:], data) - _ = writeFrame(c, cmdRecv, recvBody) + _ = cc.writeFrame(cmdRecv, recvBody) case cmdClose: // Fire-and-forget; driver does not wait for a reply on Close. @@ -215,100 +304,123 @@ func handleConn(c net.Conn) { if len(payload) >= 4 { respBody := make([]byte, 4) copy(respBody, payload[0:4]) - _ = writeFrame(c, cmdCloseOK, respBody) + _ = cc.writeFrame(cmdCloseOK, respBody) } case cmdSendTo: - // Fire-and-forget datagram; nothing to echo without a peer. + // Datagram loopback: synthesize a cmdRecvFrom back to the + // sender so PilotRecvFrom on the same handle observes it. + // + // cmdSendTo payload: [dstAddr(6)][dstPort(2)][data...] + // cmdRecvFrom payload: [srcAddr(6)][srcPort(2)][dstPort(2)][data...] + // + // We treat the original destination as the "source" of the + // reflected datagram (loopback semantics) and assign a + // canned srcPort of 0xDEAD. The driver doesn't validate + // addresses against a directory at this layer, so any + // well-formed 6-byte addr is fine. + if len(payload) < addrSize+2 { + continue + } + dstAddr := payload[0:addrSize] + dstPort := binary.BigEndian.Uint16(payload[addrSize : addrSize+2]) + data := payload[addrSize+2:] + + recvBody := make([]byte, addrSize+2+2+len(data)) + copy(recvBody[0:addrSize], dstAddr) // srcAddr := original dst + binary.BigEndian.PutUint16(recvBody[addrSize:addrSize+2], 0xDEAD) + binary.BigEndian.PutUint16(recvBody[addrSize+2:addrSize+4], dstPort) + copy(recvBody[addrSize+4:], data) + _ = cc.writeFrame(cmdRecvFrom, recvBody) case cmdHandshake: // payload: [subCmd(1)][rest...] if len(payload) < 1 { - sendError(c, "handshake: missing sub-cmd") + sendError(cc, "handshake: missing sub-cmd") continue } sub := payload[0] switch sub { case 0x01, 0x02, 0x03, 0x06, 0x07: // send/approve/reject/revoke/wait - _ = writeJSON(c, cmdHandshakeOK, map[string]interface{}{ + _ = cc.writeJSON(cmdHandshakeOK, map[string]interface{}{ "ok": true, }) case 0x04: // pending - _ = writeJSON(c, cmdHandshakeOK, map[string]interface{}{ + _ = cc.writeJSON(cmdHandshakeOK, map[string]interface{}{ "pending": []interface{}{}, }) case 0x05: // trusted - _ = writeJSON(c, cmdHandshakeOK, map[string]interface{}{ + _ = cc.writeJSON(cmdHandshakeOK, map[string]interface{}{ "trusted": []interface{}{}, }) default: - sendError(c, fmt.Sprintf("handshake: unknown sub 0x%02X", sub)) + sendError(cc, fmt.Sprintf("handshake: unknown sub 0x%02X", sub)) } case cmdResolveHostname: - _ = writeJSON(c, cmdResolveHostnameOK, map[string]interface{}{ + _ = cc.writeJSON(cmdResolveHostnameOK, map[string]interface{}{ "hostname": string(payload), "node_id": uint32(0x0BADF00D), }) case cmdSetHostname: - _ = writeJSON(c, cmdSetHostnameOK, map[string]interface{}{ + _ = cc.writeJSON(cmdSetHostnameOK, map[string]interface{}{ "hostname": string(payload), }) case cmdSetVisibility: public := len(payload) >= 1 && payload[0] == 1 - _ = writeJSON(c, cmdSetVisibilityOK, map[string]interface{}{ + _ = cc.writeJSON(cmdSetVisibilityOK, map[string]interface{}{ "public": public, }) case cmdDeregister: - _ = writeJSON(c, cmdDeregisterOK, map[string]interface{}{"ok": true}) + _ = cc.writeJSON(cmdDeregisterOK, map[string]interface{}{"ok": true}) case cmdSetTags: - _ = writeJSON(c, cmdSetTagsOK, map[string]interface{}{"ok": true}) + _ = cc.writeJSON(cmdSetTagsOK, map[string]interface{}{"ok": true}) case cmdSetWebhook: - _ = writeJSON(c, cmdSetWebhookOK, map[string]interface{}{ + _ = cc.writeJSON(cmdSetWebhookOK, map[string]interface{}{ "url": string(payload), }) case cmdNetwork: // payload: [subCmd(1)][rest...] — all variants reply with // a canned OK map. - _ = writeJSON(c, cmdNetworkOK, map[string]interface{}{ + _ = cc.writeJSON(cmdNetworkOK, map[string]interface{}{ "ok": true, "members": []interface{}{}, }) case cmdManaged: - _ = writeJSON(c, cmdManagedOK, map[string]interface{}{ + _ = cc.writeJSON(cmdManagedOK, map[string]interface{}{ "ok": true, "status": "idle", }) case cmdRotateKey: - _ = writeJSON(c, cmdRotateKeyOK, map[string]interface{}{ + _ = cc.writeJSON(cmdRotateKeyOK, map[string]interface{}{ "rotated": true, }) case cmdBroadcast: - _ = writeFrame(c, cmdBroadcastOK, nil) + _ = cc.writeFrame(cmdBroadcastOK, nil) default: log.Printf("mock: unhandled cmd 0x%02X", cmd) - sendError(c, fmt.Sprintf("unknown cmd 0x%02X", cmd)) + sendError(cc, fmt.Sprintf("unknown cmd 0x%02X", cmd)) } } } -func sendError(c net.Conn, msg string) { +func sendError(cc *clientConn, msg string) { // CmdError payload format per pkg/daemon/ipc.go sendError: [code(2)][msg]. body := make([]byte, 2+len(msg)) body[0] = 0x00 body[1] = 0x01 copy(body[2:], msg) - _ = writeFrame(c, cmdError, body) + _ = cc.writeFrame(cmdError, body) } func main() {