Skip to content

Commit ae57fa0

Browse files
committed
CM-61986-add-mcp-and-email-enrichment-from-claude-json
1 parent 49ec713 commit ae57fa0

File tree

5 files changed

+239
-1
lines changed

5 files changed

+239
-1
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Reader for ~/.claude.json configuration file.
2+
3+
Extracts user email and MCP server configuration from the Claude Code
4+
global config file for use in AI guardrails scan enrichment.
5+
"""
6+
7+
import json
8+
from pathlib import Path
9+
from typing import Optional
10+
11+
from cycode.logger import get_logger
12+
13+
logger = get_logger('AI Guardrails Claude Config')
14+
15+
_CLAUDE_CONFIG_PATH = Path.home() / '.claude.json'
16+
17+
_SERVER_TYPE_MAPPING = {
18+
'stdio': 'Local',
19+
'http': 'Remote',
20+
}
21+
22+
23+
def load_claude_config(config_path: Optional[Path] = None) -> Optional[dict]:
24+
"""Load and parse ~/.claude.json.
25+
26+
Args:
27+
config_path: Override path for testing. Defaults to ~/.claude.json.
28+
29+
Returns:
30+
Parsed dict or None if file is missing or invalid.
31+
"""
32+
path = config_path or _CLAUDE_CONFIG_PATH
33+
if not path.exists():
34+
logger.debug('Claude config file not found', extra={'path': str(path)})
35+
return None
36+
try:
37+
content = path.read_text(encoding='utf-8')
38+
return json.loads(content)
39+
except Exception as e:
40+
logger.debug('Failed to load Claude config file', exc_info=e)
41+
return None
42+
43+
44+
def get_user_email(config: dict) -> Optional[str]:
45+
"""Extract user email from Claude config.
46+
47+
Reads oauthAccount.emailAddress from the config dict.
48+
"""
49+
return config.get('oauthAccount', {}).get('emailAddress')
50+
51+
52+
def get_mcp_servers(config: dict) -> list[dict]:
53+
"""Extract top-level MCP servers from Claude config and map to backend DTO shape.
54+
55+
Reads the top-level mcpServers dict and transforms each entry to match
56+
the McpServerPayloadDTO expected by ai-security-manager:
57+
- name: server key name
58+
- server_type: "Local" (stdio) or "Remote" (http)
59+
- command: executable command (stdio servers)
60+
- args: command arguments (stdio servers)
61+
- url: server URL (http servers)
62+
63+
Returns:
64+
List of dicts matching McpServerPayloadDTO shape. Empty list if none found.
65+
"""
66+
mcp_servers = config.get('mcpServers', {})
67+
if not isinstance(mcp_servers, dict):
68+
return []
69+
70+
result = []
71+
for name, server_config in mcp_servers.items():
72+
if not isinstance(server_config, dict):
73+
continue
74+
75+
raw_type = server_config.get('type', '')
76+
server_type = _SERVER_TYPE_MAPPING.get(raw_type, raw_type)
77+
78+
result.append({
79+
'name': name,
80+
'server_type': server_type,
81+
'command': server_config.get('command'),
82+
'args': server_config.get('args'),
83+
'url': server_config.get('url'),
84+
})
85+
86+
return result

cycode/cli/apps/ai_guardrails/scan/payload.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing import Optional
88

99
from cycode.cli.apps.ai_guardrails.consts import AIIDEType
10+
from cycode.cli.apps.ai_guardrails.scan.claude_config import get_user_email, load_claude_config
1011
from cycode.cli.apps.ai_guardrails.scan.types import (
1112
CLAUDE_CODE_EVENT_MAPPING,
1213
CLAUDE_CODE_EVENT_NAMES,
@@ -207,11 +208,15 @@ def from_claude_code_payload(cls, payload: dict) -> 'AIHookPayload':
207208
# Extract IDE version, model, and generation ID from transcript file
208209
ide_version, model, generation_id = _extract_from_claude_transcript(payload.get('transcript_path'))
209210

211+
# Extract user email from ~/.claude.json
212+
claude_config = load_claude_config()
213+
ide_user_email = get_user_email(claude_config) if claude_config else None
214+
210215
return cls(
211216
event_name=canonical_event,
212217
conversation_id=payload.get('session_id'),
213218
generation_id=generation_id,
214-
ide_user_email=None, # Claude Code doesn't provide this in hook payload
219+
ide_user_email=ide_user_email,
215220
model=model,
216221
ide_provider=AIIDEType.CLAUDE_CODE.value,
217222
ide_version=ide_version,

cycode/cli/apps/ai_guardrails/scan/scan_command.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import typer
1818

1919
from cycode.cli.apps.ai_guardrails.consts import AIIDEType
20+
from cycode.cli.apps.ai_guardrails.scan.claude_config import get_mcp_servers, load_claude_config
2021
from cycode.cli.apps.ai_guardrails.scan.handlers import get_handler_for_event
2122
from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload
2223
from cycode.cli.apps.ai_guardrails.scan.policy import load_policy
@@ -114,6 +115,16 @@ def scan_command(
114115
try:
115116
_initialize_clients(ctx)
116117

118+
if tool == AIIDEType.CLAUDE_CODE:
119+
try:
120+
claude_config = load_claude_config()
121+
if claude_config:
122+
mcp_servers = get_mcp_servers(claude_config)
123+
ai_security_client = ctx.obj['ai_security_client']
124+
ai_security_client.report_mcp_servers(mcp_servers)
125+
except Exception as e:
126+
logger.debug('Failed to report MCP servers from Claude config', exc_info=e)
127+
117128
handler = get_handler_for_event(event_name)
118129
if handler is None:
119130
logger.debug('Unknown hook event, allowing by default', extra={'event_name': event_name})

cycode/cyclient/ai_security_manager_client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class AISecurityManagerClient:
1717

1818
_CONVERSATIONS_PATH = 'v4/ai-security/interactions/conversations'
1919
_EVENTS_PATH = 'v4/ai-security/interactions/events'
20+
_MCP_SERVERS_PATH = 'v4/ai-security/mcp-servers'
2021

2122
def __init__(self, client: CycodeClientBase, service_config: 'AISecurityManagerServiceConfigBase') -> None:
2223
self.client = client
@@ -54,6 +55,22 @@ def create_conversation(self, payload: 'AIHookPayload') -> Optional[str]:
5455

5556
return conversation_id
5657

58+
def report_mcp_servers(self, mcp_servers: list[dict]) -> None:
59+
"""Report MCP servers discovered from IDE config.
60+
61+
Posts the list of MCP server configurations to the backend.
62+
Failures are logged but do not block the hook flow.
63+
"""
64+
if not mcp_servers:
65+
return
66+
67+
body = {'mcp_servers': mcp_servers}
68+
69+
try:
70+
self.client.post(self._build_endpoint_path(self._MCP_SERVERS_PATH), body=body)
71+
except Exception as e:
72+
logger.debug('Failed to report MCP servers', exc_info=e)
73+
5774
def create_event(
5875
self,
5976
payload: 'AIHookPayload',
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
"""Tests for Claude Code config file reader."""
2+
3+
import json
4+
from pathlib import Path
5+
6+
from pyfakefs.fake_filesystem import FakeFilesystem
7+
8+
from cycode.cli.apps.ai_guardrails.scan.claude_config import get_mcp_servers, get_user_email, load_claude_config
9+
10+
11+
def test_load_claude_config_valid(fs: FakeFilesystem) -> None:
12+
"""Test loading a valid ~/.claude.json file."""
13+
config = {'oauthAccount': {'emailAddress': 'user@example.com'}}
14+
config_path = Path.home() / '.claude.json'
15+
fs.create_file(config_path, contents=json.dumps(config))
16+
17+
result = load_claude_config(config_path)
18+
assert result == config
19+
20+
21+
def test_load_claude_config_missing_file(fs: FakeFilesystem) -> None:
22+
"""Test loading when ~/.claude.json does not exist."""
23+
fs.create_dir(Path.home())
24+
config_path = Path.home() / '.claude.json'
25+
26+
result = load_claude_config(config_path)
27+
assert result is None
28+
29+
30+
def test_load_claude_config_corrupt_file(fs: FakeFilesystem) -> None:
31+
"""Test loading when ~/.claude.json contains invalid JSON."""
32+
config_path = Path.home() / '.claude.json'
33+
fs.create_file(config_path, contents='not valid json {{{')
34+
35+
result = load_claude_config(config_path)
36+
assert result is None
37+
38+
39+
def test_get_user_email_present() -> None:
40+
"""Test extracting email when oauthAccount.emailAddress exists."""
41+
config = {'oauthAccount': {'emailAddress': 'user@example.com'}}
42+
assert get_user_email(config) == 'user@example.com'
43+
44+
45+
def test_get_user_email_missing_oauth_account() -> None:
46+
"""Test extracting email when oauthAccount key is missing."""
47+
config = {'someOtherKey': 'value'}
48+
assert get_user_email(config) is None
49+
50+
51+
def test_get_user_email_missing_email_address() -> None:
52+
"""Test extracting email when oauthAccount exists but emailAddress is missing."""
53+
config = {'oauthAccount': {'someOtherField': 'value'}}
54+
assert get_user_email(config) is None
55+
56+
57+
def test_get_mcp_servers_stdio_and_http() -> None:
58+
"""Test extracting MCP servers with both stdio and http types."""
59+
config = {
60+
'mcpServers': {
61+
'gitlab': {
62+
'type': 'stdio',
63+
'command': '/opt/homebrew/bin/gitlab-mcp',
64+
'args': ['--verbose'],
65+
},
66+
'atlassian': {
67+
'type': 'http',
68+
'url': 'https://mcp.atlassian.com/v1/mcp',
69+
},
70+
},
71+
}
72+
73+
result = get_mcp_servers(config)
74+
assert len(result) == 2
75+
76+
gitlab = next(s for s in result if s['name'] == 'gitlab')
77+
assert gitlab['server_type'] == 'Local'
78+
assert gitlab['command'] == '/opt/homebrew/bin/gitlab-mcp'
79+
assert gitlab['args'] == ['--verbose']
80+
assert gitlab['url'] is None
81+
82+
atlassian = next(s for s in result if s['name'] == 'atlassian')
83+
assert atlassian['server_type'] == 'Remote'
84+
assert atlassian['command'] is None
85+
assert atlassian['url'] == 'https://mcp.atlassian.com/v1/mcp'
86+
87+
88+
def test_get_mcp_servers_empty() -> None:
89+
"""Test extracting MCP servers when mcpServers is empty."""
90+
config = {'mcpServers': {}}
91+
assert get_mcp_servers(config) == []
92+
93+
94+
def test_get_mcp_servers_missing_key() -> None:
95+
"""Test extracting MCP servers when mcpServers key is missing."""
96+
config = {'someOtherKey': 'value'}
97+
assert get_mcp_servers(config) == []
98+
99+
100+
def test_get_mcp_servers_invalid_type() -> None:
101+
"""Test extracting MCP servers when mcpServers is not a dict."""
102+
config = {'mcpServers': 'not a dict'}
103+
assert get_mcp_servers(config) == []
104+
105+
106+
def test_get_mcp_servers_unknown_server_type() -> None:
107+
"""Test that unknown server types are passed through as-is."""
108+
config = {
109+
'mcpServers': {
110+
'custom': {
111+
'type': 'sse',
112+
'url': 'https://example.com/sse',
113+
},
114+
},
115+
}
116+
117+
result = get_mcp_servers(config)
118+
assert len(result) == 1
119+
assert result[0]['server_type'] == 'sse'

0 commit comments

Comments
 (0)