2019-02-18 19:37:19 +08:00
# Schedule example
### 1 Start consumer to wait for incoming subscribed messages
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Specify name server addresses
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}
```
### 2 Send scheduled messages
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Specify name server addresses
        producer.setNamesrvAddr("localhost:9876");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // Delay level 3 maps to 10s, so this message is delivered to the consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        // Shutdown producer after use.
        producer.shutdown();
    }
}
```
### 3 Verification
You should see that each message is consumed about 10 seconds after it was stored.
### 4 Use scenarios for scheduled messages
For example, in e-commerce, a delayed message can be sent when an order is submitted so that the order's status is checked 1 hour later. If the order is still unpaid at that point, it can be cancelled and its inventory released.
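
The check that runs when the delayed message arrives could look roughly like the sketch below. It is only an illustration: `OrderTimeoutCheck`, `OrderStatus`, the in-memory `ORDERS` map and the `STOCK` counter are hypothetical stand-ins for a real order service and inventory system, and no RocketMQ classes are used. In a real consumer, `onTimeoutCheck` would presumably be called from `consumeMessage` with the order ID taken from the message body.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class OrderTimeoutCheck {
    enum OrderStatus { UNPAID, PAID, CANCELLED }

    // Hypothetical in-memory order store; stands in for a real database.
    static final Map<String, OrderStatus> ORDERS = new ConcurrentHashMap<>();
    // Hypothetical stock counter; incremented when a cancelled order releases its item.
    static final AtomicInteger STOCK = new AtomicInteger(0);

    // Intended to run when the delayed "check order" message is consumed.
    // Returns true if the order was still unpaid and has now been cancelled.
    static boolean onTimeoutCheck(String orderId) {
        // Atomically switch UNPAID -> CANCELLED; paid or unknown orders are left untouched.
        boolean cancelled = ORDERS.replace(orderId, OrderStatus.UNPAID, OrderStatus.CANCELLED);
        if (cancelled) {
            STOCK.incrementAndGet(); // release the reserved inventory
        }
        return cancelled;
    }

    public static void main(String[] args) {
        ORDERS.put("order-1", OrderStatus.UNPAID);
        ORDERS.put("order-2", OrderStatus.PAID);
        System.out.println(onTimeoutCheck("order-1")); // true
        System.out.println(onTimeoutCheck("order-2")); // false
    }
}
```

Using a compare-and-replace on the status keeps the check idempotent: if the same delayed message is delivered twice, the second call finds the order already cancelled and releases nothing.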
### 5 Restrictions on the use of scheduled messages
```java
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
```
RocketMQ does not currently support arbitrary delay durations. A message can only use one of the fixed delay levels configured by `messageDelayLevel`; by default, levels 1 to 18 correspond to delays from 1s to 2h. Messages that fail to be consumed are also placed in the delay queue for retry, so their redelivery time depends on the configured delay levels and the number of retries so far.
See `SendMessageProcessor.java`.
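
To make the level-to-delay mapping concrete, here is a small self-contained sketch that parses the default `messageDelayLevel` string into a table and picks a level for a desired delay. `DelayLevelTable`, `parse`, `levelFor` and `retryLevel` are illustrative names, not RocketMQ APIs. The `3 + reconsumeTimes` rule in `retryLevel` is an assumption about how the broker chooses the level for a retried message; it should be checked against `SendMessageProcessor.java` in the version you run.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelayLevelTable {
    // Default value copied from MessageStoreConfig.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parse the space-separated level string into level (1-based) -> delay in milliseconds.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            char unit = part.charAt(part.length() - 1);
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown time unit in: " + part);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    // Smallest level whose delay is at least desiredMillis, or -1 if none is long enough.
    // Assumes the configured levels are in ascending order, as in the default string.
    static int levelFor(long desiredMillis, Map<Integer, Long> table) {
        for (Map.Entry<Integer, Long> entry : table.entrySet()) {
            if (entry.getValue() >= desiredMillis) {
                return entry.getKey();
            }
        }
        return -1;
    }

    // ASSUMPTION: a message that has already been retried reconsumeTimes times
    // is rescheduled at level 3 + reconsumeTimes (10s for the first retry).
    static int retryLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println(table.get(3));                 // 10000
        System.out.println(levelFor(3_600_000L, table));  // 17
        System.out.println(table.get(retryLevel(1)));     // 30000
    }
}
```

With the default configuration, the 1-hour order check from the use scenario would therefore use `setDelayTimeLevel(17)`, and any delay longer than 2h has no matching level.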