RocketMQ Java 客户端封装

RocketMQ 应用非常多,但是在实际代码开发过程,我们肯定不能以上面的代码在实际项目中应用, 肯定是要把它们都封装一下,由自己提供的Api来调用RocketMQ,这样才能更方便!

对于消息队列,我们关注的地方:

  • 消息生产者

    对于消息生产者,我们只关注两点
    
    1. 队列名称
    2. 要发送的消息
    
  • 消息消费者

    对于消费者,我们只关注以下几点
    
    1. 订阅消息
    2. 集群消费消息
    

站在应用层上,调用方只关注Api调用,它不关注Rocketmq内部的具体实现,和初始化!

  • 定义一个接口:
package com.pangugle.framework.mq;
import com.pangugle.framework.service.Callback;

public  interface MQSupport{

	/**
	 * 对于rocketmq 没有用
	 * @param topic
	 */
	public  void declareTopic(String topic);
	public  void deleteTopic(String topic);

	/**
	 * 消息消息
	 * @param topic
	 * @param body
	 * @return
	 */
	public  boolean sendMessage(String topic, String body);
	public  boolean sendMessage(String topic, String body, String tags);

	/**
	 * 消费消息, 消息不重复消息
	 * @param tags
	 * @param callback
	 */
	public void consume(String topic, String tags, Callback<String> callback);

	/**
	 * 订阅消息,消息重复消费
	 * @param tags
	 * @param callback
	 */
	public void subscribe(String topic, String tags, Callback<String> callback);
}

上面我们定义了一个发送消息的方法:

1. sendMessage(String topic, String body);

2. sendMessage(String topic, String body, String tags);

和消费消息的方法:

1. consume(String topic, String tags, Callback<String> callback);

2. subscribe(String topic, String tags, Callback<String> callback);
  • Rocketmq 实现这个接口
package com.pangugle.framework.mq.impl;

import java.util.List;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import com.pangugle.framework.conf.MyConfiguration;
import com.pangugle.framework.log.Log;
import com.pangugle.framework.log.LogFactory;
import com.pangugle.framework.mq.MQSupport;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.FastJsonHelper;
import com.pangugle.framework.utils.StringUtils;

public class RocketMQImpl implements MQSupport {

	private static Log LOG = LogFactory.getLog(RocketMQImpl.class);

	private static String DEFAULT_GROUP = "pangule_default_group";

	private static int DEFAULT_CONSMER_THREAD_SIZE = 5;

	private static String mServer = null;

	private static ClientConfig mClientConfig = new ClientConfig();
	private static DefaultMQProducer mProducer;

	public RocketMQImpl() {
		synchronized (RocketMQImpl.class) {
			if (mProducer == null) {
				initClientConfig();
				initProducer();
			}
		}
	}

	@Override
	public void declareTopic(String queue) {
	}

	@Override
	public void deleteTopic(String queue) {
	}

	@Override
	public boolean sendMessage(String queue, String body, String tags) {
		try {
			if(StringUtils.isEmpty(tags))
			{
				tags = StringUtils.getEmpty();
			}
			Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));

			// Call send message to deliver message to one of brokers.
			SendResult sendResult = mProducer.send(msg);
			if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
				return true;
			}
			LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
		} catch (Exception e) {
			LOG.error("send queue error:", e);
		}
		return false;
	}

	@Override
	public boolean sendMessage(String queue, String body) {
		return sendMessage(queue, body, null);
	}

	private static void initClientConfig() {
		mServer = MyConfiguration.getInstance().getString("mq.rocket.server");
		LOG.info("rocketmq.server = " + mServer);

		// 客户端本机 IP 地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
		// Name Server 地址列表,多个 NameServer 地址用分号 隔开
		mClientConfig.setNamesrvAddr(mServer);
		// 客户端实例名称,客户端创建的多个 Producer、 Consumer 实际是共用一个内部实例(这个实例包含
		// 网络连接、线程资源等),默认值:DEFAULT
		mClientConfig.setInstanceName("DEFAULT");
		// 通信层异步回调线程数,默认值4
		mClientConfig.setClientCallbackExecutorThreads(10);
		// 轮询 Name Server 间隔时间,单位毫秒,默认:30000
		// mClientConfig.setPollNameServerInterval(30000);
		// 向 Broker 发送心跳间隔时间,单位毫秒,默认:30000
		mClientConfig.setHeartbeatBrokerInterval(30000);
		// 持久化 Consumer 消费进度间隔时间,单位毫秒,默认:5000
		mClientConfig.setPersistConsumerOffsetInterval(5000);
	}

	private static void initProducer() {
		try {
			mProducer = new DefaultMQProducer();

			mProducer.resetClientConfig(mClientConfig);
			// 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 默认值 4
			mProducer.setDefaultTopicQueueNums(4);
			// 发送消息超时时间,单位毫秒 : 默认值 10000
			mProducer.setSendMsgTimeout(10000);
			// 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 默认值 4096
			mProducer.setCompressMsgBodyOverHowmuch(4096);
			// 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 默认值 FALSE
			mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);

			mProducer.setProducerGroup(DEFAULT_GROUP);
//			mProducer.setRetryTimesWhenSendAsyncFailed(3);
			mProducer.start();
		} catch (Exception e) {
			LOG.error("init producer error:", e);
		}
	}

	@Override
	public void consume(String topic, String tags, Callback<String> callback) {
		try {
			if(StringUtils.isEmpty(tags))
			{
				tags = StringUtils.getEmpty();
			}
			DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
			consumer.setMessageModel(MessageModel.CLUSTERING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
					// TODO Auto-generated method stub
					for(MessageExt ext : msgs)
			    	{
			    		try {
			    			String body =  new String(ext.getBody());
			    			callback.execute(body);
						} catch (Exception e) {
						}
			    	}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
		} catch (MQClientException e) {
			LOG.error("consume error:", e);
		}
	}

	@Override
	public void subscribe(String topic, String tags, Callback<String> callback) {
		try {
			if(StringUtils.isEmpty(tags))
			{
				tags = StringUtils.getEmpty();
			}
			DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
			consumer.setMessageModel(MessageModel.BROADCASTING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
					// TODO Auto-generated method stub
					for(MessageExt ext : msgs)
			    	{
			    		try {
			    			String body =  new String(ext.getBody());
			    			callback.execute(body);
						} catch (Exception e) {
						}
			    	}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
		} catch (MQClientException e) {
			LOG.error("subscrebe error:", e);
		}
	}

	private static DefaultMQPushConsumer getConsumerInstance(String topic, String tags) throws MQClientException
	{
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
		consumer.resetClientConfig(mClientConfig);
		consumer.setConsumerGroup(topic + tags);
		consumer.setConsumeThreadMin(DEFAULT_CONSMER_THREAD_SIZE);
		consumer.setConsumeThreadMax(DEFAULT_CONSMER_THREAD_SIZE);
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		// mConsumer.subscribe(queue, "TagA || TagC || TagD");
		consumer.subscribe(topic, tags);
		return consumer;
	}

}
  • 简化调用:
package com.pangugle.framework.mq;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import com.pangugle.framework.mq.impl.RedisMQImpl;
import com.pangugle.framework.mq.impl.RocketMQImpl;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.ThreadUtils;

public class MQManager{

	Map<String, MQSupport> maps = Maps.newConcurrentMap();

	private interface ManagerInternal {
		public MQManager mgr = new MQManager();
	}

	public static MQManager getInstance()
	{
		return ManagerInternal.mgr;
	}

	private MQManager()
	{
		//maps.put(MQImpl.REDIS.name(), new RedisMQImpl());
		maps.put(MQImpl.ROCKETMQ.name(), new RocketMQImpl());
	}

	public MQSupport getMQ()
	{
		return maps.get(MQImpl.ROCKETMQ.name());
	}

	public static enum MQImpl{
		REDIS, // redis
		ROCKETMQ; // rocketmq
	}

	public static void main(String[] args) throws InterruptedException, IOException
	{
    // 定义消息队列
		String queue = "pangugle_test";

    //
		String tags = null;

		MQSupport mq = MQManager.getInstance().getMQ();

    // 订阅消息消费
		mq.subscribe(queue, tags, new Callback<String>() {
			public void execute(String o) {
				System.out.println("consuemr 1 " + o);
			}
		});

    // 集群消息消息
//		mq.consume(queue, tags, new Callback<String>() {
//			public void execute(String o) {
//				System.out.println("consuemr 1 " + o);
//			}
//		});

		for(int i = 0; i < 1000; i ++)
		{
			mq.sendMessage(queue, "i = " + i, tags);
			ThreadUtils.sleep(1000);
		}

		System.in.read();
	}
}

注意上面测试:

要到控制台创建 Topic 队列名称,也就是 pangugle_test 这个名称!

好了搞定了

现在我们使用消息队列就非常简单了:

  • 初始化消息队列 MQSupport mq = MQManager.getInstance().getMQ();

  • 发送消息 sendMessage

  • 消费消息

    • subscribe
    • consume