Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,35 @@
package org.apache.flink.state.forst;

import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.concurrent.Executors;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;

import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for {@link ForStStateBackend} on initialization. */
public class ForStInitITCase {
class ForStInitITCase {

@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@TempDir private java.nio.file.Path temporaryFolder;

/**
* This test checks that the ForSt native code loader still responds to resetting the init flag.
*/
@Test
public void testResetInitFlag() throws Exception {
void testResetInitFlag() throws Exception {
ForStStateBackend.resetForStLoadedFlag();
}

@Test
public void testTempLibFolderDeletedOnFail() throws Exception {
void testTempLibFolderDeletedOnFail() throws Exception {
ForStStateBackend.setForStInitialized(false);
File tempFolder = temporaryFolder.newFolder();
File tempFolder = TempDirUtils.newFolder(temporaryFolder);
try {
ForStStateBackend.ensureForStIsLoaded(
tempFolder.getAbsolutePath(),
Expand All @@ -60,7 +60,7 @@ public void testTempLibFolderDeletedOnFail() throws Exception {
// ignored
}
File[] files = tempFolder.listFiles();
Assert.assertNotNull(files);
Assert.assertEquals(0, files.length);
assertThat(files).isNotNull();
assertThat(files).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,33 @@

package org.apache.flink.state.forst;

import org.apache.flink.testutils.junit.utils.TempDirUtils;

import org.forstdb.Cache;
import org.forstdb.NativeLibraryLoader;
import org.forstdb.WriteBufferManager;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests to guard {@link ForStMemoryControllerUtils}. */
public class ForStMemoryControllerUtilsTest {
class ForStMemoryControllerUtilsTest {

@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@TempDir private Path temporaryFolder;

@Before
public void ensureRocksDbNativeLibraryLoaded() throws IOException {
@BeforeEach
void ensureRocksDbNativeLibraryLoaded() throws IOException {
NativeLibraryLoader.getInstance()
.loadLibrary(temporaryFolder.newFolder().getAbsolutePath());
.loadLibrary(TempDirUtils.newFolder(temporaryFolder).getAbsolutePath());
}

@Test
public void testCreateSharedResourcesWithExpectedCapacity() {
void testCreateSharedResourcesWithExpectedCapacity() {
long totalMemorySize = 2048L;
double writeBufferRatio = 0.5;
double highPriPoolRatio = 0.1;
Expand All @@ -60,50 +59,55 @@ public void testCreateSharedResourcesWithExpectedCapacity() {
ForStMemoryControllerUtils.calculateWriteBufferManagerCapacity(
totalMemorySize, writeBufferRatio);

assertThat(factory.actualCacheCapacity, is(expectedCacheCapacity));
assertThat(factory.actualWbmCapacity, is(expectedWbmCapacity));
assertThat(forStSharedResources.getWriteBufferManagerCapacity(), is(expectedWbmCapacity));
assertThat(factory.actualCacheCapacity).isEqualTo(expectedCacheCapacity);
assertThat(factory.actualWbmCapacity).isEqualTo(expectedWbmCapacity);
assertThat(forStSharedResources.getWriteBufferManagerCapacity())
.isEqualTo(expectedWbmCapacity);
}

@Test
public void testCalculateForStDefaultArenaBlockSize() {
void testCalculateForStDefaultArenaBlockSize() {
final long align = 4 * 1024;
final long writeBufferSize = 64 * 1024 * 1024;
final long expectArenaBlockSize = writeBufferSize / 8;

// Normal case test
assertThat(
"Arena block size calculation error for normal case",
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize),
is(expectArenaBlockSize));
assertThat(ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize))
.as("Arena block size calculation error for normal case")
.isEqualTo(expectArenaBlockSize);

// Alignment tests
assertThat(
"Arena block size calculation error for alignment case",
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize - 1),
is(expectArenaBlockSize));
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(
writeBufferSize - 1))
.as("Arena block size calculation error for alignment case")
.isEqualTo(expectArenaBlockSize);
assertThat(
"Arena block size calculation error for alignment case2",
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize + 8),
is(expectArenaBlockSize + align));
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(
writeBufferSize + 8))
.as("Arena block size calculation error for alignment case2")
.isEqualTo(expectArenaBlockSize + align);
}

@Test
public void testCalculateForStMutableLimit() {
void testCalculateForStMutableLimit() {
long bufferSize = 64 * 1024 * 1024;
long limit = bufferSize * 7 / 8;
assertThat(ForStMemoryControllerUtils.calculateForStMutableLimit(bufferSize), is(limit));
assertThat(ForStMemoryControllerUtils.calculateForStMutableLimit(bufferSize))
.isEqualTo(limit);
}

@Test
public void testValidateArenaBlockSize() {
void testValidateArenaBlockSize() {
long arenaBlockSize = 8 * 1024 * 1024;
assertFalse(
ForStMemoryControllerUtils.validateArenaBlockSize(
arenaBlockSize, (long) (arenaBlockSize * 0.5)));
assertTrue(
ForStMemoryControllerUtils.validateArenaBlockSize(
arenaBlockSize, (long) (arenaBlockSize * 1.5)));
assertThat(
ForStMemoryControllerUtils.validateArenaBlockSize(
arenaBlockSize, (long) (arenaBlockSize * 0.5)))
.isFalse();
assertThat(
ForStMemoryControllerUtils.validateArenaBlockSize(
arenaBlockSize, (long) (arenaBlockSize * 1.5)))
.isTrue();
}

private static final class TestingForStMemoryFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,33 @@

package org.apache.flink.state.forst;

import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.concurrent.Executors;

import org.forstdb.RocksDB;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.lang.reflect.Method;
import java.net.URL;
import java.nio.file.Path;
import java.util.concurrent.Executor;

import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.junit.Assert.assertNotEquals;
import static org.assertj.core.api.Assertions.assertThat;

/**
* This test validates that the ForSt JNI library loading works properly in the presence of the
* ForSt code being loaded dynamically via reflection. That can happen when ForSt is in the user
* code JAR, or in certain test setups. TODO: test working with both ForSt and RocksDB
*/
public class ForStMultiClassLoaderTest {
class ForStMultiClassLoaderTest {

@Rule public final TemporaryFolder tmp = new TemporaryFolder();
@TempDir private Path tmp;

@Test
public void testTwoSeparateClassLoaders() throws Exception {
void testTwoSeparateClassLoaders() throws Exception {
// collect the libraries / class folders with ForSt related code: the state backend and
// ForSt itself
final URL codePath1 =
Expand All @@ -70,13 +71,14 @@ public void testTwoSeparateClassLoaders() throws Exception {

final Class<?> clazz1 = Class.forName(className, false, loader1);
final Class<?> clazz2 = Class.forName(className, false, loader2);
assertNotEquals(
"Test broken - the two reflectively loaded classes are equal", clazz1, clazz2);
assertThat(clazz1)
.as("Test broken - the two reflectively loaded classes are equal")
.isNotEqualTo(clazz2);

final Object instance1 = clazz1.getConstructor().newInstance();
final Object instance2 = clazz2.getConstructor().newInstance();

final String tempDir = tmp.newFolder().getAbsolutePath();
final String tempDir = TempDirUtils.newFolder(tmp).getAbsolutePath();

final Method meth1 =
clazz1.getDeclaredMethod("ensureForStIsLoaded", String.class, Executor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

import org.apache.flink.configuration.Configuration;

import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
import static org.assertj.core.api.Assertions.assertThat;

/** Test all native metrics can be set using configuration. */
public class ForStNativeMetricOptionsTest {
class ForStNativeMetricOptionsTest {
@Test
public void testNativeMetricsConfigurable() {
void testNativeMetricsConfigurable() {
for (ForStProperty property : ForStProperty.values()) {
Configuration config = new Configuration();
if (property.getConfigKey().contains("num-files-at-level")) {
Expand All @@ -39,17 +39,12 @@ public void testNativeMetricsConfigurable() {

ForStNativeMetricOptions options = ForStNativeMetricOptions.fromConfig(config);

Assert.assertTrue(
String.format(
"Failed to enable native metrics with property %s",
property.getConfigKey()),
options.isEnabled());

Assert.assertTrue(
String.format(
"Failed to enable native metric %s using config",
property.getConfigKey()),
options.getProperties().contains(property));
assertThat(options.isEnabled())
.as("Failed to enable native metrics with property %s", property.getConfigKey())
.isTrue();
assertThat(options.getProperties())
.as("Failed to enable native metric %s using config", property.getConfigKey())
.contains(property);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,41 @@

package org.apache.flink.state.forst;

import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.OperatingSystem;

import org.forstdb.ColumnFamilyOptions;
import org.forstdb.DBOptions;
import org.forstdb.NativeLibraryLoader;
import org.forstdb.RocksDB;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/** Tests for the {@link ForStOperationUtils}. */
public class ForStOperationsUtilsTest {
class ForStOperationsUtilsTest {

@ClassRule public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
@TempDir static java.nio.file.Path tmpDir;

@BeforeClass
public static void loadRocksLibrary() throws Exception {
NativeLibraryLoader.getInstance().loadLibrary(TMP_DIR.newFolder().getAbsolutePath());
@BeforeAll
static void loadRocksLibrary() throws Exception {
NativeLibraryLoader.getInstance()
.loadLibrary(TempDirUtils.newFolder(tmpDir).getAbsolutePath());
}

@Test
public void testPathExceptionOnWindows() throws Exception {
void testPathExceptionOnWindows() throws Exception {
assumeTrue(OperatingSystem.isWindows());

final File folder = TMP_DIR.newFolder();
final File folder = TempDirUtils.newFolder(tmpDir);
final File rocksDir =
new File(folder, getLongString(247 - folder.getAbsolutePath().length()));

Expand All @@ -73,9 +73,8 @@ public void testPathExceptionOnWindows() throws Exception {
// do not provoke a test failure if this passes, because some setups may actually
// support long paths, in which case: great!
} catch (IOException e) {
assertThat(
e.getMessage(),
containsString("longer than the directory path length limit for Windows"));
assertThat(e.getMessage())
.contains("longer than the directory path length limit for Windows");
}
}

Expand Down
Loading