Skip to content

Commit 112bcd8

Browse files
authored
ConfigNode supports filter region groups via specified database and time range (#17545)
1 parent 94b41da commit 112bcd8

15 files changed

Lines changed: 342 additions & 0 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
3434
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
3535
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
36+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
37+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
3638
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
3739
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
3840
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -519,6 +521,84 @@ public void testGetSlots() throws Exception {
519521
}
520522
}
521523

524+
@Test
525+
public void testGetRegionGroupsByTime() throws Exception {
526+
final String sg0 = "root.sg0";
527+
528+
try (SyncConfigNodeIServiceClient client =
529+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
530+
531+
// Get all region groups covering the full time range
532+
TGetRegionGroupsByTimeReq req = new TGetRegionGroupsByTimeReq(sg0, 0L, Long.MAX_VALUE);
533+
TGetRegionGroupsByTimeResp resp = client.getRegionGroupsByTime(req);
534+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
535+
Assert.assertNotNull(resp.getRegionReplicaSets());
536+
Assert.assertFalse(resp.getRegionReplicaSets().isEmpty());
537+
int allRegionGroupCount = resp.getRegionReplicaSetsSize();
538+
539+
// Each replica set should have testReplicationFactor replicas
540+
resp.getRegionReplicaSets()
541+
.forEach(
542+
replicaSet -> {
543+
Assert.assertEquals(testReplicationFactor, replicaSet.getDataNodeLocationsSize());
544+
Assert.assertEquals(
545+
TConsensusGroupType.DataRegion, replicaSet.getRegionId().getType());
546+
});
547+
548+
// Query with a single time slot range should return a subset
549+
TGetRegionGroupsByTimeReq singleSlotReq =
550+
new TGetRegionGroupsByTimeReq(sg0, 0L, testTimePartitionInterval - 1);
551+
TGetRegionGroupsByTimeResp singleSlotResp = client.getRegionGroupsByTime(singleSlotReq);
552+
Assert.assertEquals(
553+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), singleSlotResp.getStatus().getCode());
554+
Assert.assertNotNull(singleSlotResp.getRegionReplicaSets());
555+
Assert.assertFalse(singleSlotResp.getRegionReplicaSets().isEmpty());
556+
Assert.assertTrue(singleSlotResp.getRegionReplicaSetsSize() <= allRegionGroupCount);
557+
558+
// Query with a disjoint time range should return empty result
559+
TGetRegionGroupsByTimeReq disjointReq =
560+
new TGetRegionGroupsByTimeReq(
561+
sg0,
562+
testTimePartitionSlotsNum * testTimePartitionInterval * 2,
563+
testTimePartitionSlotsNum * testTimePartitionInterval * 3);
564+
TGetRegionGroupsByTimeResp disjointResp = client.getRegionGroupsByTime(disjointReq);
565+
Assert.assertEquals(
566+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), disjointResp.getStatus().getCode());
567+
Assert.assertTrue(
568+
disjointResp.getRegionReplicaSets() == null
569+
|| disjointResp.getRegionReplicaSets().isEmpty());
570+
571+
// Query non-existent database should return empty result
572+
TGetRegionGroupsByTimeReq nonExistReq =
573+
new TGetRegionGroupsByTimeReq("root.nonexistent", 0L, Long.MAX_VALUE);
574+
TGetRegionGroupsByTimeResp nonExistResp = client.getRegionGroupsByTime(nonExistReq);
575+
Assert.assertEquals(
576+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), nonExistResp.getStatus().getCode());
577+
Assert.assertTrue(
578+
nonExistResp.getRegionReplicaSets() == null
579+
|| nonExistResp.getRegionReplicaSets().isEmpty());
580+
581+
// Verify consistency: union of per-slot queries should equal full-range query
582+
Set<TConsensusGroupId> unionRegionIds = new HashSet<>();
583+
for (long t = 0; t < testTimePartitionSlotsNum; t++) {
584+
TGetRegionGroupsByTimeReq perSlotReq =
585+
new TGetRegionGroupsByTimeReq(
586+
sg0, t * testTimePartitionInterval, t * testTimePartitionInterval);
587+
TGetRegionGroupsByTimeResp perSlotResp = client.getRegionGroupsByTime(perSlotReq);
588+
Assert.assertEquals(
589+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), perSlotResp.getStatus().getCode());
590+
if (perSlotResp.getRegionReplicaSets() != null) {
591+
perSlotResp
592+
.getRegionReplicaSets()
593+
.forEach(replicaSet -> unionRegionIds.add(replicaSet.getRegionId()));
594+
}
595+
}
596+
Set<TConsensusGroupId> allRegionIds = new HashSet<>();
597+
resp.getRegionReplicaSets().forEach(replicaSet -> allRegionIds.add(replicaSet.getRegionId()));
598+
Assert.assertEquals(allRegionIds, unionRegionIds);
599+
}
600+
}
601+
522602
@Test
523603
public void testGetSchemaNodeManagementPartition() throws Exception {
524604

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
2323
import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
24+
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
2425
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
2526
import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
2627
import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
@@ -248,6 +249,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
248249
case RemoveRegionLocation:
249250
plan = new RemoveRegionLocationPlan();
250251
break;
252+
case GetRegionGroupsByTime:
253+
plan = new GetRegionGroupsByTimePlan();
254+
break;
251255
case OfferRegionMaintainTasks:
252256
plan = new OfferRegionMaintainTasksPlan();
253257
break;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public enum ConfigPhysicalPlanType {
7171
CountTimeSlotList((short) 310),
7272
AddRegionLocation((short) 311),
7373
RemoveRegionLocation((short) 312),
74+
GetRegionGroupsByTime((short) 313),
7475

7576
/** Partition. */
7677
GetSchemaPartition((short) 400),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.consensus.request.read.region;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
23+
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
24+
import org.apache.iotdb.commons.utils.TimePartitionUtils;
25+
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
26+
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
27+
28+
import java.io.DataOutputStream;
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.Objects;
32+
33+
public class GetRegionGroupsByTimePlan extends ConfigPhysicalReadPlan {
34+
35+
private String database;
36+
37+
private TTimePartitionSlot startTimeSlot;
38+
39+
private TTimePartitionSlot endTimeSlot;
40+
41+
public GetRegionGroupsByTimePlan() {
42+
super(ConfigPhysicalPlanType.GetRegionGroupsByTime);
43+
}
44+
45+
public GetRegionGroupsByTimePlan(
46+
final String database, final long startTime, final long endTime) {
47+
super(ConfigPhysicalPlanType.GetRegionGroupsByTime);
48+
this.database = database;
49+
this.startTimeSlot = TimePartitionUtils.getTimePartitionSlot(startTime);
50+
this.endTimeSlot = TimePartitionUtils.getTimePartitionSlot(endTime);
51+
}
52+
53+
public String getDatabase() {
54+
return database;
55+
}
56+
57+
public TTimePartitionSlot getStartTimeSlot() {
58+
return startTimeSlot;
59+
}
60+
61+
public TTimePartitionSlot getEndTimeSlot() {
62+
return endTimeSlot;
63+
}
64+
65+
@Override
66+
protected void serializeImpl(final DataOutputStream stream) throws IOException {
67+
stream.writeShort(getType().getPlanType());
68+
BasicStructureSerDeUtil.write(database, stream);
69+
stream.writeLong(startTimeSlot.getStartTime());
70+
stream.writeLong(endTimeSlot.getStartTime());
71+
}
72+
73+
@Override
74+
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
75+
database = BasicStructureSerDeUtil.readString(buffer);
76+
startTimeSlot = new TTimePartitionSlot(buffer.getLong());
77+
endTimeSlot = new TTimePartitionSlot(buffer.getLong());
78+
}
79+
80+
@Override
81+
public boolean equals(final Object o) {
82+
if (this == o) {
83+
return true;
84+
}
85+
if (o == null || getClass() != o.getClass()) {
86+
return false;
87+
}
88+
final GetRegionGroupsByTimePlan that = (GetRegionGroupsByTimePlan) o;
89+
return Objects.equals(database, that.database)
90+
&& Objects.equals(startTimeSlot, that.startTimeSlot)
91+
&& Objects.equals(endTimeSlot, that.endTimeSlot);
92+
}
93+
94+
@Override
95+
public int hashCode() {
96+
return Objects.hash(database, startTimeSlot, endTimeSlot);
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.consensus.response.partition;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
23+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
25+
import org.apache.iotdb.consensus.common.DataSet;
26+
import org.apache.iotdb.rpc.TSStatusCode;
27+
28+
import java.util.Set;
29+
30+
public class GetRegionGroupsByTimeResp implements DataSet {
31+
32+
private final TSStatus status;
33+
34+
private final Set<TRegionReplicaSet> regionReplicaSets;
35+
36+
public GetRegionGroupsByTimeResp(
37+
final TSStatus status, final Set<TRegionReplicaSet> regionReplicaSets) {
38+
this.status = status;
39+
this.regionReplicaSets = regionReplicaSets;
40+
}
41+
42+
public TSStatus getStatus() {
43+
return status;
44+
}
45+
46+
public TGetRegionGroupsByTimeResp convertToRpcResp() {
47+
TGetRegionGroupsByTimeResp resp = new TGetRegionGroupsByTimeResp();
48+
resp.setStatus(status);
49+
50+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
51+
resp.setRegionReplicaSets(regionReplicaSets);
52+
}
53+
54+
return resp;
55+
}
56+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@
199199
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
200200
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
201201
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
202+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
203+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
202204
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
203205
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
204206
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -2569,6 +2571,14 @@ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
25692571
: new TGetSeriesSlotListResp(status);
25702572
}
25712573

2574+
@Override
2575+
public TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) {
2576+
TSStatus status = confirmLeader();
2577+
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
2578+
? partitionManager.getRegionGroupsByTime(req).convertToRpcResp()
2579+
: new TGetRegionGroupsByTimeResp(status);
2580+
}
2581+
25722582
@Override
25732583
public TSStatus migrateRegion(TMigrateRegionReq req) {
25742584
TSStatus status = confirmLeader();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@
118118
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
119119
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
120120
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
121+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
122+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
121123
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
122124
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
123125
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -854,6 +856,13 @@ TPermissionInfoResp login(
854856
*/
855857
TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req);
856858

859+
/**
860+
* Get DataRegion groups that overlap a time range for the given database.
861+
*
862+
* @return TGetRegionGroupsByTimeResp.
863+
*/
864+
TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req);
865+
857866
TSStatus migrateRegion(TMigrateRegionReq req);
858867

859868
TSStatus reconstructRegion(TReconstructRegionReq req);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
5050
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
5151
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
52+
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
5253
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
5354
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
5455
import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
@@ -60,6 +61,7 @@
6061
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
6162
import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp;
6263
import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
64+
import org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp;
6365
import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp;
6466
import org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp;
6567
import org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp;
@@ -82,6 +84,7 @@
8284
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
8385
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
8486
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
87+
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
8588
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
8689
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
8790
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
@@ -1180,6 +1183,19 @@ public GetRegionIdResp getRegionId(final TGetRegionIdReq req) {
11801183
}
11811184
}
11821185

1186+
public GetRegionGroupsByTimeResp getRegionGroupsByTime(final TGetRegionGroupsByTimeReq req) {
1187+
final GetRegionGroupsByTimePlan plan =
1188+
new GetRegionGroupsByTimePlan(req.getDatabase(), req.getStartTime(), req.getEndTime());
1189+
try {
1190+
return (GetRegionGroupsByTimeResp) getConsensusManager().read(plan);
1191+
} catch (final ConsensusException e) {
1192+
LOGGER.warn(CONSENSUS_READ_ERROR, e);
1193+
final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
1194+
res.setMessage(e.getMessage());
1195+
return new GetRegionGroupsByTimeResp(res, Collections.emptySet());
1196+
}
1197+
}
1198+
11831199
public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
11841200
long startTime = req.isSetStartTime() ? req.getStartTime() : Long.MIN_VALUE;
11851201
long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
4444
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
4545
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
46+
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
4647
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
4748
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
4849
import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan;
@@ -355,6 +356,8 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req)
355356
return partitionInfo.countTimeSlotList((CountTimeSlotListPlan) req);
356357
case GetSeriesSlotList:
357358
return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
359+
case GetRegionGroupsByTime:
360+
return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan) req);
358361
case SHOW_CQ:
359362
return cqInfo.showCQ();
360363
case ShowExternalService:

0 commit comments

Comments
 (0)