2021-06-23 16:03:14 +08:00
|
|
|
|
# 批量消息发送
|
|
|
|
|
|
批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。
|
|
|
|
|
|
|
|
|
|
|
|
### 1 发送批量消息
|
|
|
|
|
|
如果你一次只发送不超过 4MiB 的消息,使用批处理很容易:
|
|
|
|
|
|
```java
|
|
|
|
|
|
String topic = "BatchTest";
|
|
|
|
|
|
List<Message> messages = new ArrayList<>();
|
|
|
|
|
|
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
|
|
|
|
|
|
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
|
|
|
|
|
|
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
|
|
|
|
|
|
try {
|
|
|
|
|
|
producer.send(messages);
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
//handle the error
|
|
|
|
|
|
}
|
|
|
|
|
|
```
|
|
|
|
|
|
### 2 拆分
|
|
|
|
|
|
当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息:
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
public class ListSplitter implements Iterator<List<Message>> {
|
|
|
|
|
|
private final int SIZE_LIMIT = 1024 * 1024 * 4;
|
|
|
|
|
|
private final List<Message> messages;
|
|
|
|
|
|
private int currIndex;
|
|
|
|
|
|
public ListSplitter(List<Message> messages) {
|
|
|
|
|
|
this.messages = messages;
|
|
|
|
|
|
}
|
2022-08-01 20:42:34 +08:00
|
|
|
|
@Override
|
|
|
|
|
|
public boolean hasNext() {
|
2021-06-23 16:03:14 +08:00
|
|
|
|
return currIndex < messages.size();
|
|
|
|
|
|
}
|
2022-08-01 20:42:34 +08:00
|
|
|
|
@Override
|
|
|
|
|
|
public List<Message> next() {
|
2021-06-23 16:03:14 +08:00
|
|
|
|
int startIndex = getStartIndex();
|
|
|
|
|
|
int nextIndex = startIndex;
|
|
|
|
|
|
int totalSize = 0;
|
|
|
|
|
|
for (; nextIndex < messages.size(); nextIndex++) {
|
|
|
|
|
|
Message message = messages.get(nextIndex);
|
|
|
|
|
|
int tmpSize = calcMessageSize(message);
|
|
|
|
|
|
if (tmpSize + totalSize > SIZE_LIMIT) {
|
|
|
|
|
|
break;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
totalSize += tmpSize;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
List<Message> subList = messages.subList(startIndex, nextIndex);
|
|
|
|
|
|
currIndex = nextIndex;
|
|
|
|
|
|
return subList;
|
|
|
|
|
|
}
|
|
|
|
|
|
private int getStartIndex() {
|
|
|
|
|
|
Message currMessage = messages.get(currIndex);
|
|
|
|
|
|
int tmpSize = calcMessageSize(currMessage);
|
|
|
|
|
|
while(tmpSize > SIZE_LIMIT) {
|
|
|
|
|
|
currIndex += 1;
|
|
|
|
|
|
Message message = messages.get(curIndex);
|
|
|
|
|
|
tmpSize = calcMessageSize(message);
|
|
|
|
|
|
}
|
|
|
|
|
|
return currIndex;
|
|
|
|
|
|
}
|
|
|
|
|
|
private int calcMessageSize(Message message) {
|
|
|
|
|
|
int tmpSize = message.getTopic().length() + message.getBody().length();
|
|
|
|
|
|
Map<String, String> properties = message.getProperties();
|
|
|
|
|
|
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
|
|
|
|
|
tmpSize += entry.getKey().length() + entry.getValue().length();
|
|
|
|
|
|
}
|
|
|
|
|
|
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
|
|
|
|
|
|
return tmpSize;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// then you could split the large list into small ones:
|
|
|
|
|
|
ListSplitter splitter = new ListSplitter(messages);
|
|
|
|
|
|
while (splitter.hasNext()) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
List<Message> listItem = splitter.next();
|
|
|
|
|
|
producer.send(listItem);
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
// handle the error
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
```
|