-
Notifications
You must be signed in to change notification settings - Fork 85
Expand file tree
/
Copy pathbalanced_allocation.py
More file actions
361 lines (308 loc) · 12.4 KB
/
balanced_allocation.py
File metadata and controls
361 lines (308 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""Balanced service area partitioning.
Partitions a cost surface into territories of roughly equal cost-weighted
area. Each cell is assigned to the source whose biased cost-distance is
lowest, where biases are iteratively adjusted so that no source's territory
is disproportionately large or small.
Algorithm
---------
1. Run ``cost_distance`` once per source to get N cost surfaces.
2. Assign each cell to ``argmin(cost[i] + bias[i])``.
3. Compute each territory's cost-weighted area (sum of friction values).
4. Adjust biases proportionally: increase for over-large territories,
decrease for under-served ones.
5. Repeat 2-4 until convergence or ``max_iterations``.
"""
from __future__ import annotations
import numpy as np
import xarray as xr
from xrspatial.cost_distance import cost_distance
from xrspatial.utils import _validate_raster
try:
import dask.array as da
except ImportError:
da = None
try:
import cupy
except ImportError:
class cupy: # type: ignore[no-redef]
ndarray = False
def _to_numpy(arr):
"""Convert any array (numpy, cupy, dask) to a numpy array."""
if da is not None and isinstance(arr, da.Array):
arr = arr.compute()
if hasattr(arr, 'get'): # cupy
return arr.get()
return np.asarray(arr)
def _as_numpy(arr):
"""Convert a computed array (numpy or cupy) to numpy."""
if hasattr(arr, 'get'):
return arr.get()
return np.asarray(arr)
def _extract_sources(raster, target_values):
"""Return sorted array of unique source IDs from the raster."""
data = _to_numpy(raster.data)
if len(target_values) > 0:
ids = np.asarray(target_values, dtype=np.float64)
else:
mask = np.isfinite(data) & (data != 0)
ids = np.unique(data[mask])
return ids[np.isfinite(ids)]
def _make_single_source_raster(raster, source_id):
"""Create a raster with only one source value, rest zero."""
data = raster.data
# Build mask for this source
if da is not None and isinstance(data, da.Array):
single = da.where(data == source_id, source_id, 0.0)
elif hasattr(data, 'get'): # cupy
import cupy as cp
single = cp.where(data == source_id, source_id, 0.0)
else:
single = np.where(data == source_id, source_id, 0.0)
return xr.DataArray(
single,
coords=raster.coords,
dims=raster.dims,
attrs=raster.attrs,
)
def _sum_where(friction_data, alloc_data, source_id):
"""Sum friction values where allocation matches source_id.
Returns a Python float. Works with numpy, cupy, and dask arrays.
"""
if da is not None and isinstance(friction_data, da.Array):
masked = da.where(alloc_data == source_id, friction_data, 0.0)
return float(masked.sum().compute())
elif hasattr(friction_data, 'get'): # cupy
import cupy as cp
masked = cp.where(alloc_data == source_id, friction_data, 0.0)
return float(cp.asnumpy(masked.sum()))
else:
masked = np.where(alloc_data == source_id, friction_data, 0.0)
return float(masked.sum())
def _allocate_from_costs(cost_stack, source_ids, fill_value=np.nan):
"""Assign each cell to the source with lowest cost.
Parameters
----------
cost_stack : list of arrays
Per-source cost-distance arrays (same shape).
source_ids : 1-D numpy array
Source ID for each layer in cost_stack.
fill_value : float
Value for cells unreachable from any source.
Returns
-------
allocation : array
2-D array of source IDs.
"""
# Stack along axis 0 -> shape (N, H, W)
first = cost_stack[0]
if da is not None and isinstance(first, da.Array):
stacked = da.stack(cost_stack, axis=0)
# Replace NaN with inf for argmin
stacked_clean = da.where(da.isnan(stacked), np.inf, stacked)
best_idx = da.argmin(stacked_clean, axis=0).compute()
best_idx = _as_numpy(best_idx)
elif hasattr(first, 'get'): # cupy
import cupy as cp
stacked = cp.stack(cost_stack, axis=0)
stacked_clean = cp.where(cp.isnan(stacked), cp.inf, stacked)
best_idx = cp.asnumpy(cp.argmin(stacked_clean, axis=0))
else:
stacked = np.stack(cost_stack, axis=0)
stacked_clean = np.where(np.isnan(stacked), np.inf, stacked)
best_idx = np.argmin(stacked_clean, axis=0)
# Map index back to source ID
alloc = source_ids[best_idx]
# Mark cells that are unreachable from all sources
if da is not None and isinstance(first, da.Array):
all_nan = da.all(da.isnan(da.stack(cost_stack, axis=0)), axis=0)
all_nan = _as_numpy(all_nan.compute())
elif hasattr(first, 'get'):
import cupy as cp
all_nan = cp.asnumpy(
cp.all(cp.isnan(cp.stack(cost_stack, axis=0)), axis=0)
)
else:
all_nan = np.all(np.isnan(np.stack(cost_stack, axis=0)), axis=0)
alloc = alloc.astype(np.float32)
alloc[all_nan] = fill_value
return alloc
def _allocate_biased(cost_stack, biases, source_ids, fill_value=np.nan):
"""Assign each cell to source with lowest (cost + bias).
Like _allocate_from_costs but adds per-source bias before argmin.
"""
first = cost_stack[0]
n = len(cost_stack)
if da is not None and isinstance(first, da.Array):
layers = []
for i in range(n):
layer = da.where(da.isnan(cost_stack[i]), np.inf,
cost_stack[i] + biases[i])
layers.append(layer)
stacked = da.stack(layers, axis=0)
best_idx = da.argmin(stacked, axis=0).compute()
best_idx = _as_numpy(best_idx)
elif hasattr(first, 'get'):
import cupy as cp
layers = []
for i in range(n):
layer = cp.where(cp.isnan(cost_stack[i]), cp.inf,
cost_stack[i] + biases[i])
layers.append(layer)
stacked = cp.stack(layers, axis=0)
best_idx = cp.asnumpy(cp.argmin(stacked, axis=0))
else:
layers = []
for i in range(n):
layer = np.where(np.isnan(cost_stack[i]), np.inf,
cost_stack[i] + biases[i])
layers.append(layer)
stacked = np.stack(layers, axis=0)
best_idx = np.argmin(stacked, axis=0)
alloc = source_ids[best_idx].astype(np.float32)
# Mark unreachable cells
if da is not None and isinstance(first, da.Array):
all_nan = da.all(da.isnan(da.stack(cost_stack, axis=0)), axis=0)
all_nan = _as_numpy(all_nan.compute())
elif hasattr(first, 'get'):
import cupy as cp
all_nan = cp.asnumpy(
cp.all(cp.isnan(cp.stack(cost_stack, axis=0)), axis=0)
)
else:
all_nan = np.all(np.isnan(np.stack(cost_stack, axis=0)), axis=0)
alloc[all_nan] = fill_value
return alloc
def balanced_allocation(
raster: xr.DataArray,
friction: xr.DataArray,
x: str = "x",
y: str = "y",
target_values: list = [],
max_cost: float = np.inf,
connectivity: int = 8,
tolerance: float = 0.05,
max_iterations: int = 100,
learning_rate: float = 0.5,
) -> xr.DataArray:
"""Partition a cost surface into balanced service territories.
Assigns each cell to a source such that all territories have roughly
equal cost-weighted area (sum of friction values). This extends
standard cost-distance allocation by iteratively adjusting per-source
biases until the workload is balanced.
Parameters
----------
raster : xr.DataArray
2-D source raster. Source pixels are identified by non-zero
finite values (or values in *target_values*). Each unique value
is treated as a separate source.
friction : xr.DataArray
2-D friction (cost) surface. Must have the same shape and
coordinates as *raster*.
x : str, default='x'
Name of the x coordinate.
y : str, default='y'
Name of the y coordinate.
target_values : list, optional
Specific pixel values in *raster* to treat as sources.
If empty, all non-zero finite pixels are sources.
max_cost : float, default=np.inf
Maximum accumulated cost passed to ``cost_distance``.
connectivity : int, default=8
Pixel connectivity (4 or 8) passed to ``cost_distance``.
tolerance : float, default=0.05
Convergence threshold. The loop stops when every territory's
cost-weighted area is within ``tolerance`` of the mean (as a
fraction of the mean).
max_iterations : int, default=100
Maximum number of bias-adjustment iterations.
learning_rate : float, default=0.5
Controls how aggressively biases are updated each iteration.
Smaller values are more stable; larger values converge faster.
Returns
-------
xr.DataArray
2-D array of source IDs (float32). Each cell contains the ID
of the source it is assigned to. Unreachable cells are NaN.
"""
_validate_raster(raster, func_name='balanced_allocation', name='raster')
_validate_raster(friction, func_name='balanced_allocation',
name='friction')
if raster.shape != friction.shape:
raise ValueError("raster and friction must have the same shape")
if raster.dims != (y, x):
raise ValueError(
f"raster.dims should be ({y!r}, {x!r}), got {raster.dims}"
)
if connectivity not in (4, 8):
raise ValueError("connectivity must be 4 or 8")
if tolerance <= 0:
raise ValueError("tolerance must be positive")
if max_iterations < 1:
raise ValueError("max_iterations must be >= 1")
source_ids = _extract_sources(raster, target_values)
n_sources = len(source_ids)
if n_sources == 0:
out = np.full(raster.shape, np.nan, dtype=np.float32)
return xr.DataArray(out, coords=raster.coords, dims=raster.dims,
attrs=raster.attrs)
if n_sources == 1:
# Only one source: every reachable cell goes to it
cd = cost_distance(raster, friction, x=x, y=y,
target_values=list(target_values),
max_cost=max_cost, connectivity=connectivity)
cd_np = _to_numpy(cd.data)
out = np.where(np.isfinite(cd_np), source_ids[0], np.nan)
return xr.DataArray(out.astype(np.float32), coords=raster.coords,
dims=raster.dims, attrs=raster.attrs)
# Step 1: compute per-source cost-distance surfaces
cost_surfaces = [] # list of raw data arrays (numpy/cupy/dask)
for sid in source_ids:
single = _make_single_source_raster(raster, sid)
cd = cost_distance(single, friction, x=x, y=y,
max_cost=max_cost, connectivity=connectivity)
cost_surfaces.append(cd.data)
# Step 2: get friction data for weighting
fric_data = friction.data
if da is not None and isinstance(fric_data, da.Array):
fric_np = fric_data.compute()
if hasattr(fric_np, 'get'):
fric_np = fric_np.get()
elif hasattr(fric_data, 'get'):
fric_np = fric_data.get()
else:
fric_np = np.asarray(fric_data)
# Replace non-positive and NaN friction with 0 for weighting
fric_weight = np.where(np.isfinite(fric_np) & (fric_np > 0), fric_np, 0.0)
# Step 3: iterative balancing
biases = np.zeros(n_sources, dtype=np.float64)
for iteration in range(max_iterations):
# Allocate with current biases
alloc = _allocate_biased(cost_surfaces, biases, source_ids)
# Compute per-territory cost-weighted area
weights = np.array([
float(np.sum(fric_weight[alloc == sid]))
for sid in source_ids
])
# Handle sources with no reachable cells
total = weights.sum()
if total == 0:
break
target_weight = total / n_sources
# Check convergence: max relative deviation
nonzero = weights > 0
if np.any(nonzero):
max_dev = np.max(np.abs(weights - target_weight)) / target_weight
if max_dev <= tolerance:
break
# Update biases proportionally
for i in range(n_sources):
if target_weight > 0:
deviation = (weights[i] - target_weight) / target_weight
biases[i] += learning_rate * deviation
# Build output DataArray
return xr.DataArray(
alloc.astype(np.float32),
coords=raster.coords,
dims=raster.dims,
attrs=raster.attrs,
)