Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ The Apache Software License, Version 2.0
- opentelemetry-api-incubator-1.56.0-alpha.jar
- opentelemetry-common-1.56.0.jar
- opentelemetry-context-1.56.0.jar
* Slog
- slog-0.9.5.jar

* BookKeeper
- bookkeeper-common-allocator-4.17.3.jar
Expand Down
2 changes: 1 addition & 1 deletion pulsar-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ dependencies {
api(project(":pulsar-client-api"))
api(project(":pulsar-client-admin-api"))

implementation(libs.slf4j.api)
implementation(libs.slog)
implementation(libs.jackson.databind)
implementation(libs.jackson.module.parameter.names)
implementation(libs.jackson.datatype.jsr310)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.EncodeData;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -35,7 +35,7 @@
/**
* Util class for processing key/value schema info.
*/
@Slf4j
@CustomLog
public final class KeyValueSchemaInfo {

private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new Schema<SchemaInfo>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.CustomLog;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
Expand All @@ -39,8 +39,8 @@
/**
* Holder of a ByteBuf allocator.
*/
@CustomLog
@UtilityClass
@Slf4j
public class PulsarByteBufAllocator {

public static final String PULSAR_ALLOCATOR_POOLED = "pulsar.allocator.pooled";
Expand Down Expand Up @@ -76,9 +76,7 @@ static ByteBufAllocator createByteBufAllocator() {
System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap"));

final LeakDetectionPolicy leakDetectionPolicy = resolveLeakDetectionPolicyWithHighestLevel(System::getProperty);
if (log.isDebugEnabled()) {
log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled, isExitOnOutOfMemory);
}
log.debug().attr("isPooled", isPooled).attr("exitOnOOM", isExitOnOutOfMemory).log("Allocator configuration");

ByteBufAllocatorBuilder builder = ByteBufAllocatorBuilder.create()
.leakDetectionPolicy(leakDetectionPolicy)
Expand All @@ -89,12 +87,12 @@ static ByteBufAllocator createByteBufAllocator() {
try {
c.accept(oomException);
} catch (Throwable t) {
log.warn("Exception during OOM listener: {}", t.getMessage(), t);
log.warn().exception(t).log("Exception during OOM listener");
}
});

if (isExitOnOutOfMemory) {
log.info("Exiting JVM process for OOM error: {}", oomException.getMessage(), oomException);
log.info().exception(oomException).log("Exiting JVM process for OOM error");
ShutdownUtil.triggerImmediateForcefulShutdown();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import lombok.CustomLog;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
Expand All @@ -37,8 +37,8 @@
/**
* Helper class to work with a raw Pulsar entry payload.
*/
@CustomLog
@UtilityClass
@Slf4j
public class MessageParser {

private static final FastThreadLocal<SingleMessageMetadata> LOCAL_SINGLE_MESSAGE_METADATA = //
Expand Down Expand Up @@ -83,8 +83,11 @@ public static void parseMessage(String topicName, long ledgerId, long entryId, B
try {
Commands.parseMessageMetadata(payload, msgMetadata);
} catch (Throwable t) {
log.warn("[{}] Failed to deserialize metadata for message {}:{} - Ignoring",
topicName, ledgerId, entryId);
log.warn()
.attr("topic", topicName)
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.log("Failed to deserialize metadata for message - Ignoring");
return;
}

Expand Down Expand Up @@ -128,9 +131,13 @@ public static boolean verifyChecksum(String topic, ByteBuf headersAndPayload, lo
int checksum = readChecksum(headersAndPayload);
int computedChecksum = computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
log.error(
"[{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
topic, ledgerId, entryId, Long.toHexString(checksum), Integer.toHexString(computedChecksum));
log.error()
.attr("topic", topic)
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.attr("receivedChecksum", "0x" + Long.toHexString(checksum))
.attr("computedChecksum", "0x" + Integer.toHexString(computedChecksum))
.log("Checksum mismatch for message");
return false;
}
}
Expand All @@ -152,17 +159,26 @@ public static ByteBuf uncompressPayloadIfNeeded(String topic, MessageMetadata ms
int payloadSize = payload.readableBytes();
if (payloadSize > maxMessageSize) {
// payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
log.error("[{}] Got corrupted payload message size {} at {}:{}", topic, payloadSize,
ledgerId, entryId);
log.error()
.attr("topic", topic)
.attr("payloadSize", payloadSize)
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.log("Got corrupted payload message size");
return null;
}

try {
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
return uncompressedPayload;
} catch (IOException e) {
log.error("[{}] Failed to decompress message with {} at {}:{} : {}", topic,
msgMetadata.getCompression(), ledgerId, entryId, e.getMessage(), e);
log.error()
.attr("topic", topic)
.attr("compression", msgMetadata.getCompression())
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.exception(e)
.log("Failed to decompress message");
return null;
}
}
Expand All @@ -187,7 +203,7 @@ private static void receiveIndividualMessagesFromBatch(ReferenceCountedMessageMe
ledgerId, entryId, i));
}
} catch (IOException e) {
log.warn("Unable to obtain messages in batch", e);
log.warn().exception(e).log("Unable to obtain messages in batch");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,15 @@
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import lombok.CustomLog;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A tool class for loading BrokerEntryMetadataInterceptor classes.
*/
@CustomLog
public class BrokerEntryMetadataUtils<T> {

private static final Logger log = LoggerFactory.getLogger(BrokerEntryMetadataUtils.class);

public static Set<BrokerEntryMetadataInterceptor> loadBrokerEntryMetadataInterceptors(
Set<String> interceptorNames, ClassLoader classLoader) {
Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
Expand All @@ -46,12 +44,17 @@ public static Set<BrokerEntryMetadataInterceptor> loadBrokerEntryMetadataInterce
interceptors.add(clz.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException
| InvocationTargetException | NoSuchMethodException e) {
log.error("Create new BrokerEntryMetadataInterceptor instance for {} failed.",
interceptorName, e);
log.error()
.attr("interceptorName", interceptorName)
.exception(e)
.log("Create new BrokerEntryMetadataInterceptor instance failed.");
throw new RuntimeException(e);
}
} catch (ClassNotFoundException e) {
log.error("Load BrokerEntryMetadataInterceptor class for {} failed.", interceptorName, e);
log.error()
.attr("interceptorName", interceptorName)
.exception(e)
.log("Load BrokerEntryMetadataInterceptor class failed.");
throw new RuntimeException(e);
}
}
Expand All @@ -71,12 +74,17 @@ public static <T> Set<T> loadInterceptors(
interceptors.add(clz.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException
| InvocationTargetException | NoSuchMethodException e) {
log.error("Create new instance for {} failed. Exception is {}",
interceptorName, e);
log.error()
.attr("interceptorName", interceptorName)
.exception(e)
.log("Create new instance failed.");
throw new RuntimeException(e);
}
} catch (ClassNotFoundException e) {
log.error("Load class for {} failed. Exception is {}", interceptorName, e);
log.error()
.attr("interceptorName", interceptorName)
.exception(e)
.log("Load class failed.");
throw new RuntimeException(e);
}
}
Expand Down
Loading
Loading