From 7e8192a9bc929b32318795299e0bb882bbb0e6a0 Mon Sep 17 00:00:00 2001
From: Arno Bock
Date: Mon, 26 Jan 2026 21:04:14 +0100
Subject: [PATCH 1/6] [SYSTEMDS-3928] added disjoint subnet masking based on FC layers

---
 scripts/builtin/independentSubnetTrain.dml       | 462 ++++++++++++++++++
 .../org/apache/sysds/common/Builtins.java        |   1 +
 .../builtin/indSubnetTest_mnist_lenet.dml        | 186 +++++++
 3 files changed, 649 insertions(+)
 create mode 100644 scripts/builtin/independentSubnetTrain.dml
 create mode 100644 src/test/scripts/functions/builtin/indSubnetTest_mnist_lenet.dml

diff --git a/scripts/builtin/independentSubnetTrain.dml b/scripts/builtin/independentSubnetTrain.dml
new file mode 100644
index 00000000000..a77d3a98e42
--- /dev/null
+++ b/scripts/builtin/independentSubnetTrain.dml
@@ -0,0 +1,462 @@
+m_independentSubnetTrain = function(
+    list[unknown] model,
+    matrix[double] features,
+    matrix[double] labels,
+    matrix[double] val_features,
+    matrix[double] val_labels,
+    string upd,
+    string agg,
+    string mode,
+    string utype,
+    int epochs,
+    int batchsize,
+    int j,
+    int k,
+    string scheme,
+    list[unknown] hyperparams,
+    boolean verbose,
+    int paramsPerLayer,
+    list[int] fullyConnectedLayers
+)
+return (list[unknown] model_out_2)
+{
+  # ------------------------------------------------------------
+  # Setup
+  # ------------------------------------------------------------
+  print("Entered IST function.")
+  model_out = model
+
+  P = length(model)
+  print("Parameters in model:")
+  print(P)
+
+  N = nrow(features)
+  print("Samples:")
+  print(N)
+
+  print("Is model length NOT divisible by paramsPerLayer?")
+  print(P %% paramsPerLayer != 0)
+  if (P %% paramsPerLayer != 0) {
+    stop("Model length not divisible by paramsPerLayer")
+  }
+
+  print("Model length check passed.")
+  L = as.integer(P / paramsPerLayer)  # total layers
+  print("Layers:")
+  print(L)
+
+  # obtain indices of FC layers
+  fcLayers = fullyConnectedLayers
+  print("FC layers:")
+  print(toString(fcLayers))
+
+  # I. determine shared parameters
+  isSharedParam = matrix(0, 1, P)
+
+  # create mask for all parameters of FC layers
+  isFC = matrix(0, rows=1, cols=L)
+  for (i in 1:length(fcLayers)) {
+    idx = as.integer(as.scalar(fcLayers[i]))
+    isFC[1, idx] = 1
+  }
+  print(toString(isFC))
+
+  isFC_rep = isFC
+  for (r in 2:paramsPerLayer) {
+    isFC_rep = cbind(isFC_rep, isFC)
+  }
+  print(toString(isFC_rep))
+  if (ncol(isFC_rep) != P) {
+    stop("Dimension mismatch for FC layer mask.")
+  }
+
+  # 1. all non-FC layers are shared
+  isSharedParam = 1 - isFC_rep
+  print(toString(isSharedParam))
+
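+  # Worked example (illustrative values, not taken from the surrounding code):
+  # with paramsPerLayer=2, L=4 and fcLayers=(3,4), the model list is assumed
+  # to be grouped by parameter type, i.e. (W1,W2,W3,W4, b1,b2,b3,b4), so that
+  # parameter slot k of layer l sits at index (k-1)*L + l. Then
+  #   isFC          = [0, 0, 1, 1]
+  #   isFC_rep      = [0, 0, 1, 1, 0, 0, 1, 1]
+  #   isSharedParam = [1, 1, 0, 0, 1, 1, 0, 0]   (after step 1)
+  # and step 2 below re-marks selected FC biases as shared, e.g. the output
+  # bias at index (2-1)*L + L = 8.
+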
+  # 2. FC bias parameters are shared in: the output layer or at the end of an FC block
+  for (paramId in seq(2, paramsPerLayer, 2)) {  # iterate bias blocks only
+    for (l in 1:L) {
+      if (as.scalar(isFC[1,l])==1 & l==L) {
+        p_out_bias = (paramId - 1) * L + L  # output bias is shared across subnets
+        isSharedParam[1, p_out_bias] = 1
+      }
+      else if (as.scalar(isFC[1,l])==1 & l gradients collide; must be handled by aggregation logic
+# ------------------------------------------------------------
+
+ist_create_disjoint_masks = function(
+    list[unknown] model,
+    int numSubnets,
+    int L,                           # total layers including output layer
+    list[int] fullyConnectedLayers,  # the indices of FC layers, starting from 1
+    int paramsPerFCLayer,
+    Matrix[Double] isFC)
+  return (list[unknown] masks)
+{
+  P = length(model)
+  modus = 0  # {0: matrix / 1: list}
+
+  # SANITY CHECKS: ensure the provided model can be masked correctly
+  if (as.integer(P / paramsPerFCLayer) != L) {
+    stop("Layer/parameter mismatch. Please make sure each layer has the same number of parameters.")
+  }
+  if (paramsPerFCLayer < 2 | paramsPerFCLayer %% 2 != 0) {
+    stop("At least one W/b pair must be present, and all parameters must come in W/b pairs.")
+  }
+
+  # I.) initialize and preallocate masks
+  masks = list()
+  for (s in 1:numSubnets) {
+    masks = append(masks, model);
+    #for (k in 1:P) {  # TODO might be a problem?
+    #  masks[s][k] = matrix(0, rows=nrow(model[p]), cols=ncol(model[p]))
+    #}
+  }
+  print("Masks now has the following length:")
+  print(length(masks))
+
+  # II.) determine FC layers
+  print("Forwarded fully connected layer information:")
+  print(toString(isFC))
+
+  # TODO NEW START - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+  masks_new_meta = matrix(0, rows=length(model), cols=4)  # columns = [start, end, rows, cols]
+  current_position = 1
+  for (p in 1:length(model)) {
+    M = as.matrix(model[p])
+    param_length = ncol(M) * nrow(M)  # as.scalar(ncol(M)) * as.scalar(nrow(M))
+
+    masks_new_meta[p,1] = current_position
+    masks_new_meta[p,2] = current_position + param_length - 1
+    masks_new_meta[p,3] = nrow(M)
+    masks_new_meta[p,4] = ncol(M)
+
+    current_position = current_position + param_length
+  }
+  mask_size = current_position - 1
+
+  # All subnets in one matrix
+  masks_new = matrix(0, rows=numSubnets, cols=mask_size)
+
+  print(toString(masks_new_meta))
+
+  # TODO NEW END - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
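+
+  # Illustrative example (hypothetical shapes, not derived from the model above):
+  # if model[p] is a 784 x 200 weight matrix and the running offset is 1, then
+  #   masks_new_meta[p,] = [1, 156800, 784, 200]
+  # and the next parameter starts at column 156801. masks_new keeps one such
+  # flattened mask row per subnet, so the mask of parameter p for subnet s can
+  # be recovered by reshaping masks_new[s, start:end] back to rows x cols using
+  # the offsets stored in masks_new_meta[p,] (see the reshape in section III. below).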
+
+  # III.) iterate all layers
+  for (l in 1:L) {
+    if (as.scalar(isFC[1,l]) == 1) {
+      print("Entered fully connected layer. The layer is:")
+      print(l)
+
+      W = as.matrix(model[l])
+      print("W (rows/cols):")
+      print(nrow(W))
+      print(ncol(W))
+
+      b = as.matrix(model[l+L])
+      print("b:")
+      print(ncol(b))
+
+      H = ncol(W);  # number of neurons (bias entries) in layer l
+      print("H:")
+      print(H)
+
+      # SANITY CHECK
+      if (nrow(b) != 1 | ncol(b) != H) {
+        print("Bias shape mismatch!")
+        print("b: " + nrow(b) + " x " + ncol(b))
+        print("expected: 1 x " + H)
+        stop("Invalid bias shape")
+      }
+      if (l != L & numSubnets > ncol(b)) {  # TODO change to "next layer is non-FC" logic
+        print("More subnets than available neurons in layer:")
+        print(l)
+        stop("Please use a wider model or decrease the number of subnets.")
+      }
+
+      randyOrton2 = rand(rows=H, cols=1)  # random keys for shuffling
+      allNeuronIndicesRandom = order(target=randyOrton2, by=1, decreasing=FALSE, index.return=TRUE)  # shuffled neuron indices
+      print("Length of random sample indices:")
+      print(length(allNeuronIndicesRandom))
+
+      # TODO FROM HERE ...
+      chunk_size = floor(H/numSubnets)  # number of neurons each subnet owns at least; TODO for l=L they are shared, so this value will be quite low or even below 1
+      remaining_neurons = H - chunk_size * numSubnets
+      print("Dividing all hidden layer neurons by the number of subnets, each subnet will own at least:")
+      print(chunk_size)
+      print("The following number of neurons remains and will be randomly assigned to the subnets (at most one per subnet):")
+      print(remaining_neurons)
+
+      amount_active_neurons = matrix(chunk_size, rows=numSubnets, cols=1)
+      if (remaining_neurons > 0) {
+        randomSubnetIndices = order(target=rand(rows=numSubnets, cols=1, seed=-1), by=1, decreasing=FALSE, index.return=TRUE)  # TODO replace seed for experiments
+        for (i in 1:remaining_neurons) {
+          sid = as.integer(as.scalar(randomSubnetIndices[i,1]))
+          amount_active_neurons[sid,1] = as.scalar(amount_active_neurons[sid,1]) + 1  # TODO use pmin()
+        }
+      }
+      print("Number of active neurons per subnet:")
+      print(toString(amount_active_neurons))
+
+      # e.g. (hypothetical values): H=10 and numSubnets=3 give chunk_size=3 with one
+      # remaining neuron assigned randomly; amount_active_neurons could be (4,3,3),
+      # giving end indices (4,7,10) and start indices (1,5,8).
+      neuron_end_indices = cumsum(amount_active_neurons)
+      neuron_start_indices = neuron_end_indices - amount_active_neurons + 1
+      print("Start indices of each subnet:")
+      print(toString(neuron_start_indices))
+
+      for (s in 1:numSubnets) {
+        print("Entered subnet:")
+        print(s)
+
+        # A. obtain owned neurons for this layer
+        start = as.integer(as.scalar(neuron_start_indices[s,1]))
+        end = as.integer(as.scalar(neuron_end_indices[s,1]))
+        current_b_indices = allNeuronIndicesRandom[start:end, 1]
+        print(length(current_b_indices))
+        # TODO ... UNTIL HERE: only required for else case (in bias case)
+
+        # B. create masked bias
+        if (l==L) {  # output layer
+          masked_b = matrix(1, rows=1, cols=ncol(b))
+        }
+        else if (l1 & as.scalar(isFC[1, l-1])==0) {  # previous layer is not FC  TODO works with || operator?
+          for (i in 1:nrow(current_b_indices)) {  # TODO vectorized possible?
+            idx = as.integer(as.scalar(current_b_indices[i,1]))
+            masked_W[1:nrow(W), idx] = matrix(1, rows=nrow(W), cols=1)
+          }
+        }
+        else {
+          # obtain active neurons of previous layer
+          p = L + (l-1)  # TODO investigate
+
+          if (modus==1) {
+            previous_masked_b_list = as.list(masks[s])
+            previous_masked_b = as.matrix(previous_masked_b_list[p])
+            print(toString(previous_masked_b))
+          } else {
+            start = as.integer(as.scalar(masks_new_meta[p,1]))
+            end = as.integer(as.scalar(masks_new_meta[p,2]))
+            r = as.integer(as.scalar(masks_new_meta[p,3]))
+            c = as.integer(as.scalar(masks_new_meta[p,4]))
+            print("The following info was obtained:")
+            print(start)
+            print(end)
+            print(r)
+            print(c)
+
+            vec = masks_new[s, start:end]
+            previous_masked_b = matrix(vec, rows=r, cols=c, byrow=TRUE)  # TODO might be unnecessary
+            print("Previous bias:")
+            print(toString(previous_masked_b))
+          }
+
+          # SANITY CHECK: dimensions match the previous layer
+          if (l > 1 & ncol(previous_masked_b) != nrow(W)) {
+            print("W/prev layer mismatch in layer l=" + l)
+            print("prev_b: " + nrow(previous_masked_b) + " x " + ncol(previous_masked_b))
+            print("W: " + nrow(W) + " x " + ncol(W))
+            stop("Invalid W shape wrt previous layer")
+          }
+
+          if (nrow(previous_masked_b)==1) { previous_masked_b = t(previous_masked_b) }
+          if (ncol(masked_b) == 1) { masked_b = t(masked_b) }
+
+          if (l==L) {  # output layer
+            masked_W = previous_masked_b %*% matrix(1, 1, ncol(masked_W))
+          }
+          else if (l