Skip to content

Commit ce93f75

Browse files
committed
Update Spark versions
* move to Spark 2.4.0 (latest) * move to Scala 2.11 (preferred for said release) * clean up example app using Java 8 lambda expressions
1 parent 7e52957 commit ce93f75

File tree

5 files changed

+27
-70
lines changed

5 files changed

+27
-70
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ Prerequisites:
9797

9898
Build as described above. Optionally choose Scala and Spark versions appropriate for your distribution via `scala.version` and `spark.version`. The defaults are the equivalent to
9999

100-
$ mvn -Dscala.version=2.10 -Dspark.version=1.5.0
100+
$ mvn -Dscala.version=2.11 -Dspark.version=2.4.0
101101

102102
Run a netcat instance generating data (below examples presume this host is named _netcat.running.host.example.com_)
103103

hbase-1/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
<packaging>jar</packaging>
1212
<name>Downstream HBase 1.y API</name>
1313
<properties>
14-
<scala.version>2.10</scala.version>
15-
<spark.version>1.5.0</spark.version>
14+
<scala.version>2.11</scala.version>
15+
<spark.version>2.4.0</spark.version>
1616
<hbase.version>${hbase.1.version}</hbase.version>
1717
<hadoop.version>2.7.1</hadoop.version>
1818
</properties>

hbase-1/src/main/java/org/hbase/downstreamer/spark/JavaNetworkWordCountStoreInHBase.java

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Changes Copyright 2016 hbase-downstreamer contributor(s).
2+
* Changes Copyright 2016,2019 hbase-downstreamer contributor(s).
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,13 +33,13 @@
3333
*
3434
* derived from:
3535
* https://raw.githubusercontent.com/apache/spark/v1.5.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
36+
* https://raw.githubusercontent.com/apache/spark/v2.4.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
3637
*
3738
*/
3839

3940
package org.hbase.downstreamer.spark;
4041

4142
import scala.Tuple2;
42-
import com.google.common.collect.Lists;
4343

4444
import org.apache.hadoop.conf.Configuration;
4545
import org.apache.hadoop.security.UserGroupInformation;
@@ -54,11 +54,7 @@
5454
import org.apache.hadoop.hbase.util.Bytes;
5555

5656
import org.apache.spark.SparkConf;
57-
import org.apache.spark.api.java.function.FlatMapFunction;
58-
import org.apache.spark.api.java.function.Function2;
59-
import org.apache.spark.api.java.function.PairFunction;
6057
import org.apache.spark.api.java.function.VoidFunction;
61-
import org.apache.spark.api.java.JavaPairRDD;
6258
import org.apache.spark.api.java.StorageLevels;
6359
import org.apache.spark.deploy.SparkHadoopUtil;
6460
import org.apache.spark.streaming.Durations;
@@ -76,6 +72,7 @@
7672
import java.security.PrivilegedExceptionAction;
7773
import java.util.concurrent.ConcurrentHashMap;
7874
import java.util.regex.Pattern;
75+
import java.util.Arrays;
7976
import java.util.Iterator;
8077

8178
/**
@@ -214,7 +211,7 @@ public void call(Iterator<Tuple2<String, Integer>> iterator) throws IOException,
214211

215212
private static final Pattern SPACE = Pattern.compile(" ");
216213

217-
public static void main(String[] args) {
214+
public static void main(String[] args) throws InterruptedException {
218215
if (args.length < 2) {
219216
System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
220217
System.exit(1);
@@ -233,34 +230,15 @@ public static void main(String[] args) {
233230
// Replication necessary in distributed scenario for fault tolerance.
234231
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
235232
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
236-
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
237-
@Override
238-
public Iterable<String> call(String x) {
239-
return Lists.newArrayList(SPACE.split(x));
240-
}
241-
});
242-
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
243-
new PairFunction<String, String, Integer>() {
244-
@Override
245-
public Tuple2<String, Integer> call(String s) {
246-
return new Tuple2<String, Integer>(s, 1);
247-
}
248-
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
249-
@Override
250-
public Integer call(Integer i1, Integer i2) {
251-
return i1 + i2;
252-
}
253-
});
233+
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
234+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s,1))
235+
.reduceByKey((i1, i2) -> i1 + i2);
254236

255237
final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);
256238

257-
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
258-
@Override
259-
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
260-
store.setTime(time);
261-
rdd.foreachPartition(store);
262-
return null;
263-
}
239+
wordCounts.foreachRDD((rdd, time) -> {
240+
store.setTime(time);
241+
rdd.foreachPartition(store);
264242
});
265243

266244
ssc.start();

hbase-2/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
<properties>
1414
<hbase.version>${hbase.2.version}</hbase.version>
1515
<hadoop.version>2.8.5</hadoop.version>
16-
<!-- These should match default from HBase 2 release -->
17-
<scala.version>2.10.4</scala.version>
18-
<scala.binary>2.10</scala.binary>
19-
<spark.version>1.6.0</spark.version>
16+
<!-- These should match default from HBase Connectors release -->
17+
<scala.version>2.11.12</scala.version>
18+
<scala.binary>2.11</scala.binary>
19+
<spark.version>2.4.0</spark.version>
2020
</properties>
2121
<dependencies>
2222
<!--START OF TEST SCOPE-->

hbase-2/src/main/java/org/hbase/downstreamer/spark/JavaNetworkWordCountStoreInHBase.java

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Changes Copyright 2016 hbase-downstreamer contributor(s).
2+
* Changes Copyright 2016,2019 hbase-downstreamer contributor(s).
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,13 +33,13 @@
3333
*
3434
* derived from:
3535
* https://raw.githubusercontent.com/apache/spark/v1.5.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
36+
* https://raw.githubusercontent.com/apache/spark/v2.4.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
3637
*
3738
*/
3839

3940
package org.hbase.downstreamer.spark;
4041

4142
import scala.Tuple2;
42-
import com.google.common.collect.Lists;
4343

4444
import org.apache.hadoop.conf.Configuration;
4545
import org.apache.hadoop.security.UserGroupInformation;
@@ -54,9 +54,6 @@
5454
import org.apache.hadoop.hbase.util.Bytes;
5555

5656
import org.apache.spark.SparkConf;
57-
import org.apache.spark.api.java.function.FlatMapFunction;
58-
import org.apache.spark.api.java.function.Function2;
59-
import org.apache.spark.api.java.function.PairFunction;
6057
import org.apache.spark.api.java.function.VoidFunction;
6158
import org.apache.spark.api.java.JavaPairRDD;
6259
import org.apache.spark.api.java.StorageLevels;
@@ -76,6 +73,7 @@
7673
import java.security.PrivilegedExceptionAction;
7774
import java.util.concurrent.ConcurrentHashMap;
7875
import java.util.regex.Pattern;
76+
import java.util.Arrays;
7977
import java.util.Iterator;
8078

8179
/**
@@ -214,7 +212,7 @@ public void call(Iterator<Tuple2<String, Integer>> iterator) throws IOException,
214212

215213
private static final Pattern SPACE = Pattern.compile(" ");
216214

217-
public static void main(String[] args) {
215+
public static void main(String[] args) throws InterruptedException {
218216
if (args.length < 2) {
219217
System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
220218
System.exit(1);
@@ -233,34 +231,15 @@ public static void main(String[] args) {
233231
// Replication necessary in distributed scenario for fault tolerance.
234232
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
235233
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
236-
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
237-
@Override
238-
public Iterable<String> call(String x) {
239-
return Lists.newArrayList(SPACE.split(x));
240-
}
241-
});
242-
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
243-
new PairFunction<String, String, Integer>() {
244-
@Override
245-
public Tuple2<String, Integer> call(String s) {
246-
return new Tuple2<String, Integer>(s, 1);
247-
}
248-
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
249-
@Override
250-
public Integer call(Integer i1, Integer i2) {
251-
return i1 + i2;
252-
}
253-
});
234+
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
235+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s,1))
236+
.reduceByKey((i1, i2) -> i1 + i2);
254237

255238
final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);
256239

257-
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
258-
@Override
259-
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
260-
store.setTime(time);
261-
rdd.foreachPartition(store);
262-
return null;
263-
}
240+
wordCounts.foreachRDD((rdd, time) -> {
241+
store.setTime(time);
242+
rdd.foreachPartition(store);
264243
});
265244

266245
ssc.start();

0 commit comments

Comments
 (0)