1818import io .micrometer .core .instrument .MeterRegistry ;
1919import io .netty .util .HashedWheelTimer ;
2020import io .netty .util .Timer ;
21+ import org .junit .jupiter .api .AfterEach ;
2122import org .junit .jupiter .api .BeforeEach ;
2223import org .junit .jupiter .api .Test ;
2324import org .junit .jupiter .api .Timeout ;
2425import org .slf4j .Logger ;
2526import org .slf4j .LoggerFactory ;
26- import org .testcontainers .containers .TarantoolContainer ;
27- import org .testcontainers .containers .output .Slf4jLogConsumer ;
28- import org .testcontainers .junit .jupiter .Container ;
29- import org .testcontainers .junit .jupiter .Testcontainers ;
27+ import org .testcontainers .containers .tarantool .TarantoolContainer ;
28+ import org .testcontainers .containers .utils .TarantoolContainerClientHelper ;
3029
3130import static io .tarantool .core .protocol .requests .IProtoConstant .IPROTO_DATA ;
3231import io .tarantool .balancer .TarantoolBalancer ;
4039import io .tarantool .pool .IProtoClientPoolImpl ;
4140import io .tarantool .pool .InstanceConnectionGroup ;
4241
43- @ Timeout (value = 15 )
44- @ Testcontainers
42+ @ Timeout (value = 25 )
4543public class DistributingRoundRobinBalancerTest extends BaseTest {
4644
4745 private static final Logger log =
4846 LoggerFactory .getLogger (DistributingRoundRobinBalancerTest .class );
4947
50- @ Container
51- private final TarantoolContainer tt1 =
52- new TarantoolContainer ()
53- .withEnv (ENV_MAP )
54- .withExposedPort (3305 )
55- .withLogConsumer (new Slf4jLogConsumer (log ));
56-
57- @ Container
58- private final TarantoolContainer tt2 =
59- new TarantoolContainer ()
60- .withEnv (ENV_MAP )
61- .withExposedPort (3305 )
62- .withLogConsumer (new Slf4jLogConsumer (log ));
48+ private static TarantoolContainer <?> tt1 ;
49+ private static TarantoolContainer <?> tt2 ;
6350
6451 @ BeforeEach
6552 public void setUp () {
53+ tt1 =
54+ TarantoolContainerClientHelper .createTarantoolContainer ()
55+ .withEnv (ENV_MAP )
56+ .withExposedPorts (3305 );
57+ tt2 =
58+ TarantoolContainerClientHelper .createTarantoolContainer ()
59+ .withEnv (ENV_MAP )
60+ .withExposedPorts (3305 );
61+
62+ tt1 .start ();
63+ tt2 .start ();
64+
65+ TarantoolContainerClientHelper .execInitScript (tt1 );
66+ TarantoolContainerClientHelper .execInitScript (tt2 );
6667 do {
6768 count1 = ThreadLocalRandom .current ().nextInt (MIN_CONNECTION_COUNT , MAX_CONNECTION_COUNT + 1 );
6869 count2 = ThreadLocalRandom .current ().nextInt (MIN_CONNECTION_COUNT , MAX_CONNECTION_COUNT + 1 );
6970 } while (count1 == count2 );
7071 }
7172
73+ @ AfterEach
74+ void tearDown () {
75+ tt1 .stop ();
76+ tt2 .stop ();
77+ }
78+
7279 private static IProtoClientPool createClientPool (
7380 boolean gracefulShutdown , HeartbeatOpts heartbeatOpts , MeterRegistry metricsRegistry ) {
7481 ManagedResource <Timer > timerResource =
@@ -78,25 +85,30 @@ private static IProtoClientPool createClientPool(
7885 factory , timerResource , gracefulShutdown , heartbeatOpts , null , metricsRegistry );
7986 }
8087
81- private int getSessionCounter (TarantoolContainer tt ) throws Exception {
82- List <?> result = tt .executeCommandDecoded ("return get_session_counter()" );
88+ private int getSessionCounter (TarantoolContainer <?> tt ) throws Exception {
89+ List <?> result =
90+ TarantoolContainerClientHelper .executeCommandDecoded (tt , "return get_session_counter()" );
8391 return (Integer ) result .get (0 );
8492 }
8593
86- private int getCallCounter (TarantoolContainer tt ) throws Exception {
87- List <?> result = tt .executeCommandDecoded ("return get_call_counter()" );
94+ private int getCallCounter (TarantoolContainer <?> tt ) throws Exception {
95+ List <?> result =
96+ TarantoolContainerClientHelper .executeCommandDecoded (tt , "return get_call_counter()" );
8897 return (Integer ) result .get (0 );
8998 }
9099
91- private void execLua (TarantoolContainer container , String command ) {
100+ private void execLua (TarantoolContainer <?> container , String command ) {
92101 try {
93- container .executeCommandDecoded (command );
94- } catch (Exception e ) {
102+ TarantoolContainerClientHelper .executeCommandDecoded (container , command );
103+ } catch (Exception ignored ) {
95104 }
96105 }
97106
98107 private void wakeUpAllConnects (
99- TarantoolBalancer rrBalancer , int nodeVisits , TarantoolContainer tt1 , TarantoolContainer tt2 )
108+ TarantoolBalancer rrBalancer ,
109+ int nodeVisits ,
110+ TarantoolContainer <?> tt1 ,
111+ TarantoolContainer <?> tt2 )
100112 throws Exception {
101113 walkAndJoin (rrBalancer , nodeVisits * 2 );
102114 assertEquals (count1 , getSessionCounter (tt1 ));
@@ -130,13 +142,13 @@ public void testDistributingRoundRobin() throws Exception {
130142 Arrays .asList (
131143 InstanceConnectionGroup .builder ()
132144 .withHost (tt1 .getHost ())
133- .withPort (tt1 .getPort ())
145+ .withPort (tt1 .getFirstMappedPort ())
134146 .withSize (count1 )
135147 .withTag ("node-a-00" )
136148 .build (),
137149 InstanceConnectionGroup .builder ()
138150 .withHost (tt2 .getHost ())
139- .withPort (tt2 .getPort ())
151+ .withPort (tt2 .getFirstMappedPort ())
140152 .withSize (count2 )
141153 .withTag ("node-b-00" )
142154 .build ()));
@@ -160,7 +172,7 @@ public void testDistributingRoundRobinWithUnavailableNodeA() throws Exception {
160172 .build (),
161173 InstanceConnectionGroup .builder ()
162174 .withHost (tt2 .getHost ())
163- .withPort (tt2 .getPort ())
175+ .withPort (tt2 .getFirstMappedPort ())
164176 .withSize (count2 )
165177 .withTag ("node-b-01" )
166178 .build ()));
@@ -176,8 +188,10 @@ public void testDistributingRoundRobinWithUnavailableNodeA() throws Exception {
176188 execLua (tt2 , "reset_call_counter()" );
177189
178190 log .info ("waiting for invalidation" );
179- Thread .sleep (INVALIDATION_TIMEOUT );
191+ Thread .sleep (EXTENDED_INVALIDATION_TIMEOUT );
180192
193+ execLua (tt1 , "reset_call_counter()" );
194+ execLua (tt2 , "reset_call_counter()" );
181195 log .info ("walk on node B only" );
182196 walkAndJoin (rrBalancer , nodeVisits * 2 );
183197 assertEquals (0 , getCallCounter (tt1 ));
@@ -217,7 +231,7 @@ public void testDistributingRoundRobinWithUnavailableNodeANoUnlock() throws Exce
217231 .build (),
218232 InstanceConnectionGroup .builder ()
219233 .withHost (tt2 .getHost ())
220- .withPort (tt2 .getPort ())
234+ .withPort (tt2 .getFirstMappedPort ())
221235 .withSize (count2 )
222236 .withTag ("node-b-02" )
223237 .build ()));
@@ -233,8 +247,10 @@ public void testDistributingRoundRobinWithUnavailableNodeANoUnlock() throws Exce
233247 execLua (tt2 , "reset_call_counter()" );
234248
235249 log .info ("waiting for invalidation" );
236- Thread .sleep (INVALIDATION_TIMEOUT );
250+ Thread .sleep (EXTENDED_INVALIDATION_TIMEOUT );
237251
252+ execLua (tt1 , "reset_call_counter()" );
253+ execLua (tt2 , "reset_call_counter()" );
238254 log .info ("walk on node B only" );
239255 walkAndJoin (rrBalancer , nodeVisits * 2 );
240256 assertEquals (0 , getCallCounter (tt1 ));
@@ -288,7 +304,7 @@ public void testDistributingRoundRobinNoAvailableClients() throws Exception {
288304 execLua (tt2 , "lock_pipe(true); reset_call_counter()" );
289305
290306 log .info ("waiting for invalidation" );
291- Thread .sleep (INVALIDATION_TIMEOUT );
307+ Thread .sleep (EXTENDED_INVALIDATION_TIMEOUT );
292308
293309 Throwable exc = assertThrows (ExecutionException .class , () -> rrBalancer .getNext ().get ());
294310 assertEquals (NoAvailableClientsException .class , exc .getCause ().getClass ());
@@ -323,12 +339,12 @@ public void testDistributingRoundRobinStartWithStuckNodeA() throws Exception {
323339 Arrays .asList (
324340 InstanceConnectionGroup .builder ()
325341 .withHost (tt1 .getHost ())
326- .withPort (tt1 .getPort ())
342+ .withPort (tt1 .getFirstMappedPort ())
327343 .withTag ("node-a-01" )
328344 .build (),
329345 InstanceConnectionGroup .builder ()
330346 .withHost (tt2 .getHost ())
331- .withPort (tt2 .getPort ())
347+ .withPort (tt2 .getFirstMappedPort ())
332348 .withTag ("node-b-01" )
333349 .build ()));
334350 pool .setConnectTimeout (3_000 );
0 commit comments