Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions conf/cassandra_latest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2673,3 +2673,21 @@ compression_dictionary_cache_size: 10
# Expired dictionaries will be removed from memory but can be reloaded if needed.
# Min unit: s
compression_dictionary_cache_expire: 24h

# Enables configuring plugin compression providers.
# When enabled, Cassandra will attempt to load the provider with the specified class_name,
# if jar file is available in the classpath.
# Parameters:
# - fallback_to_default_provider: If "true", falls back to DefaultCompressionProvider
# when the plugin fails to load. If "false", throws an exception on failure.
# If this section is commented out, Cassandra uses DefaultCompressionProvider
# with built-in compressor implementation.
#compression_provider_options:
# LZ4Compressor:
# class_name: org.apache.cassandra.io.compress.DefaultCompressionProvider
# parameters:
# fallback_to_default_provider: "true"
# ZstdCompressor:
# class_name: org.apache.cassandra.io.compress.DefaultCompressionProvider
# parameters:

4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.slf4j.Logger;
Expand Down Expand Up @@ -89,6 +91,7 @@ public static Set<String> splitCommaDelimited(String src)
public ParameterizedClass crypto_provider;
public ParameterizedClass network_authorizer;
public ParameterizedClass cidr_authorizer;
public final ConcurrentMap<String, ParameterizedClass> compression_provider_options = Maps.newConcurrentMap();

@Replaces(oldName = "permissions_validity_in_ms", converter = Converters.MILLIS_DURATION_INT, deprecated = true)
public volatile DurationSpec.IntMillisecondsBound permissions_validity = new DurationSpec.IntMillisecondsBound("2s");
Expand Down Expand Up @@ -1552,4 +1555,5 @@ public enum CQLStartTime
* 6.0 and later.
*/
public volatile boolean gossip_quarantine_disabled = false;

}
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressorRegistry;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.util.DiskOptimizationStrategy;
Expand Down Expand Up @@ -371,6 +372,8 @@ public static void toolInitialization(boolean failIfDaemonOrClient)

applyCompatibilityMode();

applyCompressionProvider();

applySSTableFormats();

applySimpleConfig();
Expand Down Expand Up @@ -437,6 +440,7 @@ public static void clientInitialization(boolean failIfDaemonOrTool, Supplier<Con
applyCompatibilityMode();
diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
applySSTableFormats();
applyCompressionProvider();
}

private static void assertNotDaemonInitialized()
Expand Down Expand Up @@ -550,6 +554,8 @@ private static void applyAll() throws ConfigurationException

applySSTableFormats();

applyCompressionProvider();

applyCryptoProvider();

applySimpleConfig();
Expand Down Expand Up @@ -579,6 +585,11 @@ private static void applyAll() throws ConfigurationException
applyStartupChecks();
}

public static void applyCompressionProvider()
{
CompressorRegistry.instance.registerServices(conf.compression_provider_options);
}

private static void applySimpleConfig()
{
//Doing this first before all other things in case other pieces of config want to construct
Expand Down Expand Up @@ -6184,4 +6195,5 @@ public static void setGossipQuarantineDisabled(boolean disabled)
{
conf.gossip_quarantine_disabled = disabled;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.compress;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstract base class for compression providers.
* Provides common functionality for loading and managing compressors
* with configurable fallback behavior for plugin compression libraries.
*/
public abstract class AbstractCompressionProvider
{
protected static final Logger logger = LoggerFactory.getLogger(AbstractCompressionProvider.class);

public AbstractCompressionProvider()
{
}

/**
* Returns the fully qualified class name of the compression provider.
*
* @return Fully qualified name of the provider
*/
public abstract String getProviderName();

/**
* Returns the simple class name of the compression provider.
*
* @return Class name of the provider
*/
public abstract String getProviderSimpleName();

/**
* Checks if this compression provider is in a healthy state and ready to use.
*
* <p>This method should perform any necessary health checks, such as verifying
* that required native libraries are loaded, able to do compress/decompress successfully, etc.</p>
*
* @return true if the provider is healthy and ready to use, false otherwise
* @throws Exception if an error occurs during the health check
*/
public abstract boolean isHealthy() throws Exception;

/**
* Creates a new compressor instance with the given compression parameters.
*
* <p>This method is called when a new compressor is needed. The implementation
* should create and configure a compressor based on the provided options.</p>
*
* @param options Configuration options for the compressor. May be null or empty.
* @return A new ICompressor instance
* @throws IllegalStateException if the compressor cannot be created
*/
public abstract ICompressor createCompressor(Class<?> compressorClass, Map<String, String> options) throws IllegalStateException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class CompressionMetadata extends WrappedSharedCloseable
@Nullable // null when no dictionary
private final CompressionDictionary compressionDictionary;
private volatile ICompressor resolvedCompressor;
private static final CompressorRegistry registry = CompressorRegistry.instance;

@VisibleForTesting
public static CompressionMetadata open(File chunksIndexFile,
Expand Down Expand Up @@ -497,7 +498,7 @@ private void writeHeader(DataOutput out, long dataLength, int chunks)
{
try
{
out.writeUTF(parameters.getSstableCompressor().getClass().getSimpleName());
out.writeUTF(registry.getCompressorTypeSimpleName(parameters.getSstableCompressor().getClass().getSimpleName()));
out.writeInt(parameters.getOtherOptions().size());
for (Map.Entry<String, String> entry : parameters.getOtherOptions().entrySet())
{
Expand Down
Loading