Skip to content

Commit 35886a5

Browse files
authored
Pipe: Fixed the bug that attribute sync may cause ArrayIndexOutOfBoundsException / eliminate the existing attributes at the receiver which == null at the sender (#17539)
1 parent cd5f09e commit 35886a5

7 files changed

Lines changed: 57 additions & 10 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,16 +1055,15 @@ public static void executeNonQueries(BaseEnv env, List<String> sqlList) {
10551055
null);
10561056
}
10571057

1058-
public static void executeNonQueries(
1059-
BaseEnv env, List<String> sqlList, Connection defaultConnection) {
1058+
public static void executeNonQueries(BaseEnv env, List<String> sqlList, String sqlDialect) {
10601059
executeNonQueries(
10611060
env,
10621061
sqlList,
10631062
SessionConfig.DEFAULT_USER,
10641063
SessionConfig.DEFAULT_PASSWORD,
10651064
null,
1066-
TREE_SQL_DIALECT,
1067-
defaultConnection);
1065+
sqlDialect,
1066+
null);
10681067
}
10691068

10701069
public static void executeNonQueries(

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ public void testNoTree() throws Exception {
232232
"create database root.test",
233233
"alter database root.test with schema_region_group_num=2, data_region_group_num=3",
234234
"create timeSeries root.test.d1.s1 int32",
235-
"insert into root.test.d1 (s1) values (1)"),
236-
null);
235+
"insert into root.test.d1 (s1) values (1)"));
237236

238237
TestUtils.assertDataAlwaysOnEnv(
239238
receiverEnv,
@@ -418,4 +417,35 @@ public void testValidation() throws Exception {
418417
}
419418
}
420419
}
420+
421+
@Test
422+
public void testAttributeSync() {
423+
TestUtils.executeNonQueries(
424+
receiverEnv,
425+
Arrays.asList(
426+
"create database test",
427+
"use test",
428+
"create table table1(a tag, b attribute, c attribute, d int32)",
429+
"insert into table1 (time, a, b, c, d) values(1, 1, null, 1, 1), (2, 2, 2, null, 2)"),
430+
BaseEnv.TABLE_SQL_DIALECT);
431+
432+
TestUtils.executeNonQueries(
433+
senderEnv,
434+
Arrays.asList(
435+
"create database test",
436+
"use test",
437+
"create table table1(a tag, b attribute, c attribute, d int32)",
438+
"insert into table1 (time, a, b, c, d) values(1, 1, 1, null, 1), (2, 2, null, 2, 2)",
439+
String.format(
440+
"create pipe a2b with source ('inclusion'='schema') with sink ('node-urls'='%s')",
441+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())),
442+
BaseEnv.TABLE_SQL_DIALECT);
443+
444+
TestUtils.assertDataAlwaysOnEnv(
445+
receiverEnv,
446+
"show devices from table1",
447+
"a,b,c,",
448+
new HashSet<>(Arrays.asList("1,1,1,", "2,2,2,")),
449+
"test");
450+
}
421451
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanVisitor;
3232

3333
import org.apache.tsfile.file.metadata.IDeviceID;
34+
import org.apache.tsfile.utils.Constants;
3435
import org.apache.tsfile.utils.ReadWriteIOUtils;
3536

3637
import java.io.DataOutputStream;
@@ -215,7 +216,7 @@ protected void serializeAttributes(final DataOutputStream stream) throws IOExcep
215216
ReadWriteIOUtils.writeObject(value, stream);
216217
}
217218
for (int i = 0; i < attributeNameList.size() - deviceAttributeValueList.length; ++i) {
218-
ReadWriteIOUtils.writeObject(null, stream);
219+
ReadWriteIOUtils.writeObject(Constants.NONE, stream);
219220
}
220221
}
221222
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.db.schemaengine.schemaregion.attribute.update.UpdateDetailContainer;
2828

2929
import org.apache.tsfile.utils.Binary;
30+
import org.apache.tsfile.utils.Constants;
3031
import org.apache.tsfile.utils.RamUsageEstimator;
3132
import org.apache.tsfile.utils.ReadWriteIOUtils;
3233
import org.slf4j.Logger;
@@ -163,6 +164,12 @@ public Map<String, Binary> alterAttribute(
163164
final Map<String, Binary> attributeMap = deviceAttributeList.get(pointer);
164165
for (int i = 0; i < nameList.size(); i++) {
165166
final String key = nameList.get(i);
167+
if (valueList.length <= i) {
168+
break;
169+
}
170+
if (valueList[i] == Constants.NONE) {
171+
continue;
172+
}
166173
final Binary value = (Binary) valueList[i];
167174

168175
originMemUsage =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.snapshot.MemMTreeSnapshotUtil;
4646

4747
import org.apache.tsfile.utils.Binary;
48+
import org.apache.tsfile.utils.Constants;
4849
import org.apache.tsfile.utils.Pair;
4950
import org.apache.tsfile.utils.ReadWriteIOUtils;
5051
import org.slf4j.Logger;
@@ -447,7 +448,13 @@ private List<Object> genActivateTemplateOrUpdateDeviceStatement(
447448
attributeNameList = new ArrayList<>(tableAttributes.keySet());
448449
}
449450
final List<Object> attributeValues =
450-
attributeNameList.stream().map(tableAttributes::remove).collect(Collectors.toList());
451+
attributeNameList.stream()
452+
.map(
453+
attributeKey -> {
454+
final Object attributeValue = tableAttributes.remove(attributeKey);
455+
return Objects.nonNull(attributeValue) ? attributeValue : Constants.NONE;
456+
})
457+
.collect(Collectors.toList());
451458
tableAttributes.forEach(
452459
(attributeKey, attributeValue) -> {
453460
attributeNameList.add(attributeKey);

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SchemaRegionSnapshotParserTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.tsfile.file.metadata.enums.CompressionType;
4949
import org.apache.tsfile.file.metadata.enums.TSEncoding;
5050
import org.apache.tsfile.utils.Binary;
51+
import org.apache.tsfile.utils.Constants;
5152
import org.apache.tsfile.utils.Pair;
5253
import org.junit.After;
5354
import org.junit.Assert;
@@ -313,7 +314,9 @@ public void testTableDeviceAttributeTranslateSnapshot() throws Exception {
313314
Arrays.asList("a", "c"),
314315
Arrays.asList(
315316
new Binary[] {new Binary("b", TSFileConfig.STRING_CHARSET)},
316-
new Binary[] {null, new Binary("d", TSFileConfig.STRING_CHARSET)})),
317+
new Object[] {
318+
Constants.NONE, new Binary("d", TSFileConfig.STRING_CHARSET)
319+
})),
317320
new CreateOrUpdateDevice(
318321
databasePath.getNodes()[1],
319322
anotherTable,

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@
163163
<thrift.version>0.14.1</thrift.version>
164164
<xz.version>1.9</xz.version>
165165
<zstd-jni.version>1.5.6-3</zstd-jni.version>
166-
<tsfile.version>2.2.1-260327-SNAPSHOT</tsfile.version>
166+
<tsfile.version>2.3.0-260422-SNAPSHOT</tsfile.version>
167167
</properties>
168168
<!--
169169
if we claim dependencies in dependencyManagement, then we do not claim

0 commit comments

Comments
 (0)