From e0775f9e601fa3afcdf9a791012538ea7097f44a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 8 Jun 2026 19:16:25 +0800 Subject: [PATCH 1/2] Fix short reads in fixed-length deserialization --- .../datastructure/SerializableList.java | 3 +- .../calc/utils/sort/FileSpillerReader.java | 4 +- .../externalservice/ExternalServiceInfo.java | 3 +- .../confignode/persistence/ProcedureInfo.java | 13 +- .../writelog/io/SingleFileLogReader.java | 5 +- .../deletion/recover/DeletionReader.java | 5 +- .../planner/plan/node/write/ObjectNode.java | 4 +- .../logfile/FakeCRC32Deserializer.java | 5 +- .../impl/pbtree/schemafile/SchemaFile.java | 3 +- .../schemafile/log/SchemaFileLogReader.java | 7 +- .../schemafile/pagemgr/PageIOChannel.java | 3 +- .../schemaregion/tag/TagLogFile.java | 9 +- .../dataregion/modification/IDPredicate.java | 7 +- .../dataregion/wal/io/WALFileVersion.java | 4 +- .../dataregion/wal/io/WALInputStream.java | 32 ++-- .../dataregion/wal/io/WALMetaData.java | 12 +- .../wal/recover/WALRepairWriter.java | 3 +- .../load/splitter/AlignedChunkData.java | 6 +- .../SubscriptionEventTsFileResponse.java | 11 +- .../db/utils/DataNodeObjectFileService.java | 3 +- .../logfile/FakeCRC32DeserializerTest.java | 108 +++++++++++++ .../modification/TableDeletionEntryTest.java | 9 ++ .../impl/TimeWindowStateProgressIndex.java | 9 +- .../commons/executable/ExecutableManager.java | 3 +- .../serializer/PlainQueueSerializer.java | 3 +- .../pipe/sink/protocol/IoTDBAirGapSink.java | 5 +- .../table/column/TsTableColumnCategory.java | 7 +- .../apache/iotdb/commons/utils/IOUtils.java | 24 ++- .../iotdb/commons/utils/IOUtilsTest.java | 147 ++++++++++++++++++ 29 files changed, 384 insertions(+), 73 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java index ee70df305ed96..fb1b86ad8bb92 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java @@ -22,6 +22,7 @@ import org.apache.iotdb.calc.service.AbstractTemporaryQueryDataFileService; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.tsfile.utils.PublicBAOS; @@ -60,7 +61,7 @@ default void deserialize() throws IOException { } init(); ByteBuffer byteBuffer = ByteBuffer.allocate(recorder.getSerializedByteLength()); - recorder.getFileChannel().read(byteBuffer); + IOUtils.readFully(recorder.getFileChannel(), byteBuffer); byteBuffer.flip(); deserialize(byteBuffer); recorder.closeFile(); diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java index af7d726d5fff3..05fc939ede7ec 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java @@ -21,6 +21,7 @@ import org.apache.iotdb.calc.utils.datastructure.MergeSortKey; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.common.conf.TSFileDescriptor; @@ -93,10 +94,11 @@ private long read() throws IoTDBException { if (readLen == -1) { return -1; } + IOUtils.readFully(fileChannel, bytes); bytes.flip(); int capacity = bytes.getInt(); ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity); - fileChannel.read(tsBlockBytes); + IOUtils.readFully(fileChannel, tsBlockBytes); tsBlockBytes.flip(); TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes); cacheBlocks.add(cachedTsBlock); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java index a3cf89515b5ef..823325514a92f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; @@ -239,7 +240,7 @@ private ServiceInfo deserializeServiceInfoConsiderCRC(InputStream inputStream) throws IOException { int length = ReadWriteIOUtils.readInt(inputStream); byte[] bytes = new byte[length]; - inputStream.read(bytes); + new DataInputStream(inputStream).readFully(bytes); crc32.reset(); crc32.update(bytes, 0, length); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java index 63855d9646d30..f95575fef2967 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; @@ -165,8 +166,16 @@ private static Optional loadProcedure(Path procedureFilePath) { try (FileInputStream fis = new FileInputStream(procedureFilePath.toFile())) { Procedure procedure = null; try (FileChannel channel = fis.getChannel()) { - ByteBuffer byteBuffer = ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE); - if (channel.read(byteBuffer) > 0) { + final long fileSize = channel.size(); + if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) { + throw new IOException( + String.format( + "Procedure file %s exceeds the load buffer limit %s, actual size %s", + procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize)); + } + ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize); + if (fileSize > 0) { + IOUtils.readFully(channel, byteBuffer); byteBuffer.flip(); procedure = ProcedureFactory.getInstance().create(byteBuffer); byteBuffer.clear(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java index b46057b8330d7..fe123372bec18 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java @@ -76,10 +76,7 @@ public boolean hasNext() { } buffer = new byte[logSize]; - int readLen = logStream.read(buffer, 0, logSize); - if (readLen < logSize) { - throw new IOException(ConfigNodeMessages.REACH_EOF); - } + logStream.readFully(buffer, 0, logSize); final long checkSum = logStream.readLong(); checkSummer.reset(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java index 0bd6f48d51f79..8477e1e805934 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.consensus.deletion.recover; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; @@ -60,7 +61,7 @@ public List readAllDeletions() throws IOException { try { // Read magic string ByteBuffer magicStringBuffer = ByteBuffer.allocate(MAGIC_STRING_BYTES_SIZE); - fileChannel.read(magicStringBuffer); + IOUtils.readFully(fileChannel, magicStringBuffer); magicStringBuffer.flip(); String magicVersion = new String(magicStringBuffer.array(), StandardCharsets.UTF_8); if (LOGGER.isDebugEnabled()) { @@ -70,7 +71,7 @@ public List readAllDeletions() throws IOException { // Read deletions long remainingBytes = fileChannel.size() - fileChannel.position(); ByteBuffer byteBuffer = ByteBuffer.allocate((int) remainingBytes); - fileChannel.read(byteBuffer); + IOUtils.readFully(fileChannel, byteBuffer); byteBuffer.flip(); List deletions = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index 552d26e56319a..cf80d63a8d56f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -158,7 +158,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { if (objectFile.isPresent()) { try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(), "r")) { raf.seek(offset); - raf.read(contents); + raf.readFully(contents); } catch (IOException e) { throw new RuntimeException(e); } @@ -308,7 +308,7 @@ public ByteBuffer serialize() { private void readContentFromFile(File file, byte[] contents) throws IOException { try (RandomAccessFile raf = new RandomAccessFile(file, "r")) { raf.seek(offset); - raf.read(contents); + raf.readFully(contents); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java index 369b0ecfbf8b2..73584b3b13c0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java @@ -29,7 +29,6 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -69,9 +68,7 @@ public T deserialize(InputStream inputStream) throws IOException { } byte[] logBuffer = new byte[logLength]; - if (logLength < inputStream.read(logBuffer, 0, logLength)) { - throw new EOFException(); - } + dataInputStream.readFully(logBuffer, 0, logLength); T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java index f9f5a63a44ae7..665bafb956f78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode; import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; @@ -360,7 +361,7 @@ private void initFileHeader() throws IOException, MetadataException { lastSGAddr = 0L; pageManager = new BTreePageManager(channel, pmtFile, -1, logPath); } else { - channel.read(headerContent); + IOUtils.readFully(channel, headerContent); headerContent.clear(); lastPageIndex = ReadWriteIOUtils.readInt(headerContent); dataTTL = ReadWriteIOUtils.readLong(headerContent); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java index 1840d867b36ed..d2300384fa7ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java @@ -27,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -91,8 +93,9 @@ public List collectUpdatedEntries() throws IOException, SchemaFileLogCor } } - // corrupted within one entry - if (inputStream.read(tempBytes, 1, tempBytes.length - 1) < tempBytes.length - 2) { + try { + new DataInputStream(inputStream).readFully(tempBytes, 1, tempBytes.length - 1); + } catch (EOFException e) { throw new SchemaFileLogCorruptedException( logFile.getAbsolutePath(), DataNodeSchemaMessages.SCHEMA_FILE_LOG_INCOMPLETE_ENTRY); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java index 044b7339a82e8..4e9d6524f1996 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader; @@ -107,7 +108,7 @@ public void loadFromFileToBuffer(ByteBuffer dst, int pageIndex) throws IOExcepti if (!readChannel.isOpen()) { readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ); } - readChannel.read(dst, getPageAddress(pageIndex)); + IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex)); } // region Flush Strategy diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java index 6ffed68535af0..62453e24e85c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodeSchemaMessages; @@ -116,7 +117,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position) throws IOException { // Read the first block ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); - fileChannel.read(byteBuffer, position); + IOUtils.readFully(fileChannel, byteBuffer, position); byteBuffer.flip(); if (byteBuffer.limit() > 0) { // This indicates that there is data at this position int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int @@ -131,7 +132,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position) // read one offset, then use filechannel's read to read it byteBuffers.position(MAX_LENGTH * i); byteBuffers.limit(MAX_LENGTH * (i + 1)); - fileChannel.read(byteBuffers, nextPosition); + IOUtils.readFully(fileChannel, byteBuffers, nextPosition); byteBuffers.position(4 + i * Long.BYTES); } byteBuffers.limit(byteBuffers.capacity()); @@ -146,7 +147,7 @@ private List parseOffsetList(long position) throws IOException { blockOffset.add(position); // Read the first block ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); - fileChannel.read(byteBuffer, position); + IOUtils.readFully(fileChannel, byteBuffer, position); byteBuffer.flip(); if (byteBuffer.limit() > 0) { // This indicates that there is data at this position int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int @@ -169,7 +170,7 @@ private List parseOffsetList(long position) throws IOException { // read blockBuffer.position(MAX_LENGTH * i); blockBuffer.limit(MAX_LENGTH * (i + 1)); - fileChannel.read(blockBuffer, blockOffset.get(i)); + IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i)); blockBuffer.position(4 + i * Long.BYTES); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java index d1f19df10a737..d9a7fa812a24f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java @@ -30,6 +30,7 @@ import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -64,7 +65,11 @@ public long serialize(ByteBuffer buffer) { } public static IDPredicateType deserialize(InputStream stream) throws IOException { - return values()[stream.read()]; + int typeNum = stream.read(); + if (typeNum == -1) { + throw new EOFException(); + } + return values()[typeNum]; } public static IDPredicateType deserialize(ByteBuffer buffer) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java index e3d374551b115..8a6d2fbf890c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.commons.utils.IOUtils; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -63,7 +65,7 @@ public static WALFileVersion getVersion(FileChannel channel) throws IOException continue; } ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length); - channel.read(buffer); + IOUtils.readFully(channel, buffer); buffer.flip(); String versionString = new String(buffer.array(), StandardCharsets.UTF_8); if (version.versionString.equals(versionString)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index d5db22f0aff7e..272a1fa62e08f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.i18n.StorageEngineMessages; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.utils.MmapUtil; @@ -86,7 +87,8 @@ private void getEndOffset() throws IOException { if (version == WALFileVersion.V2) { // New Version ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); - channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); + IOUtils.readFully( + channel, magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX) || !new String(magicStringBuffer.array(), StandardCharsets.UTF_8) @@ -106,7 +108,8 @@ private void getEndOffset() throws IOException { } // Old version ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); - channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); + IOUtils.readFully( + channel, magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8) .equals(version.getVersionString())) { @@ -118,7 +121,7 @@ private void getEndOffset() throws IOException { } } // Read the metadata size - channel.read(metadataSizeBuf, position); + IOUtils.readFully(channel, metadataSizeBuf, position); metadataSizeBuf.flip(); int metadataSize = metadataSizeBuf.getInt(); endOffset = channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize; @@ -240,9 +243,7 @@ private void loadNextSegmentV2() throws IOException { compressedBuffer.clear(); // limit the buffer to prevent it from reading too much byte than expected compressedBuffer.limit(segmentInfo.dataInDiskSize); - if (readWALBufferFromChannel(compressedBuffer) != segmentInfo.dataInDiskSize) { - throw new IOException(StorageEngineMessages.UNEXPECTED_END_OF_FILE); - } + readWALBufferFullyFromChannel(compressedBuffer); compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); @@ -258,9 +259,7 @@ private void loadNextSegmentV2() throws IOException { // limit the buffer to prevent it from reading too much byte than expected dataBuffer.limit(segmentInfo.dataInDiskSize); - if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) { - throw new IOException(StorageEngineMessages.UNEXPECTED_END_OF_FILE); - } + readWALBufferFullyFromChannel(dataBuffer); } } catch (Exception e) { logger.error( @@ -313,7 +312,7 @@ public void skipToGivenLogicalPosition(long pos) throws IOException { if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) { compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - readWALBufferFromChannel(compressedBuffer); + readWALBufferFullyFromChannel(compressedBuffer); compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); @@ -322,7 +321,7 @@ public void skipToGivenLogicalPosition(long pos) throws IOException { compressedBuffer = null; } else { dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - readWALBufferFromChannel(dataBuffer); + readWALBufferFullyFromChannel(dataBuffer); dataBuffer.flip(); } @@ -361,7 +360,7 @@ public WALMetaData getWALMetaData() throws IOException { private SegmentInfo getNextSegmentInfo() throws IOException { segmentHeaderWithoutCompressedSizeBuffer.clear(); - channel.read(segmentHeaderWithoutCompressedSizeBuffer); + readWALBufferFullyFromChannel(segmentHeaderWithoutCompressedSizeBuffer); segmentHeaderWithoutCompressedSizeBuffer.flip(); SegmentInfo info = new SegmentInfo(); info.compressionType = @@ -369,7 +368,7 @@ private SegmentInfo getNextSegmentInfo() throws IOException { info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt(); if (info.compressionType != CompressionType.UNCOMPRESSED) { compressedSizeBuffer.clear(); - readWALBufferFromChannel(compressedSizeBuffer); + readWALBufferFullyFromChannel(compressedSizeBuffer); compressedSizeBuffer.flip(); info.uncompressedSize = compressedSizeBuffer.getInt(); } else { @@ -385,6 +384,13 @@ private int readWALBufferFromChannel(ByteBuffer buffer) throws IOException { return size; } + private void readWALBufferFullyFromChannel(ByteBuffer buffer) throws IOException { + long startTime = System.nanoTime(); + int size = buffer.remaining(); + IOUtils.readFully(channel, buffer); + WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() - startTime); + } + private void uncompressWALBuffer( ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor unCompressor) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java index 83025ed6b82cc..4223bdd7f2e2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; import org.apache.iotdb.db.i18n.StorageEngineMessages; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException; @@ -144,12 +145,12 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES); WALFileVersion version = WALFileVersion.getVersion(channel); position = channel.size() - Integer.BYTES - (version.getVersionBytes().length); - channel.read(metadataSizeBuf, position); + IOUtils.readFully(channel, metadataSizeBuf, position); metadataSizeBuf.flip(); // load metadata int metadataSize = metadataSizeBuf.getInt(); ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize); - channel.read(metadataBuf, position - metadataSize); + IOUtils.readFully(channel, metadataBuf, position - metadataSize); metadataBuf.flip(); metaData = WALMetaData.deserialize(metadataBuf); // versions before V1.3, should recover memTable ids from entries @@ -158,8 +159,8 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr for (int size : metaData.buffersSize) { channel.position(offset); ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); - channel.read(buffer); - buffer.clear(); + IOUtils.readFully(channel, buffer); + buffer.flip(); metaData.memTablesId.add(buffer.getLong()); offset += size; } @@ -176,7 +177,8 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr private static boolean isValidMagicString(FileChannel channel) throws IOException { ByteBuffer magicStringBytes = ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length); - channel.read(magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length); + IOUtils.readFully( + channel, magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length); magicStringBytes.flip(); String magicString = new String(magicStringBytes.array(), StandardCharsets.UTF_8); return magicString.equals(WALFileVersion.V2.getVersionString()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java index d1b27060c90b6..46598561a5bb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.recover; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter; @@ -65,7 +66,7 @@ private String readTailMagic(WALFileVersion version) throws IOException { } try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) { ByteBuffer magicStringBytes = ByteBuffer.allocate(size); - channel.read(magicStringBytes, channel.size() - size); + IOUtils.readFully(channel, magicStringBytes, channel.size() - size); magicStringBytes.flip(); return new String(magicStringBytes.array(), StandardCharsets.UTF_8); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java index 2c9cea068529b..3093008356d64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java @@ -21,7 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.TimePartitionUtils; -import org.apache.iotdb.db.i18n.StorageEngineMessages; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.tsfile.enums.TSDataType; @@ -46,6 +45,7 @@ import javax.annotation.Nonnull; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; @@ -304,9 +304,7 @@ protected void deserializeTsFileDataByte(final InputStream stream) throws IOExce this.chunkData = ((LoadTsFilePieceNode.ByteBufferInputStream) stream).read(size); } else { byte[] data = new byte[size]; - if (size != stream.read(data)) { - throw new IOException(StorageEngineMessages.TSFILE_DATA_BYTE_ARRAY_SIZE_MISMATCH); - } + new DataInputStream(stream).readFully(data); this.chunkData = ByteBuffer.wrap(data); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java index ee8ef1fdb8952..90fbd117905a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java @@ -194,20 +194,13 @@ private CachedSubscriptionPollResponse generateResponseWithPieceOrSealPayload( PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(bufferSize); final byte[] readBuffer = new byte[(int) bufferSize]; - final int readLength = reader.read(readBuffer); - if (readLength != bufferSize) { - memoryBlock.close(); - throw new SubscriptionException( - String.format( - "inconsistent read length (broken invariant), expected: %s, actual: %s", - bufferSize, readLength)); - } + reader.readFully(readBuffer); // generate subscription poll response with piece payload final CachedSubscriptionPollResponse response = new CachedSubscriptionPollResponse( SubscriptionPollResponseType.FILE_PIECE.getType(), - new FilePiecePayload(tsFile.getName(), writingOffset + readLength, readBuffer), + new FilePiecePayload(tsFile.getName(), writingOffset + bufferSize, readBuffer), commitContext); // set fixed memory block for response diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java index 8de64f9de310b..2036885c38512 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java @@ -23,6 +23,7 @@ import org.apache.iotdb.calc.utils.ObjectTypeUtils; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.ObjectFileNotExist; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; @@ -101,7 +102,7 @@ private static ByteBuffer readObjectContentFromLocalFile(File file, long offset, byte[] bytes = new byte[(int) readSize]; ByteBuffer buffer = ByteBuffer.wrap(bytes); try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { - fileChannel.read(buffer, offset); + IOUtils.readFully(fileChannel, buffer, offset); } catch (IOException e) { throw new IoTDBRuntimeException(e, TSStatusCode.OBJECT_READ_ERROR.getStatusCode()); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java new file mode 100644 index 0000000000000..b9f583119778e --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.schemaengine.schemaregion.logfile; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class FakeCRC32DeserializerTest { + + @Test + public void deserializeReadsCompletePayloadAfterShortRead() throws IOException { + byte[] payload = new byte[] {1, 2, 3, 4}; + + byte[] deserialized = + new FakeCRC32Deserializer<>(new ByteBufferDeserializer()) + .deserialize(new OneByteAtATimeInputStream(serialize(payload, true))); + + Assert.assertArrayEquals(payload, deserialized); + } + + @Test + public void deserializeThrowsWhenPayloadIsTruncated() throws IOException { + byte[] bytes = serialize(new byte[] {1, 2}, false, false); + + Assert.assertThrows( + EOFException.class, + () -> + new FakeCRC32Deserializer<>(new ByteBufferDeserializer()) + .deserialize(new OneByteAtATimeInputStream(bytes))); + } + + private static byte[] serialize(byte[] payload, boolean complete) throws IOException { + return serialize(payload, complete, true); + } + + private static byte[] serialize(byte[] payload, boolean complete, boolean writeValidationCode) + throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { + dataOutputStream.writeInt(complete ? payload.length : payload.length + 1); + dataOutputStream.write(payload); + if (writeValidationCode) { + dataOutputStream.writeLong(0L); + } + } + return outputStream.toByteArray(); + } + + private static class ByteBufferDeserializer implements IDeserializer { + + @Override + public byte[] deserialize(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + } + + private static class OneByteAtATimeInputStream extends InputStream { + + private final byte[] bytes; + private int index; + + private OneByteAtATimeInputStream(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public int read() { + return index < bytes.length ? bytes[index++] & 0xFF : -1; + } + + @Override + public int read(byte[] b, int off, int len) { + if (len == 0) { + return 0; + } + if (index >= bytes.length) { + return -1; + } + b[off] = bytes[index++]; + return 1; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java index 5c2979a90755c..f4a1acb799863 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java @@ -30,12 +30,14 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class TableDeletionEntryTest { @@ -57,6 +59,13 @@ public void testSerialization() throws IOException { assertEquals(entry, deserialized2); } + @Test + public void testDeserializePredicateTypeFromEmptyStream() { + assertThrows( + EOFException.class, + () -> IDPredicate.IDPredicateType.deserialize(new ByteArrayInputStream(new byte[0]))); + } + @Test public void testAffectDevice() { TableDeletionEntry entry1 = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java index bb1b2c1ce5e18..02139d2058d83 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java @@ -29,6 +29,7 @@ import javax.annotation.Nonnull; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -279,13 +280,7 @@ public static TimeWindowStateProgressIndex deserializeFrom(InputStream stream) continue; } final byte[] body = new byte[length]; - final int readLen = stream.read(body); - if (readLen != length) { - throw new IOException( - String.format( - "The intended read length is %s but %s is actually read when deserializing TimeProgressIndex, ProgressIndex: %s", - length, readLen, timeWindowStateProgressIndex)); - } + new DataInputStream(stream).readFully(body); final ByteBuffer dstBuffer = ByteBuffer.wrap(body); timeWindowStateProgressIndex.timeSeries2TimestampWindowBufferPairMap.put( timeSeries, new Pair<>(timestamp, dstBuffer)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java index 2c75928fee258..7d42bed3f8e35 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.tsfile.external.commons.io.FileUtils; import org.apache.tsfile.fileSystem.FSFactoryProducer; @@ -199,7 +200,7 @@ public static ByteBuffer transferToBytebuffer(String filePath) throws IOExceptio String.format("Size of file exceed %d bytes", Integer.MAX_VALUE)); } ByteBuffer byteBuffer = ByteBuffer.allocate((int) size); - fileChannel.read(byteBuffer); + IOUtils.readFully(fileChannel, byteBuffer); byteBuffer.flip(); return byteBuffer; } catch (Exception e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java index d944c83f77dd7..10087878d93ec 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.pipe.datastructure.queue.ConcurrentIterableLinkedQueue; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -69,7 +70,7 @@ public void loadQueueFromFile( } int capacity = ReadWriteIOUtils.readInt(inputStream); ByteBuffer buffer = ByteBuffer.allocate(capacity); - channel.read(buffer); + IOUtils.readFully(channel, buffer); buffer.flip(); E element = elementDeserializationFunction.apply(buffer); if (element == null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index 6d74102ff1940..4543ebaee9b5f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedOutputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -324,8 +325,8 @@ protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws IOEx outputStream.flush(); final byte[] response = new byte[1]; - final int size = socket.getInputStream().read(response); - return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response); + new DataInputStream(socket.getInputStream()).readFully(response); + return Arrays.equals(AirGapOneByteResponse.OK, response); } protected boolean send(final AirGapSocket socket, final byte[] bytes) throws IOException { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java index 9c6a5e3faee22..a104ba9d532d7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java @@ -24,6 +24,7 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -54,7 +55,11 @@ public void serialize(final ByteBuffer byteBuffer) { } public static TsTableColumnCategory deserialize(final InputStream stream) throws IOException { - return deserialize((byte) stream.read()); + final int category = stream.read(); + if (category < 0) { + throw new EOFException(); + } + return deserialize((byte) category); } public static TsTableColumnCategory deserialize(final ByteBuffer stream) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java index bb31459756597..88c929e709260 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java @@ -29,11 +29,13 @@ import com.google.common.util.concurrent.RateLimiter; import java.io.DataInputStream; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -125,6 +127,26 @@ public static void writeLong( outputStream.write(encodingBuffer.array(), 0, Long.BYTES); } + public static void readFully(FileChannel fileChannel, ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + if (fileChannel.read(buffer) <= 0) { + throw new EOFException(); + } + } + } + + public static void readFully(FileChannel fileChannel, ByteBuffer buffer, long position) + throws IOException { + long currentPosition = position; + while (buffer.hasRemaining()) { + final int readBytes = fileChannel.read(buffer, currentPosition); + if (readBytes <= 0) { + throw new EOFException(); + } + currentPosition += readBytes; + } + } + /** * Read a string from the given stream. * @@ -157,7 +179,7 @@ public static String readString( strBuffer = new byte[length]; } - inputStream.read(strBuffer, 0, length); + inputStream.readFully(strBuffer, 0, length); return new String(strBuffer, 0, length, encoding); } return null; diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java new file mode 100644 index 0000000000000..bf2cf546628a1 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.utils; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicInteger; + +public class IOUtilsTest { + + private static final String ENCODING = "UTF-8"; + + @Test + public void readStringReadsCompletePayloadAfterShortRead() throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + IOUtils.writeString(outputStream, "abcdefg", ENCODING, null); + + try (DataInputStream inputStream = + new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) { + Assert.assertEquals("abcdefg", IOUtils.readString(inputStream, ENCODING, null)); + } + } + + @Test + public void readStringThrowsWhenPayloadIsTruncated() throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + IOUtils.writeInt(outputStream, 7, null); + outputStream.write(new byte[] {'a', 'b', 'c'}); + + try (DataInputStream inputStream = + new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) { + Assert.assertThrows( + EOFException.class, () -> IOUtils.readString(inputStream, ENCODING, null)); + } + } + + @Test + public void readFullyReadsCompleteByteBufferAfterShortChannelRead() throws IOException { + byte[] bytes = new byte[] {1, 2, 3}; + FileChannel channel = mockOneByteAtATimeChannel(bytes); + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + IOUtils.readFully(channel, buffer); + + Assert.assertArrayEquals(bytes, buffer.array()); + } + + @Test + public void readFullyReadsCompleteByteBufferFromPositionAfterShortChannelRead() + throws IOException { + byte[] bytes = new byte[] {1, 2, 3, 4, 5}; + FileChannel channel = mockOneByteAtATimeChannel(bytes); + ByteBuffer buffer = ByteBuffer.allocate(3); + + IOUtils.readFully(channel, buffer, 2); + + Assert.assertArrayEquals(new byte[] {3, 4, 5}, buffer.array()); + } + + @Test + public void readFullyThrowsWhenChannelIsTruncated() throws IOException { + FileChannel channel = mockOneByteAtATimeChannel(new byte[] {1, 2}); + ByteBuffer buffer = ByteBuffer.allocate(3); + + Assert.assertThrows(EOFException.class, () -> IOUtils.readFully(channel, buffer)); + } + + private static FileChannel mockOneByteAtATimeChannel(byte[] bytes) throws IOException { + FileChannel channel = Mockito.mock(FileChannel.class); + AtomicInteger index = new AtomicInteger(); + Mockito.when(channel.read(Mockito.any(ByteBuffer.class))) + .thenAnswer( + invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + int currentIndex = index.getAndIncrement(); + if (currentIndex >= bytes.length) { + return -1; + } + buffer.put(bytes[currentIndex]); + return 1; + }); + Mockito.when(channel.read(Mockito.any(ByteBuffer.class), Mockito.anyLong())) + .thenAnswer( + invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + long position = invocation.getArgument(1); + if (position >= bytes.length) { + return -1; + } + buffer.put(bytes[(int) position]); + return 1; + }); + return channel; + } + + private static class OneByteAtATimeInputStream extends InputStream { + + private final byte[] bytes; + private int index; + + private OneByteAtATimeInputStream(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public int read() { + return index < bytes.length ? bytes[index++] & 0xFF : -1; + } + + @Override + public int read(byte[] b, int off, int len) { + if (len == 0) { + return 0; + } + if (index >= bytes.length) { + return -1; + } + b[off] = bytes[index++]; + return 1; + } + } +} From a5e8562101f04c8f7e76292ae3400de7860a69f4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 9 Jun 2026 11:13:22 +0800 Subject: [PATCH 2/2] Fix tag log append EOF handling --- .../schemaregion/tag/TagLogFile.java | 3 + .../schemaregion/tag/TagLogFileTest.java | 57 +++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java index 62453e24e85c6..0ed0ba9afda62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java @@ -147,6 +147,9 @@ private List parseOffsetList(long position) throws IOException { blockOffset.add(position); // Read the first block ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); + if (position == fileChannel.size()) { + return blockOffset; + } IOUtils.readFully(fileChannel, byteBuffer, position); byteBuffer.flip(); if (byteBuffer.limit() > 0) { // This indicates that there is data at this position diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java new file mode 100644 index 0000000000000..555594cc207ff --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.schemaengine.schemaregion.tag; + +import org.apache.tsfile.external.commons.io.FileUtils; +import org.apache.tsfile.utils.Pair; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; + +public class TagLogFileTest { + + private File tempDir; + + @After + public void tearDown() throws Exception { + if (tempDir != null) { + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + public void writeAppendsFirstRecordWithoutReadingPastFileEnd() throws Exception { + tempDir = Files.createTempDirectory("tag-log-file").toFile(); + Map tags = Collections.singletonMap("tag", "value"); + Map attributes = Collections.singletonMap("attr", "value"); + + try (TagLogFile tagLogFile = new TagLogFile(tempDir.getAbsolutePath(), "tag.log")) { + long offset = tagLogFile.write(tags, attributes); + Pair, Map> result = tagLogFile.read(offset); + + Assert.assertEquals(tags, result.left); + Assert.assertEquals(attributes, result.right); + } + } +}