Skip to content

Commit cd5f09e

Browse files
authored
Fix duplicate deletion emission in TsFileSplitter (#17534)
* Fix duplicate deletion emission in TsFileSplitter

  Avoid applying deletions while switching the time chunk context, which could emit the same deletion entries multiple times for aligned multi-device files. Add a regression test that verifies that the emitted deletion mods match the expected mods exactly.

  Made-with: Cursor

* update
1 parent 4e06946 commit cd5f09e

3 files changed

Lines changed: 86 additions & 1 deletion

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public void serialize(DataOutputStream stream) throws IOException {
5757
deletion.serialize(stream);
5858
}
5959

60+
public ModEntry getModEntry() {
61+
return this.deletion;
62+
}
63+
6064
public static DeletionData deserialize(InputStream stream)
6165
throws IllegalPathException, IOException {
6266
return new DeletionData(ModEntry.createFrom(new DataInputStream(stream)));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte
153153
long chunkOffset = reader.position();
154154
timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size();
155155
consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
156-
handleModification(deletions);
157156

158157
ChunkHeader header = reader.readChunkHeader(marker);
159158
String measurementId = header.getMeasurementID();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.exception.IllegalPathException;
2424
import org.apache.iotdb.commons.exception.MetadataException;
25+
import org.apache.iotdb.commons.path.MeasurementPath;
2526
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2627
import org.apache.iotdb.db.exception.StorageEngineException;
2728
import org.apache.iotdb.db.exception.load.LoadFileException;
@@ -30,10 +31,15 @@
3031
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
3132
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
3233
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
34+
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
35+
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
36+
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
37+
import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
3338
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3439
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
3540
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
3641
import org.apache.iotdb.db.storageengine.load.splitter.AlignedChunkData;
42+
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
3743
import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
3844
import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
3945

@@ -53,7 +59,9 @@
5359
import java.io.ByteArrayInputStream;
5460
import java.io.ByteArrayOutputStream;
5561
import java.io.DataOutputStream;
62+
import java.io.File;
5663
import java.io.IOException;
64+
import java.nio.file.Files;
5765
import java.util.ArrayList;
5866
import java.util.Arrays;
5967
import java.util.Collections;
@@ -232,6 +240,80 @@ public void testCompactionFlushPageAndSplitByTimePartition()
232240
consumeChunkDataAndValidate(targetResource);
233241
}
234242

243+
@Test
244+
public void testDeletionDataShouldOnlyBeGeneratedOnceAtEnd()
245+
throws IOException, MetadataException, LoadFileException, IllegalPathException {
246+
TsFileResource resource = createAlignedMultiDeviceFile();
247+
try (ModificationFile modificationFile =
248+
new ModificationFile(
249+
ModificationFile.getExclusiveMods(resource.getTsFile()).getPath(), false)) {
250+
modificationFile.write(
251+
new TreeDeletionEntry(new MeasurementPath("root.testsg.d0.s0"), Long.MIN_VALUE, 100));
252+
modificationFile.write(
253+
new TreeDeletionEntry(new MeasurementPath("root.testsg.d0.s1"), 200, 300));
254+
modificationFile.write(
255+
new TreeDeletionEntry(new MeasurementPath("root.testsg.d1.s0"), Long.MIN_VALUE, 100));
256+
modificationFile.write(
257+
new TreeDeletionEntry(new MeasurementPath("root.testsg.d1.s1"), 200, 300));
258+
}
259+
260+
List<ModEntry> expectedMods = ModificationFile.readAllModifications(resource.getTsFile(), true);
261+
List<ModEntry> deletionMods = new ArrayList<>();
262+
File actualModsFile = new File(resource.getTsFilePath() + ".mods");
263+
try (ModificationFile actualModificationFile =
264+
new ModificationFile(actualModsFile.getAbsolutePath(), false)) {
265+
TsFileSplitter splitter =
266+
new TsFileSplitter(
267+
resource.getTsFile(),
268+
tsFileData -> {
269+
if (tsFileData instanceof DeletionData) {
270+
deletionMods.add(((DeletionData) tsFileData).getModEntry());
271+
}
272+
return true;
273+
});
274+
splitter.splitTsFileByDataPartition();
275+
}
276+
277+
Assert.assertEquals(expectedMods, deletionMods);
278+
Files.deleteIfExists(actualModsFile.toPath());
279+
}
280+
281+
private TsFileResource createAlignedMultiDeviceFile() throws IOException {
282+
TsFileResource resource = createEmptyFileAndResource(true);
283+
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
284+
writer.startChunkGroup("d0");
285+
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
286+
Arrays.asList("s0", "s1"),
287+
new TimeRange[][] {
288+
new TimeRange[] {new TimeRange(1, 100), new TimeRange(200, 300)},
289+
new TimeRange[] {
290+
new TimeRange(604799900, 604800020), new TimeRange(604810020, 604820020)
291+
}
292+
},
293+
TSEncoding.PLAIN,
294+
CompressionType.LZ4,
295+
Arrays.asList(false, false));
296+
writer.endChunkGroup();
297+
298+
writer.startChunkGroup("d1");
299+
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
300+
Arrays.asList("s0", "s1"),
301+
new TimeRange[][] {
302+
new TimeRange[] {new TimeRange(1, 100), new TimeRange(200, 300)},
303+
new TimeRange[] {
304+
new TimeRange(604799900, 604800020), new TimeRange(604810020, 604820020)
305+
}
306+
},
307+
TSEncoding.PLAIN,
308+
CompressionType.LZ4,
309+
Arrays.asList(false, false));
310+
writer.endChunkGroup();
311+
312+
writer.endFile();
313+
}
314+
return resource;
315+
}
316+
235317
private TsFileResource performCompaction()
236318
throws StorageEngineException,
237319
IOException,

0 commit comments

Comments
 (0)