-
Notifications
You must be signed in to change notification settings - Fork 528
Expand file tree
/
Copy pathOptimizerUtils.java
More file actions
1738 lines (1522 loc) · 61.2 KB
/
OptimizerUtils.java
File metadata and controls
1738 lines (1522 loc) · 61.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.hops;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.ExecType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOp1;
import org.apache.sysds.common.Types.OpOp2;
import org.apache.sysds.common.Types.OpOp3;
import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.common.Types.ReOrgOp;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.CompilerConfig;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.fedplanner.FTypes.FederatedPlanner;
import org.apache.sysds.hops.rewrite.HopRewriteUtils;
import org.apache.sysds.lops.Checkpoint;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.lops.compile.Dag;
import org.apache.sysds.parser.ForStatementBlock;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
import org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlock.Type;
import org.apache.sysds.runtime.functionobjects.IntegerDivide;
import org.apache.sysds.runtime.functionobjects.Modulus;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.MemoryEstimates;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
public class OptimizerUtils
{
////////////////////////////////////////////////////////
// Optimizer constants and flags (incl tuning knobs) //
////////////////////////////////////////////////////////
/**
 * Utilization factor used in deciding whether an operation is to be scheduled on CP or MR;
 * getLocalMemBudget() multiplies the local maximum memory by this factor.
 * NOTE: it is important that MEM_UTIL_FACTOR+CacheableData.CACHING_BUFFER_SIZE < 1.0
 */
public static double MEM_UTIL_FACTOR = 0.7d;
/**
 * Default buffer pool fraction (15% of the local max memory) used with the static
 * memory manager; also the lower bound applied in getBufferPoolLimit().
 */
public static double DEFAULT_MEM_UTIL_FACTOR = 0.15d;
/** Default buffer pool fraction (85% of the local max memory) used with the unified memory manager. */
public static double DEFAULT_UMM_UTIL_FACTOR = 0.85d;
/**
 * Memory managers (static partitioned, unified). The value in effect is held in
 * MEMORY_MANAGER and switched via disableUMM() / enableUMM().
 */
public enum MemoryManager {
// static partitioning; disableUMM() pairs this with the LazyWriteBuffer
STATIC_MEMORY_MANAGER,
// unified memory; enableUMM() pairs this with the UnifiedMemoryManager
UNIFIED_MEMORY_MANAGER
}
/**
 * Indicates the current memory manager in effect. Stays null until it is
 * resolved from the configuration by isUMMEnabled() or set explicitly by
 * disableUMM() / enableUMM().
 */
public static MemoryManager MEMORY_MANAGER = null;
/**
 * Buffer pool size in bytes. A value of 0 means "not set", in which case
 * getBufferPoolLimit() derives the limit from the configuration.
 */
public static long BUFFER_POOL_SIZE = 0;
/** Default blocksize if unspecified or for testing purposes */
public static final int DEFAULT_BLOCKSIZE = 1000;
/** Default frame blocksize */
public static final int DEFAULT_FRAME_BLOCKSIZE = 1000;
/** Default optimization level if unspecified */
public static final OptimizationLevel DEFAULT_OPTLEVEL =
OptimizationLevel.O2_LOCAL_MEMORY_DEFAULT;
/**
 * Default memory size, which is used if the actual estimate can not be computed
 * e.g., when input/output dimensions are unknown. The default is set to a large
 * value so that operations are scheduled on MR while avoiding overflows as well.
 * The value is assigned by resetDefaultSize().
 */
public static double DEFAULT_SIZE;
// sizes of primitive values (presumably in bytes as used by the memory estimates - confirm against callers)
public static final long DOUBLE_SIZE = 8;
public static final long INT_SIZE = 4;
public static final long CHAR_SIZE = 1;
public static final long BOOLEAN_SIZE = 1;
public static final double INVALID_SIZE = -1d; // memory estimate not computed
//constants for valid CP matrix dimension sizes / nnz (dense/sparse)
public static final long MAX_NUMCELLS_CP_DENSE = Integer.MAX_VALUE;
// the sparse nnz limit depends on the default sparse block type: only MCSR lifts it beyond the int range
public static final long MAX_NNZ_CP_SPARSE = (MatrixBlock.DEFAULT_SPARSEBLOCK ==
SparseBlock.Type.MCSR) ? Long.MAX_VALUE : Integer.MAX_VALUE;
// NOTE(review): threshold name suggests a size limit for safe representation changes - confirm usage
public static final long SAFE_REP_CHANGE_THRES = 8 * 1024 *1024; //8MB
/**
 * Enables common subexpression elimination in dags. There is, however, a potential tradeoff
 * between computation redundancy and data transfer between MR jobs. Since we do not reason
 * about transferred data yet, this rewrite rule is enabled by default.
 */
public static boolean ALLOW_COMMON_SUBEXPRESSION_ELIMINATION = true;
/**
 * Enables constant folding in dags. Constant folding computes simple expressions of binary
 * operations and literals and replaces the hop sub-DAG with a new literal operator.
 */
public static boolean ALLOW_CONSTANT_FOLDING = true;
// disabled by constructCompilerConfig for optimization levels 0 and 1
public static boolean ALLOW_ALGEBRAIC_SIMPLIFICATION = true;
// disabled by constructCompilerConfig for the hidden optimization level 7
public static boolean ALLOW_OPERATOR_FUSION = true;
/**
 * Enables if-else branch removal for constant predicates (original literals or
 * results of constant folding).
 */
public static boolean ALLOW_BRANCH_REMOVAL = true;
/**
 * Enables the removal of (par)for-loops when from, to, and increment are constants
 * (original literals or results of constant folding) and lead to an empty sequence,
 * i.e., (par)for-loops without a single iteration.
 */
public static boolean ALLOW_FOR_LOOP_REMOVAL = true;
// disabled by constructCompilerConfig for optimization levels 0, 1, 6, and 7
public static boolean ALLOW_AUTO_VECTORIZATION = true;
/**
 * Enables simple expression evaluation for datagen parameters 'rows', 'cols'. Simple
 * expressions are defined as binary operations on literals and nrow/ncol. This applies
 * only to exact size information.
 */
public static boolean ALLOW_SIZE_EXPRESSION_EVALUATION = true;
/**
 * Enables simple expression evaluation for datagen parameters 'rows', 'cols'. Simple
 * expressions are defined as binary operations on literals and b(+) or b(*) on nrow/ncol.
 * This applies also to worst-case size information.
 */
public static boolean ALLOW_WORSTCASE_SIZE_EXPRESSION_EVALUATION = true;
// NOTE(review): undocumented flag; the name suggests recompilation of rand (datagen) jobs - confirm
public static boolean ALLOW_RAND_JOB_RECOMPILE = true;
/**
 * Enables parfor runtime piggybacking of MR jobs into the packed jobs for
 * scan sharing.
 */
public static boolean ALLOW_RUNTIME_PIGGYBACKING = true;
/**
 * Enables interprocedural analysis between main script and functions as well as functions
 * and other functions. This includes, for example, to propagate statistics into functions
 * if safe to do so (e.g., if called once).
 */
public static boolean ALLOW_INTER_PROCEDURAL_ANALYSIS = true;
/**
 * Number of inter-procedural analysis (IPA) repetitions. If set to {@literal >=2}, we apply
 * IPA multiple times in order to allow scalar propagation over complex function call
 * graphs and various interactions between constant propagation, constant folding,
 * and other rewrites such as branch removal and the merge of statement block sequences.
 * constructCompilerConfig sets this to 1 for optimization levels 0 and 1.
 */
public static int IPA_NUM_REPETITIONS = 5;
/**
 * Enables sum product rewrites such as mapmultchains. In the future, this will cover
 * all sum-product related rewrites.
 */
public static boolean ALLOW_SUM_PRODUCT_REWRITES = true;
// NOTE(review): undocumented second sum-product flag; relation to ALLOW_SUM_PRODUCT_REWRITES not visible here - confirm
public static boolean ALLOW_SUM_PRODUCT_REWRITES2 = true;
/**
 * Enables additional mmchain optimizations. In the future, this might be merged with
 * ALLOW_SUM_PRODUCT_REWRITES.
 */
public static boolean ALLOW_ADVANCED_MMCHAIN_REWRITES = false;
/**
 * Enables a specific hop dag rewrite that splits hop dags after csv persistent reads with
 * unknown size in order to allow for recompile.
 */
public static boolean ALLOW_SPLIT_HOP_DAGS = true;
/**
 * Enables a specific rewrite that enables update in place for loop variables that are
 * only read/updated via cp leftindexing.
 */
public static boolean ALLOW_LOOP_UPDATE_IN_PLACE = true;
/**
 * Enables the update-in-place for all unary operators with a single
 * consumer. In this case we do not allocate the output, but directly
 * write the output values back to the input block.
 */
//TODO enabling it by default requires modifications in lineage-based reuse
public static boolean ALLOW_UNARY_UPDATE_IN_PLACE = false;
/**
 * Enables update-in-place for binary operators if the first input
 * has no consumers. In this case we directly write the output
 * values back to the first input block.
 */
public static boolean ALLOW_BINARY_UPDATE_IN_PLACE = false;
/**
 * Replace eval second-order function calls with normal function call
 * if the function name is a known string (after constant propagation).
 */
public static boolean ALLOW_EVAL_FCALL_REPLACEMENT = true;
/**
 * Enables a specific rewrite for code motion, i.e., hoisting loop invariant code
 * out of while, for, and parfor loops.
 */
public static boolean ALLOW_CODE_MOTION = false;
/**
 * Compile federated instructions based on input federation state and privacy constraints.
 */
public static boolean FEDERATED_COMPILATION = false;
// NOTE(review): maps integer keys to federated output specs; meaning of the key (presumably a line or hop identifier) is not visible here - confirm
public static Map<Integer, FEDInstruction.FederatedOutput> FEDERATED_SPECS = new HashMap<>();
/**
 * Specifies a multiplier computing the degree of parallelism of parallel
 * text read/write out of the available degree of parallelism. Set it to 1.0
 * to get a number of threads equal to the number of virtual cores.
 * The same multipliers are also applied to the binary read/write parallelism.
 */
public static final double PARALLEL_CP_READ_PARALLELISM_MULTIPLIER = 1.0;
public static final double PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER = 1.0;
/**
 * Enables the use of CombineSequenceFileInputFormat with splitsize = 2x hdfs blocksize,
 * if sort buffer size large enough and parallelism not hurt. This solves two issues:
 * (1) it combines small files (depending on producers), and (2) it reduces task
 * latency of large jobs with many tasks by factor 2.
 */
public static final boolean ALLOW_COMBINE_FILE_INPUT_FORMAT = true;
/**
 * This variable allows for use of explicit local command, that forces a spark block to be executed and returned as a local block.
 */
public static boolean ALLOW_SCRIPT_LEVEL_LOCAL_COMMAND = false;
/**
 * This variable allows for insertion of Compress and decompress in the dml script from the user.
 * This is added because we want to have a way to test, and verify the correct placement of compress and decompress commands.
 */
public static boolean ALLOW_SCRIPT_LEVEL_COMPRESS_COMMAND = true;
/**
 * This variable allows for insertion of Quantize and compress in the dml script from the user.
 */
public static boolean ALLOW_SCRIPT_LEVEL_QUANTIZE_COMPRESS_COMMAND = true;
/**
 * Boolean specifying if quantization-fused compression rewrite is allowed.
 */
public static boolean ALLOW_QUANTIZE_COMPRESS_REWRITE = true;
/**
 * Boolean specifying if compression rewrites are allowed. This is disabled at run time if the IPA for workload-aware compression
 * is activated.
 */
public static boolean ALLOW_COMPRESSION_REWRITE = true;
/**
 * Enable transitive spark execution type selection. This refines the exec-type selection logic of unary aggregates
 * by pushing the unary aggregates, whose inputs are created by spark instructions, to spark execution type as well.
 */
public static boolean ALLOW_TRANSITIVE_SPARK_EXEC_TYPE = true;
/**
 * Enable prefetch and broadcast. Prefetch asynchronously calls acquireReadAndRelease() to trigger remote
 * operations, which would otherwise make the next instruction wait till completion. Broadcast allows
 * asynchronously transferring the data to all the nodes.
 */
public static boolean ASYNC_PREFETCH = false; //both Spark and GPU
public static boolean ASYNC_BROADCAST_SPARK = false;
// NOTE(review): not covered by the description above; presumably enables asynchronous Spark checkpoints - confirm
public static boolean ASYNC_CHECKPOINT_SPARK = false;
/**
 * Heuristic-based instruction ordering to maximize inter-operator PARALLELISM.
 * Place the Spark operator chains first and trigger them to execute in parallel.
 */
public static boolean MAX_PARALLELIZE_ORDER = false;
/**
 * Cost-based instruction ordering to minimize total execution time under
 * the constraint of available memory.
 */
public static boolean COST_BASED_ORDERING = false;
/**
 * Rule-based operator placement policy for GPU.
 */
public static boolean RULE_BASED_GPU_EXEC = false;
/**
 * Automatic placement of GPU lineage cache eviction
 */
public static boolean AUTO_GPU_CACHE_EVICTION = true;
/**
 * Boolean specifying if relational algebra rewrites are allowed (e.g. Selection Pushdowns).
 */
public static boolean ALLOW_RA_REWRITES = false;
//////////////////////
// Optimizer levels //
//////////////////////
/**
 * Optimization Types for Compilation
 *
 * O0 STATIC - Decisions for scheduling operations on CP/MR are based on
 * predefined set of rules, which check if the dimensions are below a
 * fixed/static threshold (OLD Method of choosing between CP and MR).
 * The optimization scope is LOCAL, i.e., per statement block.
 * Advanced rewrites like constant folding, common subexpression elimination,
 * or inter procedural analysis are NOT applied.
 *
 * O1 MEMORY_BASED (minimal) - Every operation is scheduled on CP or MR, solely
 * based on the amount of memory required to perform that operation.
 * It does NOT take the execution time into account.
 * The optimization scope is LOCAL, i.e., per statement block.
 * Advanced rewrites like constant folding, common subexpression elimination,
 * or inter procedural analysis are NOT applied.
 *
 * O2 MEMORY_BASED (default) - Every operation is scheduled on CP or MR, solely
 * based on the amount of memory required to perform that operation.
 * It does NOT take the execution time into account.
 * The optimization scope is LOCAL, i.e., per statement block.
 * All advanced rewrites are applied. This is the default optimization
 * level of SystemDS.
 *
 * O3 LOCAL RESOURCE TIME_MEMORY_BASED - Time- and memory-based resource
 * optimization; according to the comments in constructCompilerConfig this
 * corresponds to O2 with resource optimization.
 *
 * O4 GLOBAL TIME_MEMORY_BASED - Operation scheduling on CP or MR as well as
 * many other rewrites of data flow properties such as block size, partitioning,
 * replication, vectorization, etc are done with the optimization objective of
 * minimizing execution time under hard memory constraints per operation and
 * execution context. The optimization scope is GLOBAL, i.e., program-wide.
 * All advanced rewrites are applied. This optimization level requires more
 * optimization time but has higher optimization potential.
 *
 * O5 DEBUG MODE - All optimizations, global and local, which interfere with
 * breakpoints are NOT applied. This optimization level is REQUIRED for the
 * compiler running in debug mode.
 *
 * NOTE: the compiler config stores the ordinal of these constants and
 * getOptLevel() maps it back via values(), so the declaration order matters.
 */
public enum OptimizationLevel {
O0_LOCAL_STATIC,
O1_LOCAL_MEMORY_MIN,
O2_LOCAL_MEMORY_DEFAULT,
O3_LOCAL_RESOURCE_TIME_MEMORY,
O4_GLOBAL_TIME_MEMORY,
O5_DEBUG_MODE,
}
/**
 * Looks up the optimization level stored in the current compiler config.
 * The stored integer is interpreted as an ordinal of {@link OptimizationLevel}.
 *
 * @return optimization level in effect
 */
public static OptimizationLevel getOptLevel() {
	final int ordinal = ConfigurationManager.getCompilerConfig()
		.getInt(ConfigType.OPT_LEVEL);
	final OptimizationLevel[] levels = OptimizationLevel.values();
	return levels[ordinal];
}
/**
 * Indicates if the current optimization level is memory-based, i.e.,
 * any level other than O0_LOCAL_STATIC.
 *
 * @return true if the current level is not O0_LOCAL_STATIC
 */
public static boolean isMemoryBasedOptLevel() {
	return !isOptLevel(OptimizationLevel.O0_LOCAL_STATIC);
}
/**
 * Compares the current optimization level with the given one.
 *
 * @param level optimization level to probe for
 * @return true if the given level is the level in effect
 */
public static boolean isOptLevel( OptimizationLevel level ){
	final OptimizationLevel current = getOptLevel();
	return current == level;
}
/**
 * Creates a new compiler config and populates it from the given DML config.
 *
 * @param dmlconf DML configuration to read from
 * @return newly created and populated compiler config
 */
public static CompilerConfig constructCompilerConfig( DMLConfig dmlconf ) {
	final CompilerConfig fresh = new CompilerConfig();
	return constructCompilerConfig(fresh, dmlconf);
}
/**
 * Populates the given compiler config from the DML config: block size,
 * optimization level, parallel IO / parallel operations flags, and the
 * federated runtime flag. Besides updating the compiler config, selecting
 * an optimization level also modifies the static rewrite flags of this
 * class (e.g., ALLOW_CONSTANT_FOLDING); these are not restored here.
 *
 * @param cconf compiler config to update
 * @param dmlconf DML configuration to read from
 * @return the passed compiler config
 * @throws DMLRuntimeException if the configured optimization level is outside [0,7]
 */
public static CompilerConfig constructCompilerConfig( CompilerConfig cconf, DMLConfig dmlconf )
{
//each script sets its own block size, opt level etc
cconf.set(ConfigType.BLOCK_SIZE, dmlconf.getIntValue( DMLConfig.DEFAULT_BLOCK_SIZE ));
//handle optimization level
//(the range check admits the hidden levels 6 and 7, while the message only advertises 0-5)
int optlevel = dmlconf.getIntValue(DMLConfig.OPTIMIZATION_LEVEL);
if( optlevel < 0 || optlevel > 7 )
throw new DMLRuntimeException("Error: invalid optimization level '"+optlevel+"' (valid values: 0-5).");
//NOTE(review): level 5 passes the check above but has no case below, so
//OPT_LEVEL of cconf is left untouched for it - confirm this is intended
switch( optlevel )
{
// opt level 0: static dimensionality
case 0:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O0_LOCAL_STATIC.ordinal());
ALLOW_CONSTANT_FOLDING = false;
ALLOW_COMMON_SUBEXPRESSION_ELIMINATION = false;
ALLOW_ALGEBRAIC_SIMPLIFICATION = false;
ALLOW_AUTO_VECTORIZATION = false;
ALLOW_INTER_PROCEDURAL_ANALYSIS = false;
IPA_NUM_REPETITIONS = 1;
ALLOW_BRANCH_REMOVAL = false;
ALLOW_FOR_LOOP_REMOVAL = false;
ALLOW_SUM_PRODUCT_REWRITES = false;
break;
// opt level 1: memory-based (no advanced rewrites)
// (same flags as level 0, plus disabled loop update-in-place)
case 1:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O1_LOCAL_MEMORY_MIN.ordinal());
ALLOW_CONSTANT_FOLDING = false;
ALLOW_COMMON_SUBEXPRESSION_ELIMINATION = false;
ALLOW_ALGEBRAIC_SIMPLIFICATION = false;
ALLOW_AUTO_VECTORIZATION = false;
ALLOW_INTER_PROCEDURAL_ANALYSIS = false;
IPA_NUM_REPETITIONS = 1;
ALLOW_BRANCH_REMOVAL = false;
ALLOW_FOR_LOOP_REMOVAL = false;
ALLOW_SUM_PRODUCT_REWRITES = false;
ALLOW_LOOP_UPDATE_IN_PLACE = false;
break;
// opt level 2: memory-based (all advanced rewrites)
case 2:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O2_LOCAL_MEMORY_DEFAULT.ordinal());
break;
// opt level 3: resource optimization, time- and memory-based (level 2 w/ resource optimization)
case 3:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O3_LOCAL_RESOURCE_TIME_MEMORY.ordinal());
break;
// opt level 4: global, time- and memory-based (all advanced rewrites)
case 4:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O4_GLOBAL_TIME_MEMORY.ordinal());
break;
// opt level 6 and 7: SPOOF w/o fused operators, otherwise same as O2
// (hidden optimization levels not documented on purpose, as they will
// be removed once SPOOF is production ready)
case 6:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O2_LOCAL_MEMORY_DEFAULT.ordinal());
ALLOW_AUTO_VECTORIZATION = false;
break;
case 7:
cconf.set(ConfigType.OPT_LEVEL, OptimizationLevel.O2_LOCAL_MEMORY_DEFAULT.ordinal());
ALLOW_OPERATOR_FUSION = false;
ALLOW_AUTO_VECTORIZATION = false;
ALLOW_SUM_PRODUCT_REWRITES = false;
break;
}
//handle parallel text io (incl awareness of thread contention in <jdk8)
//(a disabled CP_PARALLEL_IO turns off parallel read/write for text and binary formats)
if (!dmlconf.getBooleanValue(DMLConfig.CP_PARALLEL_IO)) {
cconf.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false);
cconf.set(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS, false);
cconf.set(ConfigType.PARALLEL_CP_READ_BINARYFORMATS, false);
cconf.set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS, false);
}
//handle parallel matrix mult / rand configuration
if (!dmlconf.getBooleanValue(DMLConfig.CP_PARALLEL_OPS)) {
cconf.set(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS, false);
}
//handle federated runtime conversion to avoid string comparisons
//(the flag is only ever set to true here, never reset to false)
String planner = dmlconf.getTextValue(DMLConfig.FEDERATED_PLANNER);
if( FederatedPlanner.RUNTIME.name().equalsIgnoreCase(planner) ) {
cconf.set(ConfigType.FEDERATED_RUNTIME, true);
}
return cconf;
}
/**
 * Restores the static compiler flags of this class, which may have been
 * modified by constructCompilerConfig, to their declared default values.
 */
public static void resetStaticCompilerFlags() {
	//TODO this is a workaround for MLContext to avoid a major refactoring before the release; this method
	//should be removed as soon all modified static variables are properly handled in the compiler config
	ALLOW_ALGEBRAIC_SIMPLIFICATION = true;
	ALLOW_AUTO_VECTORIZATION = true;
	ALLOW_BRANCH_REMOVAL = true;
	ALLOW_FOR_LOOP_REMOVAL = true;
	ALLOW_CONSTANT_FOLDING = true;
	ALLOW_COMMON_SUBEXPRESSION_ELIMINATION = true;
	ALLOW_INTER_PROCEDURAL_ANALYSIS = true;
	ALLOW_LOOP_UPDATE_IN_PLACE = true;
	ALLOW_OPERATOR_FUSION = true;
	ALLOW_RAND_JOB_RECOMPILE = true;
	ALLOW_SIZE_EXPRESSION_EVALUATION = true;
	ALLOW_SPLIT_HOP_DAGS = true;
	ALLOW_SUM_PRODUCT_REWRITES = true;
	ALLOW_WORSTCASE_SIZE_EXPRESSION_EVALUATION = true;
	//keep in sync with the declared default of IPA_NUM_REPETITIONS, otherwise
	//a reset yields a different number of IPA repetitions than a fresh start
	IPA_NUM_REPETITIONS = 5;
}
/**
 * Returns the default size used for unknown memory estimates, which is
 * the local maximum memory as reported by the infrastructure analyzer.
 *
 * @return default size
 */
public static long getDefaultSize() {
	//the default size needs to be larger than any execution context
	//memory budget, but must not produce overflows when summed up
	final long localMaxMem = InfrastructureAnalyzer.getLocalMaxMemory();
	return localMaxMem;
}
/**
 * Re-initializes DEFAULT_SIZE with the value of getDefaultSize().
 */
public static void resetDefaultSize() {
	final long size = getDefaultSize();
	DEFAULT_SIZE = size;
}
/**
 * Returns the default frame blocksize.
 *
 * @return value of DEFAULT_FRAME_BLOCKSIZE
 */
public static int getDefaultFrameSize() {
	return OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
}
/**
 * Computes the local memory budget as the local maximum memory
 * scaled by MEM_UTIL_FACTOR.
 *
 * @return local memory budget
 */
public static double getLocalMemBudget() {
	final double localMaxMem = InfrastructureAnalyzer.getLocalMaxMemory();
	final double budget = localMaxMem * OptimizerUtils.MEM_UTIL_FACTOR;
	return budget;
}
/**
 * Returns the buffer pool limit. An explicitly set BUFFER_POOL_SIZE takes
 * precedence; otherwise the limit is the configured percentage of the local
 * maximum memory, but at least DEFAULT_MEM_UTIL_FACTOR of it.
 *
 * @return buffer pool size in bytes
 */
public static long getBufferPoolLimit() {
	if( BUFFER_POOL_SIZE == 0 ) {
		//derive the limit from the configured percentage
		final DMLConfig dmlConf = ConfigurationManager.getDMLConfig();
		final double configured = dmlConf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT) / 100.0;
		final double fraction = Math.max(configured, DEFAULT_MEM_UTIL_FACTOR);
		final long localMaxMem = InfrastructureAnalyzer.getLocalMaxMemory();
		return (long)(fraction * localMaxMem);
	}
	return BUFFER_POOL_SIZE;
}
/**
 * Checks if the unified memory manager is in effect. On first use, i.e.,
 * while MEMORY_MANAGER is still null, the memory manager is resolved from
 * the DML config and remembered in MEMORY_MANAGER.
 *
 * @return true if the unified memory manager is in effect
 */
public static boolean isUMMEnabled() {
	if( MEMORY_MANAGER != null )
		return MEMORY_MANAGER == MemoryManager.UNIFIED_MEMORY_MANAGER;
	//lazy initialization from configuration
	final String configured = ConfigurationManager.getDMLConfig()
		.getTextValue(DMLConfig.MEMORY_MANAGER);
	if( configured.equalsIgnoreCase("unified") )
		MEMORY_MANAGER = MemoryManager.UNIFIED_MEMORY_MANAGER;
	else
		MEMORY_MANAGER = MemoryManager.STATIC_MEMORY_MANAGER;
	return MEMORY_MANAGER == MemoryManager.UNIFIED_MEMORY_MANAGER;
}
/**
 * Switches to the static memory manager: the LazyWriteBuffer is cleaned up
 * and re-initialized, and both BUFFER_POOL_SIZE and the write buffer limit
 * are set to the default fraction (15%) of the local maximum memory.
 */
public static void disableUMM() {
	MEMORY_MANAGER = MemoryManager.STATIC_MEMORY_MANAGER;
	//re-create the write buffer before publishing the new pool size
	LazyWriteBuffer.cleanup();
	LazyWriteBuffer.init();
	final long localMaxMem = InfrastructureAnalyzer.getLocalMaxMemory();
	final long staticPoolSize = (long) (DEFAULT_MEM_UTIL_FACTOR * localMaxMem);
	BUFFER_POOL_SIZE = staticPoolSize;
	LazyWriteBuffer.setWriteBufferLimit(staticPoolSize);
}
/**
 * Switches to the unified memory manager: the UnifiedMemoryManager is
 * cleaned up and re-initialized, and both BUFFER_POOL_SIZE and the UMM limit
 * are set to the default fraction (85%) of the local maximum memory.
 */
public static void enableUMM() {
	MEMORY_MANAGER = MemoryManager.UNIFIED_MEMORY_MANAGER;
	//re-create the unified memory manager before publishing the new pool size
	UnifiedMemoryManager.cleanup();
	UnifiedMemoryManager.init();
	final long localMaxMem = InfrastructureAnalyzer.getLocalMaxMemory();
	final long unifiedPoolSize = (long) (DEFAULT_UMM_UTIL_FACTOR * localMaxMem);
	BUFFER_POOL_SIZE = unifiedPoolSize;
	UnifiedMemoryManager.setUMMLimit(unifiedPoolSize);
}
/**
 * Checks if the given degree of parallelism equals the local parallelism
 * reported by the infrastructure analyzer.
 *
 * @param k degree of parallelism
 * @return true if k equals the local parallelism
 */
public static boolean isMaxLocalParallelism(int k) {
	final int localPar = InfrastructureAnalyzer.getLocalParallelism();
	return localPar == k;
}
/**
 * Probes if we are in a top-level parfor context by checking whether the
 * current local memory fraction is (almost) the full budget.
 *
 * @return true if the local max memory fraction is at least 0.99
 */
public static boolean isTopLevelParFor() {
	//every local parfor with degree of parallelism k>1 changes the local
	//memory budget, hence probing the current memory fraction is sufficient
	final double memFraction = InfrastructureAnalyzer.getLocalMaxMemoryFraction();
	return memFraction >= 0.99;
}
/**
 * Checks if a broadcast of the given size fits once into the remote
 * broadcast memory budget and twice into the local memory budget.
 *
 * @param size size of the broadcast
 * @return true if both memory constraints are satisfied
 */
public static boolean checkSparkBroadcastMemoryBudget( double size )
{
	final double remoteBudget = SparkExecutionContext.getBroadcastMemoryBudget();
	final double localBudget = OptimizerUtils.getLocalMemBudget();

	//remote constraint: the broadcast has to fit once into the broadcast memory
	if( !(size < remoteBudget) )
		return false;

	//local constraint: the broadcast has to fit twice because we create a partitioned
	//broadcast and hand it over to the spark context as in-memory object
	return 2*size < localBudget;
}
/**
 * Checks if a matrix with the given characteristics can be broadcast: the
 * dimensions need to be valid CP dimensions, the partitioned representation
 * has to fit into the remote broadcast memory budget, and the matrix plus
 * its partitioned representation have to fit into the local memory budget.
 *
 * @param rlen number of rows
 * @param clen number of columns
 * @param blen blocksize
 * @param nnz number of non-zeros
 * @return true if all constraints are satisfied
 */
public static boolean checkSparkBroadcastMemoryBudget( long rlen, long clen, long blen, long nnz ) {
	final double remoteBudget = SparkExecutionContext.getBroadcastMemoryBudget();
	final double localBudget = OptimizerUtils.getLocalMemBudget();

	//size estimates of the matrix and its partitioned representation
	final double sparsity = getSparsity(rlen, clen, nnz);
	final double memMatrix = estimateSizeExactSparsity(rlen, clen, sparsity);
	final double memPartitioned = estimatePartitionedSizeExactSparsity(rlen, clen, blen, sparsity);

	if( !OptimizerUtils.isValidCPDimensions(rlen, clen) )
		return false;

	//the partitioned broadcast needs to fit once into the remote broadcast memory,
	//and together with the original matrix into the local memory budget
	final boolean fitsRemote = memPartitioned < remoteBudget;
	final boolean fitsLocal = memMatrix+memPartitioned < localBudget;
	return fitsRemote && fitsLocal;
}
/**
 * Checks if the data with the given characteristics can be collected into
 * the local memory budget, without checking the write buffer limit.
 *
 * @param dc data characteristics of the output
 * @param memPinned amount of already pinned memory
 * @return true if the collect fits into the local memory budget
 */
public static boolean checkSparkCollectMemoryBudget(DataCharacteristics dc, long memPinned ) {
	//same check as the general variant with disabled buffer pool check
	return checkSparkCollectMemoryBudget(dc, memPinned, false);
}
/**
 * Checks if the data with the given characteristics can be collected into
 * the local memory budget. Matrix characteristics are checked via rows,
 * columns, blocksize and the non-zero bound; all other characteristics
 * via their dimensions and number of non-zeros.
 *
 * @param dc data characteristics of the output
 * @param memPinned amount of already pinned memory
 * @param checkBP if true, additionally check against the write buffer limit
 * @return true if the collect satisfies the memory constraints
 */
public static boolean checkSparkCollectMemoryBudget(DataCharacteristics dc, long memPinned, boolean checkBP ) {
	if( !(dc instanceof MatrixCharacteristics) ) {
		//general case, handled by dimensions
		final long[] dims = dc.getDims();
		return checkSparkCollectMemoryBudget(dims, dc.getNonZeros(), memPinned, checkBP);
	}
	return checkSparkCollectMemoryBudget(dc.getRows(), dc.getCols(),
		dc.getBlocksize(), dc.getNonZerosBound(), memPinned, checkBP);
}
/**
 * Matrix variant of the collect check: the pinned memory, the output matrix,
 * and its partitioned representation need to fit into the local memory budget.
 *
 * @param rlen number of rows
 * @param clen number of columns
 * @param blen blocksize
 * @param nnz number of non-zeros
 * @param memPinned amount of already pinned memory
 * @param checkBP if true, the output matrix also has to be smaller than the write buffer limit
 * @return true if the memory constraints are satisfied
 */
private static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int blen, long nnz, long memPinned, boolean checkBP ) {
	//size of the output matrix and its blocked representation
	final double sparsity = getSparsity(rlen, clen, nnz);
	final double memMatrix = estimateSizeExactSparsity(rlen, clen, sparsity);
	final double memPMatrix = estimatePartitionedSizeExactSparsity(rlen, clen, blen, sparsity);

	//both output matrix and partitioned matrix need to fit into the local mem budget
	final double memRequired = memPinned + memMatrix + memPMatrix;
	if( !(memRequired < getLocalMemBudget()) )
		return false;

	//optionally, the output matrix needs to fit into the write buffer to avoid unnecessary evictions
	return !checkBP || memMatrix < LazyWriteBuffer.getWriteBufferLimit();
}
/**
 * Dimension-based variant of the collect check. The size is approximated
 * as 8 bytes per cell (product of all dimensions) for both the output and
 * its partitioned representation; the number of non-zeros is currently unused.
 *
 * @param dims dimensions of the output
 * @param nnz number of non-zeros (not used by the current estimate)
 * @param memPinned amount of already pinned memory
 * @param checkBP if true, the output also has to be smaller than the write buffer limit
 * @return true if the memory constraints are satisfied
 */
private static boolean checkSparkCollectMemoryBudget( long[] dims, long nnz, long memPinned, boolean checkBP ) {
	// TODO estimate exact size (incl sparsity, i.e., getSparsity(dims, nnz))
	final long denseBytes = UtilFunctions.prod(dims) * 8;
	final double memTensor = denseBytes;
	final double memPTensor = denseBytes;

	//both output and partitioned output need to fit into the local mem budget
	final double memRequired = memPinned + memTensor + memPTensor;
	if( !(memRequired < getLocalMemBudget()) )
		return false;

	//optionally, the output needs to fit into the write buffer to avoid unnecessary evictions
	return !checkBP || memTensor < LazyWriteBuffer.getWriteBufferLimit();
}
/**
 * Decides on a CSR conversion for the given input: the conversion is chosen
 * if Checkpoint.CHECKPOINT_SPARSE_CSR is enabled and the sparsity, computed
 * from the non-zero bound, is below MatrixBlock.SPARSITY_TURN_POINT.
 *
 * @param dcIn data characteristics of the input
 * @return true if the input should be converted to CSR
 */
public static boolean checkSparseBlockCSRConversion( DataCharacteristics dcIn ) {
	//best-effort decision based on the non-zero bound
	//(the precise number of non-zeros is irrelevant here)
	final long rows = dcIn.getRows();
	final long cols = dcIn.getCols();
	final long nnzBound = dcIn.getNonZerosBound();
	final double sparsity = OptimizerUtils.getSparsity(rows, cols, nnzBound);

	if( !Checkpoint.CHECKPOINT_SPARSE_CSR )
		return false;
	return sparsity < MatrixBlock.SPARSITY_TURN_POINT;
}
/**
 * Returns the number of tasks that potentially run in parallel. In spark
 * execution modes this is the default parallelism of the spark execution
 * context, otherwise the local parallelism.
 *
 * @return number of tasks
 */
public static int getNumTasks() {
	return isSparkExecutionMode() ?
		SparkExecutionContext.getDefaultParallelism(false) :
		InfrastructureAnalyzer.getLocalParallelism();
}
/**
 * Returns the default execution mode, which is always hybrid.
 *
 * @return ExecMode.HYBRID
 */
public static ExecMode getDefaultExecutionMode() {
	//the default is HYBRID independent of the environment; the former probe of
	//SPARK_ENV_LOADED selected the very same mode and thus had no effect
	return ExecMode.HYBRID;
}
/**
 * Indicates if the global execution mode involves spark, i.e.,
 * if it is either SPARK or HYBRID.
 *
 * @return true if the global execution mode is SPARK or HYBRID
 */
public static boolean isSparkExecutionMode() {
	final ExecMode mode = DMLScript.getGlobalExecMode();
	return mode == ExecMode.SPARK || mode == ExecMode.HYBRID;
}
/**
 * Indicates if the global execution mode is HYBRID.
 *
 * @return true if the global execution mode is HYBRID
 */
public static boolean isHybridExecutionMode() {
	final ExecMode mode = DMLScript.getGlobalExecMode();
	return ExecMode.HYBRID == mode;
}
/**
* Returns the degree of parallelism used for parallel text read.
* This is computed as the number of virtual cores scales by the
* PARALLEL_READ_PARALLELISM_MULTIPLIER. If PARALLEL_READ_TEXTFORMATS
* is disabled, this method returns 1.
*
* @return degree of parallelism
*/
public static int getParallelTextReadParallelism() {
	final boolean parallelRead = ConfigurationManager
		.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS);
	if( parallelRead ) {
		// degree of parallelism: local parallelism scaled by the read multiplier
		final double scaled = InfrastructureAnalyzer.getLocalParallelism()
			* PARALLEL_CP_READ_PARALLELISM_MULTIPLIER;
		return (int) Math.round(scaled);
	}
	// parallel text read disabled: sequential execution
	return 1;
}
/**
 * Returns the degree of parallelism used for parallel binary read.
 * This is computed as the local parallelism scaled by the
 * PARALLEL_CP_READ_PARALLELISM_MULTIPLIER. If PARALLEL_CP_READ_BINARYFORMATS
 * is disabled, this method returns 1.
 *
 * @return degree of parallelism
 */
public static int getParallelBinaryReadParallelism() {
	final boolean parallelRead = ConfigurationManager
		.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS);
	if( parallelRead ) {
		// degree of parallelism for parallel binary read
		final double scaled = InfrastructureAnalyzer.getLocalParallelism()
			* PARALLEL_CP_READ_PARALLELISM_MULTIPLIER;
		return (int) Math.round(scaled);
	}
	// parallel binary read disabled: sequential execution
	return 1;
}
/**
* Returns the degree of parallelism used for parallel text write.
* This is computed as the number of virtual cores scaled by the
* PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER. If PARALLEL_CP_WRITE_TEXTFORMATS
* is disabled, this method returns 1.
*
* @return degree of parallelism
*/
public static int getParallelTextWriteParallelism() {
	final boolean parallelWrite = ConfigurationManager
		.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS);
	if( parallelWrite ) {
		// degree of parallelism for parallel text write
		final double scaled = InfrastructureAnalyzer.getLocalParallelism()
			* PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER;
		return (int) Math.round(scaled);
	}
	// parallel text write disabled: sequential execution
	return 1;
}
/**
 * Returns the degree of parallelism used for parallel binary write.
 * This is computed as the local parallelism scaled by the
 * PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER. If PARALLEL_CP_WRITE_BINARYFORMATS
 * is disabled, this method returns 1.
 *
 * @return degree of parallelism
 */
public static int getParallelBinaryWriteParallelism() {
	final boolean parallelWrite = ConfigurationManager
		.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS);
	if( parallelWrite ) {
		// degree of parallelism for parallel binary write
		final double scaled = InfrastructureAnalyzer.getLocalParallelism()
			* PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER;
		return (int) Math.round(scaled);
	}
	// parallel binary write disabled: sequential execution
	return 1;
}
////////////////////////
// Memory Estimates //
////////////////////////
/**
 * Estimates the in-memory footprint (in bytes) for the given data
 * characteristics by delegating to
 * {@code estimateSizeExactSparsity(DataCharacteristics)}.
 *
 * @param dc data characteristics
 * @return memory footprint
 */
public static long estimateSize(DataCharacteristics dc) {
	final long footprint = estimateSizeExactSparsity(dc);
	return footprint;
}
/**
 * Estimates the in-memory footprint (in bytes) from the rows, columns, and
 * number of non-zeros of the given data characteristics.
 *
 * @param dc data characteristics
 * @return memory footprint
 */
public static long estimateSizeExactSparsity(DataCharacteristics dc) {
	final long nrows = dc.getRows();
	final long ncols = dc.getCols();
	final long nnz = dc.getNonZeros();
	return estimateSizeExactSparsity(nrows, ncols, nnz);
}
/**
* Estimates the footprint (in bytes) for an in-memory representation of a
* matrix with dimensions=(nrows,ncols) and number of non-zeros nnz.
*
* @param nrows number of rows
* @param ncols number of cols
* @param nnz number of non-zeros
* @return memory footprint
*/
public static long estimateSizeExactSparsity(long nrows, long ncols, long nnz) {
	// translate the non-zero count into a sparsity and use the sparsity-based estimate
	final double sparsity = getSparsity(nrows, ncols, nnz);
	return estimateSizeExactSparsity(nrows, ncols, sparsity);
}
/**
 * Estimates the in-memory footprint (in bytes) of a frame with the given
 * dimensions, assuming string columns with 8 characters per value on average.
 *
 * @param nRows number of rows
 * @param nCols number of columns
 * @return memory footprint, or Long.MAX_VALUE if the number of rows exceeds
 *         Integer.MAX_VALUE
 */
public static long estimateSizeExactFrame(long nRows, long nCols){
	final boolean fitsIntoInt = !(nRows > Integer.MAX_VALUE);
	if( fitsIntoInt ) {
		// assuming String arrays and on average 8 characters per value
		final long columnCost = (long) MemoryEstimates.stringArrayCost((int) nRows, 8);
		return columnCost * nCols;
	}
	// Frames larger than INT are currently not supported. Their size is
	// therefore estimated as extremely large, which forces spark operations.
	return Long.MAX_VALUE;
}
/**
* Estimates the footprint (in bytes) for an in-memory representation of a
* matrix with dimensions=(nrows,ncols) and sparsity=sp.
*
* This function can be used directly in Hops, when the actual sparsity is
* known i.e., <code>sp</code> is guaranteed to give worst-case estimate
* (e.g., Rand with a fixed sparsity). In all other cases, estimateSize()
* must be used so that worst-case estimates are computed, whenever
* applicable.
*
* @param nrows number of rows
* @param ncols number of cols
* @param sp sparsity
* @return memory footprint
*/
public static long estimateSizeExactSparsity(long nrows, long ncols, double sp) {
	// the actual estimate is provided by the matrix block implementation
	final long footprint = MatrixBlock.estimateSizeInMemory(nrows, ncols, sp);
	return footprint;
}
/**
 * Estimates the in-memory footprint (in bytes) depending on the data type:
 * frames use the frame estimate (which ignores the sparsity), everything
 * else uses the sparsity-based estimate.
 *
 * @param nrows number of rows
 * @param ncols number of cols
 * @param sp sparsity
 * @param dt data type
 * @return memory footprint
 */
public static long estimateSizeExactSparsity(long nrows, long ncols, double sp, DataType dt){
	if( dt != DataType.FRAME )
		return estimateSizeExactSparsity(nrows, ncols, sp);
	return estimateSizeExactFrame(nrows, ncols);
}
/**
* Estimates the footprint (in bytes) for a partitioned in-memory representation of a
* matrix with the given matrix characteristics
*
* @param dc matrix characteristics
* @return memory estimate
*/
public static long estimatePartitionedSizeExactSparsity(DataCharacteristics dc) {
	// by default, empty blocks are included in the estimate
	final boolean outputEmptyBlocks = true;
	return estimatePartitionedSizeExactSparsity(dc, outputEmptyBlocks);
}
/**
 * Estimates the footprint (in bytes) for a partitioned in-memory
 * representation of the data described by the given characteristics.
 * Matrix characteristics are estimated from rows, columns, blocksize, and the
 * non-zero bound; for all other characteristics the product of all dimensions
 * times 8 bytes is returned.
 *
 * @param dc data characteristics
 * @param outputEmptyBlocks passed through to the matrix estimate; not used
 *        for non-matrix characteristics
 * @return memory estimate
 */
public static long estimatePartitionedSizeExactSparsity(DataCharacteristics dc, boolean outputEmptyBlocks) {
	if( !(dc instanceof MatrixCharacteristics) ) {
		// TODO estimate partitioned size exact for tensor
		long denseBytes = 8; // 8 for double
		for( int d = 0; d < dc.getNumDims(); d++ )
			denseBytes *= dc.getDim(d);
		return denseBytes;
	}

	return estimatePartitionedSizeExactSparsity(
		dc.getRows(), dc.getCols(), dc.getBlocksize(),
		dc.getNonZerosBound(), outputEmptyBlocks);
}
/**
* Estimates the footprint (in bytes) for a partitioned in-memory representation of a
* matrix with dimensions=(rlen,clen) and number of non-zeros nnz.
*
* @param rlen number of rows
* @param clen number of cols
* @param blen rows/cols per block
* @param nnz number of non-zeros
* @return memory estimate
*/
public static long estimatePartitionedSizeExactSparsity(long rlen, long clen, long blen, long nnz) {
	// translate the non-zero count into a sparsity and use the sparsity-based estimate
	final double sparsity = getSparsity(rlen, clen, nnz);
	return estimatePartitionedSizeExactSparsity(rlen, clen, blen, sparsity);
}
/**
 * Estimates the footprint (in bytes) for a partitioned in-memory representation of a
 * matrix with dimensions=(rlen,clen) and number of non-zeros nnz.
 *
 * @param rlen number of rows
 * @param clen number of cols
 * @param blen rows/cols per block
 * @param nnz number of non-zeros
 * @param outputEmptyBlocks passed through to the sparsity-based estimate
 * @return memory estimate
 */
public static long estimatePartitionedSizeExactSparsity(long rlen, long clen, long blen, long nnz, boolean outputEmptyBlocks) {
	final double sparsity = getSparsity(rlen, clen, nnz);
	return estimatePartitionedSizeExactSparsity(rlen, clen, blen, sparsity, outputEmptyBlocks);
}
/**
* Estimates the footprint (in bytes) for a partitioned in-memory representation of a
* matrix with the hop's dimensions and number of non-zeros nnz.
*
* @param hop The hop to extract dimensions and nnz from
* @return the memory estimate
*/
public static long estimatePartitionedSizeExactSparsity(Hop hop){
	// dimensions, blocksize, and non-zeros are all taken from the hop
	return estimatePartitionedSizeExactSparsity(
		hop.getDim1(), hop.getDim2(), hop.getBlocksize(), hop.getNnz());
}
/**
* Estimates the footprint (in bytes) for a partitioned in-memory representation of a
* matrix with dimensions=(rlen,clen) and sparsity=sp.
*
* @param rlen number of rows
* @param clen number of cols
* @param blen rows/cols per block
* @param sp sparsity
* @return memory estimate
*/
public static long estimatePartitionedSizeExactSparsity(long rlen, long clen, long blen, double sp) {
	// by default, empty blocks are included in the estimate
	final boolean outputEmptyBlocks = true;
	return estimatePartitionedSizeExactSparsity(rlen, clen, blen, sp, outputEmptyBlocks);
}
/**
* Estimates the footprint (in bytes) for a partitioned in-memory representation of a
* matrix with dimensions=(rlen,clen) and sparsity=sp. The estimate is the sum of the
* estimates of the individual blocks of size blen x blen (and smaller boundary blocks).
*
* @param rlen number of rows
* @param clen number of cols
* @param blen rows/cols per block
* @param sp sparsity
* @param outputEmptyBlocks if true, the size of empty blocks is added in the case where
*        the number of non-zeros does not exceed the total number of blocks
* @return memory estimate
*/
public static long estimatePartitionedSizeExactSparsity(long rlen, long clen, long blen, double sp, boolean outputEmptyBlocks)
{
long ret = 0;
//check for guaranteed existence of empty blocks (less nnz than total number of blocks)
//tnrblks/tncblks: total number of row/column blocks, including partial boundary blocks
long tnrblks = (long)Math.ceil((double)rlen/blen);
long tncblks = (long)Math.ceil((double)clen/blen);
//number of non-zeros implied by the sparsity (rounded up)
long nnz = (long) Math.ceil(sp * rlen * clen);
if( nnz <= tnrblks * tncblks ) {
//estimate nnz blocks with a single non-zero each (sparsity 1/(lrlen*lclen), COO format),
//and, if requested, the remaining blocks as empty blocks
long lrlen = Math.min(rlen, blen);
long lclen = Math.min(clen, blen);
return nnz * MatrixBlock.estimateSizeSparseInMemory(lrlen, lclen, 1d/lrlen/lclen, Type.COO)
+ (outputEmptyBlocks ? (tnrblks * tncblks - nnz) * estimateSizeEmptyBlock(lrlen, lclen) : 0);
}
//estimate size of full blen x blen blocks
long nrblks = rlen / blen;
long ncblks = clen / blen;
if( nrblks * ncblks > 0 )
ret += nrblks * ncblks * estimateSizeExactSparsity(blen, blen, sp);
//estimate size of bottom boundary blocks
//NOTE(review): for non-negative rlen, rlen % blen is never negative, so this guard also
//adds an estimate for blocks with zero rows when rlen is a multiple of blen; presumably
//the guard only skips unknown (negative) dimensions - confirm that this is intended
long lrlen = rlen % blen;
if( ncblks > 0 && lrlen >= 0 )
ret += ncblks * estimateSizeExactSparsity(lrlen, blen, sp);
//estimate size of right boundary blocks (same guard semantics as for the bottom blocks)
long lclen = clen % blen;
if( nrblks > 0 && lclen >= 0 )
ret += nrblks * estimateSizeExactSparsity(blen, lclen, sp);
//estimate size of bottom right boundary block
if( lrlen >= 0 && lclen >= 0 )
ret += estimateSizeExactSparsity(lrlen, lclen, sp);
return ret;
}
/**
* Similar to estimate() except that it provides worst-case estimates
* when the optimization type is ROBUST.
*
* @param nrows number of rows
* @param ncols number of cols
* @return memory estimate
*/
public static long estimateSize(long nrows, long ncols) {
	// a sparsity of 1.0 corresponds to a fully dense matrix
	final double denseSparsity = 1.0;
	return estimateSizeExactSparsity(nrows, ncols, denseSparsity);
}
/**
 * Estimates the in-memory footprint (in bytes) of an empty block. The given
 * dimensions are not used; the estimate is always taken for a 0 x 0 block
 * with sparsity 0.
 *
 * @param nrows number of rows (unused)
 * @param ncols number of cols (unused)
 * @return memory footprint
 */
public static long estimateSizeEmptyBlock(long nrows, long ncols) {
	final long noRows = 0;
	final long noCols = 0;
	final double noValues = 0.0d;
	return estimateSizeExactSparsity(noRows, noCols, noValues);
}
/**
 * Estimates the size (in bytes) of a matrix written in the given file format,
 * derived from the on-disk size estimate of the matrix block scaled by a
 * format-specific factor (3 for IJV formats, 2.5 for LIBSVM, 2 for CSV, and
 * 1 otherwise).
 *
 * @param rows number of rows
 * @param cols number of cols
 * @param nnz number of non-zeros
 * @param fmt file format
 * @return size estimate
 */
public static long estimateSizeTextOutput(long rows, long cols, long nnz, FileFormat fmt) {
	final long binarySize = MatrixBlock.estimateSizeOnDisk(rows, cols, nnz);

	if( fmt.isIJV() )
		return 3 * binarySize;
	if( fmt == FileFormat.LIBSVM )
		return Math.round(2.5 * binarySize);
	if( fmt == FileFormat.CSV )
		return 2 * binarySize;
	return binarySize;
}
/**
 * Estimates the size (in bytes) of a tensor written in text format.
 *
 * @param dims dimensions of the tensor
 * @param nnz number of non-zeros
 * @param fmt file format, only FileFormat.TEXT is supported
 * @return size estimate
 * @throws DMLRuntimeException if the format is not FileFormat.TEXT
 */
public static long estimateSizeTextOutput(int[] dims, long nnz, FileFormat fmt) {
	// TODO accurate estimation
	if( fmt != FileFormat.TEXT )
		throw new DMLRuntimeException("Tensor output format not implemented.");

	// very simple estimation (example: 100 100 1.345678): per non-zero, 8 bytes
	// for the number plus, for each dimension, an expected String length of 3
	// and one space
	final int bytesPerValue = 8 + dims.length * 4;
	return nnz * bytesPerValue;
}
/**
 * Convenience overload that delegates to the three-argument variant with the
 * boolean argument set to {@code false}.
 *
 * @param in input hops
 * @param out output hop
 * @return total memory estimate
 */
public static double getTotalMemEstimate(Hop[] in, Hop out) {
	final boolean flag = false;
	return getTotalMemEstimate(in, out, flag);
}