Skip to content

Commit 4f1a9c9

Browse files
authored
Create test_diagnostics.py
1 parent 5a404cd commit 4f1a9c9

1 file changed

Lines changed: 284 additions & 0 deletions

File tree

tests/test_diagnostics.py

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
from __future__ import annotations
2+
3+
import hashlib
4+
from pathlib import Path
5+
6+
import pytest
7+
8+
import ix_operator.crypto.native as native_module
9+
from ix_operator import (
10+
LocalTransportHub,
11+
OperatorApplication,
12+
OperatorNetwork,
13+
OperatorNode,
14+
PacketCodec,
15+
SessionMaterial,
16+
SessionService,
17+
)
18+
19+
20+
class FakeNativeModule:
21+
TAG_LEN = 16
22+
23+
def generate_ed25519_keypair_py(self) -> tuple[bytes, bytes]:
24+
return b"s" * 32, b"S" * 32
25+
26+
def generate_x25519_keypair_py(self) -> tuple[bytes, bytes]:
27+
return b"x" * 32, b"X" * 32
28+
29+
def random_bytes(self, length: int) -> bytes:
30+
return bytes((index % 251 for index in range(length)))
31+
32+
def random_nonce(self) -> bytes:
33+
return b"n" * 12
34+
35+
def sign_ed25519_py(self, private_key: bytes, message: bytes) -> bytes:
36+
return hashlib.sha512(private_key + message).digest()
37+
38+
def verify_ed25519_py(self, public_key: bytes, message: bytes, signature: bytes) -> bool:
39+
expected = hashlib.sha512(public_key + message).digest()
40+
return expected == signature
41+
42+
def x25519_shared_secret_py(self, private_key: bytes, peer_public_key: bytes) -> bytes:
43+
ordered = sorted([private_key, peer_public_key])
44+
return hashlib.sha256(ordered[0] + ordered[1]).digest()
45+
46+
def derive_session_keys(
47+
self,
48+
shared_secret: bytes,
49+
salt: bytes | None,
50+
info: bytes,
51+
) -> tuple[bytes, bytes]:
52+
normalized_salt = salt or b""
53+
encryption_key = hashlib.sha256(shared_secret + normalized_salt + info + b"enc").digest()
54+
authentication_key = hashlib.sha256(
55+
shared_secret + normalized_salt + info + b"auth"
56+
).digest()
57+
return encryption_key, authentication_key
58+
59+
def encrypt_aes256_gcm_py(
60+
self,
61+
key: bytes,
62+
nonce: bytes,
63+
plaintext: bytes,
64+
aad: bytes,
65+
) -> bytes:
66+
return self._seal(key, nonce, plaintext, aad)
67+
68+
def decrypt_aes256_gcm_py(
69+
self,
70+
key: bytes,
71+
nonce: bytes,
72+
ciphertext: bytes,
73+
aad: bytes,
74+
) -> bytes:
75+
return self._open(key, nonce, ciphertext, aad)
76+
77+
def encrypt_chacha20_poly1305_py(
78+
self,
79+
key: bytes,
80+
nonce: bytes,
81+
plaintext: bytes,
82+
aad: bytes,
83+
) -> bytes:
84+
return self._seal(key, nonce, plaintext, aad)
85+
86+
def decrypt_chacha20_poly1305_py(
87+
self,
88+
key: bytes,
89+
nonce: bytes,
90+
ciphertext: bytes,
91+
aad: bytes,
92+
) -> bytes:
93+
return self._open(key, nonce, ciphertext, aad)
94+
95+
def _seal(self, key: bytes, nonce: bytes, plaintext: bytes, aad: bytes) -> bytes:
96+
keystream = hashlib.sha256(key + nonce).digest()
97+
body = bytes(
98+
byte ^ keystream[index % len(keystream)]
99+
for index, byte in enumerate(plaintext)
100+
)
101+
tag = hashlib.sha256(key + nonce + aad + plaintext).digest()[: self.TAG_LEN]
102+
return body + tag
103+
104+
def _open(self, key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes) -> bytes:
105+
if len(ciphertext) < self.TAG_LEN:
106+
raise ValueError("ciphertext too short")
107+
108+
body = ciphertext[: -self.TAG_LEN]
109+
tag = ciphertext[-self.TAG_LEN :]
110+
111+
keystream = hashlib.sha256(key + nonce).digest()
112+
plaintext = bytes(
113+
byte ^ keystream[index % len(keystream)]
114+
for index, byte in enumerate(body)
115+
)
116+
expected_tag = hashlib.sha256(key + nonce + aad + plaintext).digest()[: self.TAG_LEN]
117+
if expected_tag != tag:
118+
raise ValueError("integrity check failed")
119+
120+
return plaintext
121+
122+
123+
class FakeHandshakeCryptoBackend:
124+
def __init__(self) -> None:
125+
self._counter = 0
126+
127+
def random_bytes(self, length: int) -> bytes:
128+
self._counter += 1
129+
seed = f"rng-{self._counter}".encode("utf-8")
130+
output = b""
131+
while len(output) < length:
132+
seed = hashlib.sha256(seed).digest()
133+
output += seed
134+
return output[:length]
135+
136+
def sign(self, private_key: bytes, message: bytes) -> bytes:
137+
return hashlib.sha512(private_key + message).digest()
138+
139+
def verify(self, public_key: bytes, message: bytes, signature: bytes) -> bool:
140+
expected = hashlib.sha512(public_key + message).digest()
141+
return expected == signature
142+
143+
def shared_secret(self, private_key: bytes, peer_public_key: bytes) -> bytes:
144+
ordered = sorted([private_key, peer_public_key])
145+
return hashlib.sha256(ordered[0] + ordered[1]).digest()
146+
147+
def derive_material(self, shared_secret: bytes, transcript_hash: bytes) -> SessionMaterial:
148+
encryption_key = hashlib.sha256(shared_secret + b"enc" + transcript_hash).digest()
149+
authentication_key = hashlib.sha256(shared_secret + b"auth" + transcript_hash).digest()
150+
return SessionMaterial(
151+
encryption_key=encryption_key,
152+
authentication_key=authentication_key,
153+
transcript_hash=transcript_hash,
154+
)
155+
156+
157+
class FakeTransportCryptoBackend:
158+
TAG_LEN = 16
159+
160+
def __init__(self) -> None:
161+
self._counter = 0
162+
163+
def random_nonce(self, length: int) -> bytes:
164+
self._counter += 1
165+
seed = hashlib.sha256(f"nonce-{self._counter}".encode("utf-8")).digest()
166+
return seed[:length]
167+
168+
def ciphertext_length(self, plaintext_length: int) -> int:
169+
return plaintext_length + self.TAG_LEN
170+
171+
def encrypt(self, key: bytes, nonce: bytes, plaintext: bytes, aad: bytes) -> bytes:
172+
keystream = hashlib.sha256(key + nonce).digest()
173+
body = bytes(
174+
byte ^ keystream[index % len(keystream)]
175+
for index, byte in enumerate(plaintext)
176+
)
177+
tag = hashlib.sha256(key + nonce + aad + plaintext).digest()[: self.TAG_LEN]
178+
return body + tag
179+
180+
def decrypt(self, key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes) -> bytes:
181+
if len(ciphertext) < self.TAG_LEN:
182+
raise ValueError("ciphertext too short")
183+
184+
body = ciphertext[: -self.TAG_LEN]
185+
tag = ciphertext[-self.TAG_LEN :]
186+
187+
keystream = hashlib.sha256(key + nonce).digest()
188+
plaintext = bytes(
189+
byte ^ keystream[index % len(keystream)]
190+
for index, byte in enumerate(body)
191+
)
192+
expected_tag = hashlib.sha256(key + nonce + aad + plaintext).digest()[: self.TAG_LEN]
193+
if expected_tag != tag:
194+
raise ValueError("integrity check failed")
195+
196+
return plaintext
197+
198+
199+
def test_application_status_snapshot_reports_identity_state(
200+
monkeypatch: pytest.MonkeyPatch,
201+
tmp_path: Path,
202+
) -> None:
203+
monkeypatch.setattr(native_module, "_native", FakeNativeModule())
204+
monkeypatch.setenv("IX_OPERATOR_RUNTIME_DIR", str(tmp_path / "runtime"))
205+
206+
app = OperatorApplication.from_env()
207+
before = app.status_snapshot()
208+
209+
assert before.identity_exists is False
210+
assert before.local_peer_id is None
211+
212+
app.initialize_identity(peer_id="node-alpha")
213+
after = app.status_snapshot()
214+
215+
assert after.identity_exists is True
216+
assert after.local_peer_id == "node-alpha"
217+
assert after.native_extension_available is True
218+
219+
220+
def test_node_snapshot_reports_agents_and_channels() -> None:
221+
hub = LocalTransportHub()
222+
service = SessionService(FakeHandshakeCryptoBackend())
223+
224+
node = OperatorNode.create(
225+
peer_id="node-alpha",
226+
signing_public_key=b"A" * 32,
227+
exchange_public_key=b"B" * 32,
228+
signing_private_key=b"A" * 32,
229+
exchange_private_key=b"B" * 32,
230+
hub=hub,
231+
session_service=service,
232+
codec=PacketCodec(FakeTransportCryptoBackend()),
233+
)
234+
235+
node.boot_program(
236+
"""
237+
agent genesis_i "Genesis I"
238+
goal "observe"
239+
say "ready"
240+
"""
241+
)
242+
243+
snapshot = node.snapshot()
244+
245+
assert snapshot.peer_id == "node-alpha"
246+
assert snapshot.channel_peers == ()
247+
assert snapshot.registered_agents == ("genesis_i",)
248+
assert snapshot.active_agent_count == 0
249+
250+
251+
def test_network_snapshot_reports_all_nodes() -> None:
252+
network = OperatorNetwork.local(
253+
session_service=SessionService(FakeHandshakeCryptoBackend()),
254+
codec_factory=lambda: PacketCodec(FakeTransportCryptoBackend()),
255+
)
256+
257+
from ix_operator import NodeIdentity
258+
259+
network.add_node(
260+
NodeIdentity(
261+
peer_id="node-alpha",
262+
signing_public_key=b"A" * 32,
263+
exchange_public_key=b"B" * 32,
264+
signing_private_key=b"A" * 32,
265+
exchange_private_key=b"B" * 32,
266+
)
267+
)
268+
network.add_node(
269+
NodeIdentity(
270+
peer_id="node-beta",
271+
signing_public_key=b"C" * 32,
272+
exchange_public_key=b"D" * 32,
273+
signing_private_key=b"C" * 32,
274+
exchange_private_key=b"D" * 32,
275+
)
276+
)
277+
278+
snapshot = network.snapshot()
279+
280+
assert snapshot.peer_ids == ("node-alpha", "node-beta")
281+
assert tuple(item.peer_id for item in snapshot.node_snapshots) == (
282+
"node-alpha",
283+
"node-beta",
284+
)

0 commit comments

Comments
 (0)