# Module_Java_RabbitMQ_OpenSource **Repository Path**: iOceanPlus_Modules/JavaRabbitMQ ## Basic Information - **Project Name**: Module_Java_RabbitMQ_OpenSource - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: develop - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2016-10-27 - **Last Updated**: 2023-01-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Module_Java_RabbitMQ_OpenSource #### 设计思路 1. 使用 rabbitMQ 的 topic 方式通信,exchange持久化,队列使用随机产生的队列名,连接断开时自动销毁队列(设置为exclusive属性),设置队列消息的过期时间(本程序中设默认为5min) 2. 为了提高性能,发送消息时,将多条消息组成一个 Arraylist 来一次性发送 3. 写一个 testMain 函数,对该库进行性能测试,包括吞吐率和丢包率 - 为了提高性能,应该增加包大小,减少发送频率,吞吐量能达到56MB/s以上 #### 主要对外接口 1. 封装了topic方式发送和接收数据的各一个类:MQTopicPublish和MQTopicConsume ;封装了消息的结构体 MessageStruct 2. MQTopicPublish.java 主要对外接口 - 构造函数:public MQTopicPublish(String IP,String user,String password,String EXCHANGE_NAME),传入rabbitMQ连接发送所需的IP地址、用户名、密码、交换器名称 - 发送函数:public void Publish(ArrayList< MessageStruct > messages),传入需发送的消息 ArrayList,每个MessageStruct 的具体结构见下文 3. MQTopicConsume.java 主要对外接口 - 构造函数:共2个构造函数,主体如下,public MQTopicConsume(String IP,String user,String password,String EXCHANGE_NAME,String[] routingKeys,int message_ttl),传入rabbitMQ队列接收所需的IP地址、用户名、密码、交换器名称、绑定的routingKeys数组、队列消息自动过期时间message_ttl(可不填,默认 5 min) - 接收函数:public void Consume(),无参数 4. MessageStruct.java 主要对外接口 - 构造函数: public MessageStruct(String routingKey,byte[] messageDetail),传入该消息所对应的routingKey、protobuf序列化后的消息数据 #### 运行环境 1. 引入附件中的 rabbitmq-java-client-bin-3.6.5 压缩包下的jar包,共6个jar包 2. 若要运行程序中 testMain 的主函数,需额外引入 protobuf 使用的相关 jar 包,因为 testMain 中使用了 protobuf 序列化,以便构造发送的消息包。相关 jar 包见附件中的 protobuf-java-3.1.0.jar #### 发送、接收类的使用示例 - 发送 Publish ``` // 构造消息结构体 String messageRoutingKey = "OnLine.PreprocessedData.AIS"; AISData.PBAISDynamic.Builder builder = AISData.PBAISDynamic.newBuilder(); builder.setMMSI(413322650); byte[] detailMessage = builder.build().toByteArray(); MessageStruct message = new MessageStruct(messageRoutingKey,detailMessage); // 构造ArrayList 发送,提高发送性能 ArrayList messages = new ArrayList(); for(int i=0;i<500;i++) messages.add(message); // 调用MQTopicPublish类 String IP = "127.0.0.1"; String user = "guest"; String password = "guest"; String EXCHANGE_NAME = "xgs"; MQTopicPublish publish = new MQTopicPublish(IP,user,password,EXCHANGE_NAME); // 开始发送消息Publish publish.Publish(messages); ``` - 接收 Consume ``` // 调用MQTopicConsume类 String IP = "127.0.0.1"; String user = "guest"; String password = "guest"; String EXCHANGE_NAME = "xgs"; String[] routingKeys = new String[]{"#"}; // "#"表示接收消息队列中的所有消息 MQTopicConsume consume = new MQTopicConsume(IP,user,password,EXCHANGE_NAME,routingKeys); // 开始接收符合绑定的routingKeys的消息 consume.Consume(); ``` - 对接收到的消息进行处理 ``` // 在 MQTopicConsume.java中 Consume()函数中的 handleDelivery 函数中处理接收到的消息 // 该函数代码如下,截取自MQTopicConsume.java Consumer consumer = new DefaultConsumer(channel){ /* * 处理接收到的消息 * */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { handleReceivedData(consumerTag, envelope, properties, body); } }; // 在 MQTopicConsume.java中 handleReceivedData() 函数中具体处理接收到的消息,以便使用者继承该类时方便改写处理数据函数 public void handleReceivedData(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){ //可以通过 envelope.getRoutingKey() 获得本条信息的 routingKey ,传递的消息具体内容为 byte[] body // 使用示例如下 String info = new String(body); System.out.println("[y] Receiver " + QUEUE_NAME + " Received " + envelope.getRoutingKey() + " : " + info); } ```