2022-10-24 16:21:39 +08:00
|
|
|
|
# Compaction Topic
|
|
|
|
|
|
|
|
|
|
|
|
## 使用方式
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
|
|
|
|
|
### 打开namesrv上支持顺序消息的开关
|
|
|
|
|
|
CompactionTopic依赖顺序消息来保障一致性
|
|
|
|
|
|
```shell
|
|
|
|
|
|
$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
|
|
|
|
|
|
```
|
|
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
### 创建compaction topic
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
```shell
|
2023-03-22 11:23:13 +08:00
|
|
|
|
$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
|
2022-10-24 16:21:39 +08:00
|
|
|
|
create topic to 127.0.0.1:10911 success.
|
2023-03-27 15:26:10 +08:00
|
|
|
|
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+cleanup.policy=COMPACTION}]
|
2022-10-24 16:21:39 +08:00
|
|
|
|
```
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
### 生产数据
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
与普通消息一样
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
```java
|
|
|
|
|
|
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
|
|
|
|
|
|
producer.setNamesrvAddr("localhost:9876");
|
|
|
|
|
|
producer.start();
|
|
|
|
|
|
|
|
|
|
|
|
String topic = "ctopic";
|
|
|
|
|
|
String tag = "tag1";
|
|
|
|
|
|
String key = "key1";
|
2023-03-27 15:26:10 +08:00
|
|
|
|
Message msg = new Message(topic, tag, key, "bodys".getBytes(StandardCharsets.UTF_8));
|
2022-10-24 16:21:39 +08:00
|
|
|
|
SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
|
|
|
|
|
|
int select = Math.abs(shardingKey.hashCode());
|
|
|
|
|
|
if (select < 0) {
|
|
|
|
|
|
select = 0;
|
|
|
|
|
|
}
|
|
|
|
|
|
return mqs.get(select % mqs.size());
|
|
|
|
|
|
}, key);
|
|
|
|
|
|
|
|
|
|
|
|
System.out.printf("%s%n", sendResult);
|
|
|
|
|
|
```
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
### 消费数据
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
消费offset与compaction之前保持不变,如果指定offset消费,当指定的offset不存在时,返回后面最近的一条数据
|
|
|
|
|
|
在compaction场景下,大部分消费都是从0开始消费完整的数据
|
2023-03-22 11:23:13 +08:00
|
|
|
|
|
2022-10-24 16:21:39 +08:00
|
|
|
|
```java
|
|
|
|
|
|
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
|
|
|
|
|
|
consumer.setNamesrvAddr("localhost:9876");
|
|
|
|
|
|
consumer.setPullThreadNums(4);
|
|
|
|
|
|
consumer.start();
|
|
|
|
|
|
|
|
|
|
|
|
Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
|
|
|
|
|
|
consumer.assign(messageQueueList);
|
|
|
|
|
|
messageQueueList.forEach(mq -> {
|
|
|
|
|
|
try {
|
|
|
|
|
|
consumer.seekToBegin(mq);
|
|
|
|
|
|
} catch (MQClientException e) {
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
Map<String, byte[]> kvStore = Maps.newHashMap();
|
|
|
|
|
|
while (true) {
|
|
|
|
|
|
List<MessageExt> msgList = consumer.poll(1000);
|
2023-03-27 15:26:10 +08:00
|
|
|
|
if (CollectionUtils.isNotEmpty(msgList)) {
|
2022-10-24 16:21:39 +08:00
|
|
|
|
msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//use the kvStore
|
|
|
|
|
|
```
|