-
Notifications
You must be signed in to change notification settings - Fork 136
Expand file tree
/
Copy pathNonBlockingUdpSender.java
More file actions
90 lines (78 loc) · 2.89 KB
/
NonBlockingUdpSender.java
File metadata and controls
90 lines (78 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.timgroup.statsd;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
 * Sends string messages over a connected UDP {@link DatagramChannel} without
 * blocking the caller: each message is handed to a single-threaded executor
 * whose worker thread performs the actual socket write.
 *
 * <p>Exceptions raised while queueing, sending or shutting down are not
 * propagated to the caller; they are passed to the supplied
 * {@link StatsDClientErrorHandler} instead. Only the constructors throw.
 */
public final class NonBlockingUdpSender {
    /** Thread-name prefix used when the caller does not supply one. */
    private static final String DEFAULT_THREAD_PREFIX = "StatsD-";

    /** How long {@link #stop()} waits for already-queued messages to be sent. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    private final Charset encoding;
    private final DatagramChannel clientSocket;
    private final ExecutorService executor;
    private final StatsDClientErrorHandler handler;

    /**
     * Creates a sender whose worker thread name is prefixed with {@code "StatsD-"}.
     *
     * @param hostname host to send datagrams to
     * @param port     port to send datagrams to
     * @param encoding charset used to turn each message into bytes
     * @param handler  receives every exception raised after construction
     * @throws IOException if the channel cannot be opened or connected
     */
    public NonBlockingUdpSender(String hostname, int port, Charset encoding, StatsDClientErrorHandler handler) throws IOException {
        this(hostname, DEFAULT_THREAD_PREFIX, port, encoding, handler);
    }

    /**
     * Creates a sender with a caller-chosen worker thread name prefix.
     *
     * @param hostname      host to send datagrams to
     * @param threadPreffix text prepended to the worker thread's default name
     * @param port          port to send datagrams to
     * @param encoding      charset used to turn each message into bytes
     * @param handler       receives every exception raised after construction
     * @throws IOException if the channel cannot be opened or connected
     */
    public NonBlockingUdpSender(String hostname, String threadPreffix, int port, Charset encoding, StatsDClientErrorHandler handler) throws IOException {
        this.encoding = encoding;
        this.handler = handler;
        this.clientSocket = openConnectedChannel(hostname, port);
        // Created last so a failed connect does not leave an executor behind.
        this.executor = createExecutorService(threadPreffix);
    }

    /**
     * Opens a datagram channel and connects it to the given endpoint, closing
     * the channel again if connecting fails so the socket is not leaked.
     */
    private static DatagramChannel openConnectedChannel(String hostname, int port) throws IOException {
        final DatagramChannel channel = DatagramChannel.open();
        try {
            channel.connect(new InetSocketAddress(hostname, port));
            return channel;
        } catch (IOException | RuntimeException e) {
            try {
                channel.close();
            } catch (IOException closeFailure) {
                // Keep the connect failure as the primary error.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Builds a single-threaded executor whose thread is a daemon and whose
     * name is the default factory name with {@code threadPreffix} prepended.
     */
    private static ExecutorService createExecutorService(String threadPreffix) {
        final ThreadFactory delegate = Executors.defaultThreadFactory();
        return Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread result = delegate.newThread(r);
                result.setName(threadPreffix + result.getName());
                result.setDaemon(true);
                return result;
            }
        });
    }

    /**
     * Stops accepting new messages, waits up to 30 seconds for queued ones to
     * be sent, then closes the channel. The channel is closed even if the wait
     * fails or is interrupted; in the latter case the calling thread's
     * interrupt status is restored before this method returns.
     */
    public void stop() {
        boolean interrupted = false;
        try {
            executor.shutdown();
            executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Remember the interruption; it is re-asserted once cleanup is done.
            interrupted = true;
            handler.handle(e);
        } catch (Exception e) {
            handler.handle(e);
        } finally {
            try {
                clientSocket.close();
            } catch (Exception e) {
                handler.handle(e);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Queues {@code message} to be written to the channel by the worker thread.
     * A failure to queue it (for example after {@link #stop()}) is reported to
     * the error handler rather than thrown.
     *
     * @param message text to send
     */
    public void send(final String message) {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    blockingSend(message);
                }
            });
        } catch (Exception e) {
            handler.handle(e);
        }
    }

    /**
     * Encodes {@code message} with the configured charset and writes it to the
     * channel on the current thread, reporting any failure to the error handler.
     */
    private void blockingSend(String message) {
        try {
            final byte[] sendData = message.getBytes(encoding);
            clientSocket.write(ByteBuffer.wrap(sendData));
        } catch (Exception e) {
            handler.handle(e);
        }
    }
}