-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathpc.py
More file actions
408 lines (338 loc) · 13.2 KB
/
pc.py
File metadata and controls
408 lines (338 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
"""
PC 算法实现模块 (PC Algorithm Implementation)
本模块实现了 PC (Peter-Clark) 算法,用于从观测数据中发现因果结构。
PC 算法是因果发现领域最经典和广泛使用的算法之一。
算法流程:
1. 初始化完全无向图(所有变量两两相连)
2. 骨架发现(Skeleton Discovery):
- 遍历所有边 (X, Y)
- 对于每条边,搜索条件集 S 使得 X ⊥ Y | S
- 如果找到这样的 S,则删除边 X-Y
3. 方向确定(Edge Orientation):
- 识别 V-structures (碰撞器)
- 应用 Meek 规则传播方向
参考文献:
Spirtes, P., Glymour, C., & Scheines, R. (2000).
Causation, Prediction, and Search (2nd ed.). MIT Press.
作者: py_pcalg contributors
"""
import sys
import numpy as np
import random
from math import sqrt, log
from typing import List, Optional, Tuple, Dict, Union, Callable, Any
import pandas as pd
from utils import (
Matrix,
get_next_set,
getNextSet, # 向后兼容
independence_test,
indTest, # 向后兼容
partial_correlation,
z_statistic,
pseudoinverse
)
from graph import visualize_graph, grapher
def skeleton(
suff_stat: List,
indep_test: Callable,
alpha: float,
labels: List,
fixed_gaps: Optional[np.ndarray] = None,
fixed_edges: Optional[np.ndarray] = None,
na_delete: bool = True,
m_max: float = float('Inf'),
u2pd: Tuple[str, ...] = ("relaxed", "rand", "retry"),
solve_confl: bool = False,
num_cores: int = 1,
verbose: bool = False
) -> Matrix:
"""
PC 算法的骨架发现阶段
从完全图开始,通过条件独立性检验逐步删除边,
得到因果图的骨架结构(无向图)。
Args:
suff_stat: 充分统计量,通常是 [相关矩阵, 样本数量]
indep_test: 条件独立性检验函数,签名为 f(x, y, S, suff_stat) -> p_value
alpha: 显著性水平,p > alpha 时认为条件独立
labels: 变量标签列表
fixed_gaps: 固定不存在的边(可选),矩阵形式
fixed_edges: 固定存在的边(可选),矩阵形式
na_delete: 是否在 p 值缺失时删除边
m_max: 最大条件集大小
u2pd: 处理无向边的策略 ("relaxed", "rand", "retry")
solve_confl: 是否解决冲突
num_cores: 并行计算核心数(当前未实现)
verbose: 是否输出详细信息
Returns:
Matrix: 邻接矩阵,表示发现的骨架结构
Raises:
Exception: 如果未指定 labels 参数
Example:
>>> import pandas as pd
>>> import numpy as np
>>> # 准备数据
>>> data = pd.read_csv('data.csv')
>>> corr = data.corr()
>>> n = len(data)
>>> labels = list(range(data.shape[1]))
>>> # 运行骨架发现
>>> skel = skeleton([corr, n], indTest, alpha=0.05, labels=labels)
>>> print(skel.M) # 打印邻接矩阵
Note:
这是 PC 算法的核心步骤。算法复杂度取决于图的稀疏程度和
最大邻居数量。在稀疏图上效率很高。
"""
# 验证输入
try:
p = len(labels)
except TypeError:
raise Exception("必须指定 'labels' 参数!")
seq_p = list(range(p)) # 变量索引序列
# 初始化邻接矩阵
if fixed_gaps is None:
# 初始化为完全图(所有变量两两相连)
graph_matrix = np.ones(shape=(p, p))
else:
# 验证固定间隙矩阵
if fixed_gaps.shape != (p, p):
raise Exception("fixed_gaps 的维度与数据集不匹配")
if not np.allclose(fixed_gaps, fixed_gaps.T):
raise Exception("fixed_gaps 必须是对称矩阵")
graph_matrix = np.zeros(shape=(p, p))
# 包装为 Matrix 对象
graph_matrix = Matrix(graph_matrix)
graph_matrix.diag(0) # 对角线设为 0(节点不与自身相连)
# 初始化固定边矩阵
if fixed_edges is None:
fixed_edges = np.zeros(shape=(p, p))
# 初始化分离集和最大 p 值矩阵
sepset = [[None] * p for _ in range(p)] # 分离集
p_max = np.full((p, p), float('Inf')) # 最大 p 值
p_max = Matrix(p_max)
p_max.diag(1)
# 主循环变量
done = False
order = 0 # 当前条件集大小
n_edge_tests = [0] * p # 边检验次数统计
# 主循环:逐步增加条件集大小
while not done and graph_matrix.any() and order <= m_max:
order_plus_one = order + 1
n_edge_tests[order_plus_one] = 0
done = True
# 获取当前所有边的索引
edge_indices = graph_matrix.which(1)
num_edges = graph_matrix.shape[1]
# 遍历所有边
for i in range(num_edges):
if i >= len(edge_indices):
break
x = edge_indices[i, 0]
y = edge_indices[i, 1]
# 检查边是否存在且不是固定边
if graph_matrix.M[y, x] and not fixed_edges[y, x]:
# 获取 x 的邻居(不包括 y)
neighbors_bool = graph_matrix.M[:, x].copy()
neighbors_bool[y] = 0
neighbors = [
idx for idx in seq_p
if seq_p[idx] and neighbors_bool[idx]
]
num_neighbors = len(neighbors)
# 只有邻居数量足够才能形成条件集
if num_neighbors >= order:
if num_neighbors > order:
done = False # 还需要继续更高阶的检验
# 生成初始条件集
condition_set_indices = list(range(order + 1))
if len(condition_set_indices) == 0:
return graph_matrix
# 遍历所有大小为 order 的条件集
while True:
n_edge_tests[order_plus_one] += 1
# 执行独立性检验
try:
actual_condition_set = [
neighbors[s] for s in condition_set_indices
if s < len(neighbors)
]
p_value = indep_test(x, y, actual_condition_set, suff_stat)
except Exception as e:
print(f"检验出错: S={condition_set_indices}, neighbors={neighbors}, error={e}")
p_value = None
# 处理缺失 p 值
if p_value is None:
p_value = int(na_delete)
# 更新最大 p 值
if p_max.M[x, y] < p_value:
p_max.M[x, y] = p_value
# 如果 p 值大于阈值,删除边并记录分离集
if p_value >= alpha:
graph_matrix.M[x, y] = graph_matrix.M[y, x] = 0
try:
sepset[x][y] = [neighbors[s] for s in condition_set_indices]
except Exception:
return graph_matrix
break
# 获取下一个条件集
next_set_result = get_next_set(num_neighbors, order, condition_set_indices)
if next_set_result is None or next_set_result['waslast']:
break
condition_set_indices = next_set_result['set']
order += 1
# 对称化最大 p 值矩阵
for i in range(1, p):
for j in range(i + 1, p):
p_max.M[i, j] = p_max.M[j, i] = max(p_max.M[i, j], p_max.M[j, i])
return graph_matrix
# 保持向后兼容的别名
skeletion = skeleton
def pc(
suff_stat: List,
indep_test: Callable,
alpha: float,
labels: List,
p: int,
fixed_gaps: Optional[np.ndarray] = None,
fixed_edges: Optional[np.ndarray] = None,
na_delete: bool = True,
m_max: float = float('Inf'),
u2pd: Tuple[str, ...] = ("relaxed", "rand", "retry"),
skel_method: Tuple[str, ...] = ('stable', 'original'),
solve_confl: bool = False,
num_cores: int = 1,
verbose: bool = False
) -> Matrix:
"""
PC 算法完整实现
包括骨架发现和边方向确定两个阶段。
Args:
suff_stat: 充分统计量
indep_test: 独立性检验函数
alpha: 显著性水平
labels: 变量标签
p: 变量数量
fixed_gaps: 固定间隙矩阵
fixed_edges: 固定边矩阵
na_delete: 缺失值处理
m_max: 最大条件集大小
u2pd: 无向边处理策略
skel_method: 骨架发现方法
solve_confl: 是否解决冲突
num_cores: 并行核心数
verbose: 详细输出
Returns:
Matrix: 最终的因果图邻接矩阵
Note:
当前版本仅实现了骨架发现,边方向确定待实现。
"""
try:
_ = labels
_ = p
except Exception:
raise Exception("必须指定 'labels' 和 'p' 参数!")
# TODO: 实现完整的 PC 算法,包括方向确定
# 当前仅返回骨架
return skeleton(
suff_stat=suff_stat,
indep_test=indep_test,
alpha=alpha,
labels=labels,
fixed_gaps=fixed_gaps,
fixed_edges=fixed_edges,
na_delete=na_delete,
m_max=m_max,
u2pd=u2pd,
solve_confl=solve_confl,
num_cores=num_cores,
verbose=verbose
)
def demo_pc_algorithm():
"""
PC 算法演示函数
使用测试数据演示 PC 算法的完整流程,包括:
1. 数据加载和预处理
2. 计算相关矩阵和偏相关系数
3. 执行骨架发现算法
4. 可视化结果
Example:
>>> demo_pc_algorithm()
"""
print("=" * 60)
print("PC 算法演示")
print("=" * 60)
# 1. 加载测试数据
print("\n[1] 加载测试数据...")
data_df = pd.read_csv('./test_data.csv')
data = np.array(data_df.iloc[:, :])[:, 1:] # 跳过索引列
# 2. 计算相关矩阵
print("[2] 计算相关矩阵...")
corr_matrix = pd.DataFrame(data).corr()
# 3. 设置变量标签
# 中英文对照: space, middle(中), Sports(体育讯), min(分), sina(新浪),
# match(比赛), player(球员), team(球队), day(日), month(月),
# peking(北京), time(时间)
labels = [
'space', 'middle', 'Sports', 'min', 'sina',
'match', 'player', 'team', 'day', 'month',
'peking', 'time'
]
# 验证数据维度
assert data.shape[1] == len(labels), \
f"数据列数 ({data.shape[1]}) 与标签数 ({len(labels)}) 不匹配"
# 创建标签字典
label_dict = dict(zip(range(data.shape[1]), labels))
print(f" 变量数量: {len(labels)}")
print(f" 样本数量: {data.shape[0]}")
# 4. 演示基础统计计算
print("\n[3] 演示基础统计计算...")
# 伪逆计算
rev = pseudoinverse(corr_matrix)
print(f" 伪逆矩阵形状: {rev.shape}")
# 偏相关系数
pcor = partial_correlation(1, 2, 3, corr_matrix)
print(f" 偏相关系数 r(1,2|3) = {pcor:.4f}")
# Z 统计量
zs = z_statistic(1, 2, 3, corr_matrix, corr_matrix.shape[0])
print(f" Z 统计量 = {zs:.4f}")
# 独立性检验
p_val = independence_test(1, 2, 3, [corr_matrix, corr_matrix.shape[0]])
print(f" p 值 = {p_val:.4f}")
# 5. 演示组合生成器
print("\n[4] 演示组合生成器...")
next_set = get_next_set(5, 2, [1, 2])
print(f" getNextSet(5, 2, [1,2]) = {next_set}")
print(f" 参考结果: {{'set': [1, 3], 'waslast': False}}")
next_set = get_next_set(5, 2, [4, 5])
print(f" getNextSet(5, 2, [4,5]) = {next_set}")
print(f" 参考结果: {{'set': [4, 5], 'waslast': True}}")
# 6. 运行骨架发现算法
print("\n[5] 运行骨架发现算法...")
print(" 显著性水平 alpha = 0.05")
graph = skeleton(
suff_stat=[corr_matrix, corr_matrix.shape[0]],
indep_test=independence_test,
alpha=0.05,
labels=list(range(corr_matrix.shape[0])),
fixed_gaps=None,
fixed_edges=None,
na_delete=True,
m_max=float('Inf'),
u2pd=("relaxed", "rand", "retry"),
solve_confl=False,
num_cores=1,
verbose=False
)
print(f"\n[6] 结果邻接矩阵:")
print(f" 形状: {graph.shape}")
print(f" 边数: {int(np.sum(graph.M) / 2)}")
# 7. 可视化
print("\n[7] 可视化因果图...")
visualize_graph(graph, label_dict)
print("\n" + "=" * 60)
print("演示完成!")
print("=" * 60)
# 保持向后兼容
debug_trivial = demo_pc_algorithm
if __name__ == "__main__":
demo_pc_algorithm()