
    Java Code Example: Sending and Receiving MQ Messages over HTTP

    Y2J, 2017-05-02 13:59:39 (original)
    This article walks through sample code that shows how to send and receive MQ messages over HTTP in a Java environment.

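    1. Prepare the environment

    Add the HTTP Java client dependencies to the project's POM file. The sample code below also imports fastjson (com.alibaba.fastjson) and slf4j (org.slf4j), so make sure those libraries are on the classpath as well.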

    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-client</artifactId>
      <version>9.3.4.RC1</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>ons-client</artifactId>
      <version>1.1.11</version>
    </dependency>

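    2. Runtime configuration (user.properties)

    Set the relevant entries in the configuration file (user.properties). For details, refer to the documentation on applying for MQ resources.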

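    # Topic that you created in the console
    Topic=xxx
    # Public beta URL; the sample code appends "message/..." directly, so keep the trailing slash
    URL=http://publictest-rest.ons.aliyun.com/
    # Alibaba Cloud AccessKey
    Ak=xxx
    # Alibaba Cloud SecretKey
    Sk=xxx
    # Producer ID created in the MQ console
    ProducerID=xxx
    # Consumer ID created in the MQ console
    ConsumerID=xxx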

    Note: There are no restrictions on the Key and Tag in the URL or on the POST Content-Type, as long as the Key and Tag are consistent and unique. They can also be stored in user.properties.
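    Because the sample code concatenates "message/?..." directly onto the URL value, a base URL without a trailing slash produces a malformed address. The following is a minimal sketch of a loader that fails fast on missing keys and normalizes the URL. The class name UserConfig and its methods are hypothetical helpers introduced for illustration; they are not part of the MQ SDK.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not part of the MQ SDK): validates user.properties values.
final class UserConfig {
    final String topic;
    final String baseUrl;

    private UserConfig(String topic, String baseUrl) {
        this.topic = topic;
        this.baseUrl = baseUrl;
    }

    /** Reads Topic and URL, rejecting missing keys and making sure the URL ends with '/'. */
    static UserConfig from(Properties p) {
        String topic = require(p, "Topic");
        String url = require(p, "URL");
        if (!url.endsWith("/")) {
            url = url + "/";
        }
        return new UserConfig(topic, url);
    }

    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Inline properties stand in for user.properties in this sketch.
        Properties p = new Properties();
        p.load(new StringReader("Topic=demo-topic\nURL=http://publictest-rest.ons.aliyun.com\n"));
        UserConfig cfg = UserConfig.from(p);
        System.out.println(cfg.baseUrl + "message/?topic=" + cfg.topic);
    }
}
```

    In a real project the Properties object would be loaded from the classpath resource, as the producer and consumer samples do.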

    3. Sample code for sending messages over HTTP

    Set the parameters as described below to test sending messages over HTTP.
    
    package com.aliyun.openservice.ons.http.demo;
    import java.nio.charset.Charset;
    import java.util.Date;
    import java.util.Properties;
    import org.eclipse.jetty.client.HttpClient;
    import org.eclipse.jetty.client.api.ContentProvider;
    import org.eclipse.jetty.client.api.ContentResponse;
    import org.eclipse.jetty.client.api.Request;
    import org.eclipse.jetty.client.util.StringContentProvider;
    import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
    public class HttpProducer {
      public static String SIGNATURE="Signature";
      public static String NUM="num";
      public static String CONSUMERID="ConsumerID";
      public static String PRODUCERID="ProducerID";
      public static String TIMEOUT="timeout";
      public static String TOPIC="Topic";
      public static String AK="AccessKey";
      public static String BODY="body"; 
      public static String MSGHANDLE="msgHandle";
      public static String TIME="time";
      public static void main(String[] args) throws Exception {
        HttpClient httpClient=new HttpClient(); 
        httpClient.setMaxConnectionsPerDestination(1);
        httpClient.start(); 
        Properties properties=new Properties();
        properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));
        String topic=properties.getProperty("Topic"); // configure your Topic in user.properties
        String url=properties.getProperty("URL"); // for the public beta cluster: http://publictest-rest.ons.aliyun.com/
        String ak=properties.getProperty("Ak"); // configure your Ak in user.properties
        String sk=properties.getProperty("Sk"); // configure your Sk in user.properties
        String pid=properties.getProperty("ProducerID"); // configure your Producer ID in user.properties
        String date=String.valueOf(new Date().getTime()); 
        String sign=null;
        String body="hello ons http";
        String NEWLINE="\n";
        String signString;
        for (int i = 0; i < 10; i++) {
          date=String.valueOf(new Date().getTime());
          Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
          ContentProvider content=new StringContentProvider(body);
          req.content(content);
          signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;
          System.out.println(signString);
          sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
          req.header(SIGNATURE, sign);
          req.header(AK, ak);
          req.header(PRODUCERID, pid);
          ContentResponse response;
          response=req.send();
          System.out.println("send msg:"+response.getStatus()+" "+response.getContentAsString());
        }
        httpClient.stop(); // stop the client once all messages have been sent
      }
    }
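    The producer signs a newline-separated string made of the topic, the producer ID, the MD5 hex digest of the body, and the timestamp. The sketch below reproduces only that string-to-sign step using JDK classes, so the layout can be inspected without the SDK. The class and method names are hypothetical; the actual signature is still computed by AuthUtil.calSignature with the secret key, which is not reproduced here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: builds the string that the producer sample passes to AuthUtil.calSignature.
final class SendStringToSign {

    /** Lowercase hex MD5 of the UTF-8 bytes of s. */
    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** topic \n producerId \n md5(body) \n time, in the order used by the producer sample. */
    static String build(String topic, String producerId, String body, long timeMillis) {
        return topic + "\n" + producerId + "\n" + md5Hex(body) + "\n" + timeMillis;
    }

    public static void main(String[] args) {
        // Placeholder topic and producer ID, for illustration only.
        System.out.println(build("demo-topic", "PID_demo", "hello ons http", System.currentTimeMillis()));
    }
}
```

    Note that the time value in the string-to-sign must be the same value sent in the "time" query parameter, which is why the sample recomputes date once per request and reuses it.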

    4. Sample code for receiving messages over HTTP

    Set the parameters as described below to test receiving messages over HTTP.

    package com.aliyun.openservice.ons.http.demo;
    import java.nio.charset.Charset;
    import java.util.Date;
    import java.util.List;
    import java.util.Properties;
    import org.eclipse.jetty.client.HttpClient;
    import org.eclipse.jetty.client.api.ContentProvider;
    import org.eclipse.jetty.client.api.ContentResponse;
    import org.eclipse.jetty.client.api.Request;
    import org.eclipse.jetty.client.util.StringContentProvider;
    import org.eclipse.jetty.http.HttpMethod;
    import com.alibaba.fastjson.JSON;
    import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
    public class HttpConsumer {
      public static String SIGNATURE="Signature";
      public static String NUM="num";
      public static String CONSUMERID="ConsumerID";
      public static String PRODUCERID="ProducerID";
      public static String TIMEOUT="timeout";
      public static String TOPIC="Topic";
      public static String AK="AccessKey";
      public static String BODY="body"; 
      public static String MSGHANDLE="msgHandle";
      public static String TIME="time";
      public static void main(String[] args) throws Exception {
        HttpClient httpClient=new HttpClient(); 
        httpClient.setMaxConnectionsPerDestination(1);
        httpClient.start(); 
        Properties properties=new Properties();
        properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));
        String topic=properties.getProperty("Topic"); // configure your Topic in user.properties
        String url=properties.getProperty("URL"); // for the public beta cluster: http://publictest-rest.ons.aliyun.com/
        String ak=properties.getProperty("Ak"); // configure your Ak in user.properties
        String sk=properties.getProperty("Sk"); // configure your Sk in user.properties
        String cid=properties.getProperty("ConsumerID"); // configure your Consumer ID in user.properties
        String date=String.valueOf(new Date().getTime()); 
        String sign=null;
        String NEWLINE="\n";
        String signString;
        System.out.println(NEWLINE+NEWLINE);
        while (true) { 
          try {
            date=String.valueOf(new Date().getTime());
            // Create the request with newRequest instead of building a POST and overriding its method.
            Request req=httpClient.newRequest(url+"message/?topic="+topic+"&time="+date+"&num="+32);
            req.method(HttpMethod.GET);
            ContentResponse response;
            signString=topic+NEWLINE+cid+NEWLINE+date;
            sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
            req.header(SIGNATURE, sign);
            req.header(AK, ak);
            req.header(CONSUMERID, cid);
            long start=System.currentTimeMillis();
            response=req.send();
            System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000 
                      +"  "+response.getStatus()+"  "+response.getContentAsString()); 
            List<SimpleMessage> list = null;
            if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {
               list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);
            }
            if (list==null||list.size()==0) {
              Thread.sleep(100);
              continue;
            } 
            System.out.println("size is :"+list.size());
            for (SimpleMessage simpleMessage : list) {
              date=String.valueOf(new Date().getTime());
              System.out.println("receive msg:"+simpleMessage.getBody()+"  born time "+simpleMessage.getBornTime());
              // Acknowledge the message by deleting it with its msgHandle.
              req=httpClient.newRequest(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);
              req.method(HttpMethod.DELETE);
              signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;
              sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
              req.header(SIGNATURE, sign);
              req.header(AK, ak);
              req.header(CONSUMERID, cid);
              response=req.send();  
              System.out.println("delete msg:"+response.toString());
            } 
            Thread.sleep(100);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    }
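    The consumer issues two kinds of requests, each with its own URL and string-to-sign: a GET that pulls up to num messages, and a DELETE that acknowledges one message by its msgHandle. The sketch below gathers those four strings in one place, mirroring the concatenation used in the sample. The class ConsumerRequests is a hypothetical helper. Like the sample, it does not URL-encode the parameters; whether the service requires encoding of msgHandle is not stated in this article.

```java
// Hypothetical helper: mirrors the URL and string-to-sign layouts used by the consumer sample.
final class ConsumerRequests {

    /** URL for pulling up to num messages (sent with HTTP GET). */
    static String receiveUrl(String baseUrl, String topic, long timeMillis, int num) {
        return baseUrl + "message/?topic=" + topic + "&time=" + timeMillis + "&num=" + num;
    }

    /** String-to-sign for the pull request: topic \n consumerId \n time. */
    static String receiveStringToSign(String topic, String consumerId, long timeMillis) {
        return topic + "\n" + consumerId + "\n" + timeMillis;
    }

    /** URL for acknowledging one message (sent with HTTP DELETE). */
    static String deleteUrl(String baseUrl, String topic, String msgHandle, long timeMillis) {
        return baseUrl + "message/?msgHandle=" + msgHandle + "&topic=" + topic + "&time=" + timeMillis;
    }

    /** String-to-sign for the delete request: topic \n consumerId \n msgHandle \n time. */
    static String deleteStringToSign(String topic, String consumerId, String msgHandle, long timeMillis) {
        return topic + "\n" + consumerId + "\n" + msgHandle + "\n" + timeMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Placeholder values, for illustration only.
        System.out.println(receiveUrl("http://publictest-rest.ons.aliyun.com/", "demo-topic", now, 32));
        System.out.println(deleteStringToSign("demo-topic", "CID_demo", "handle-1", now));
    }
}
```

    Keeping the URL and the string-to-sign builders next to each other makes it harder to sign one timestamp and send another.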

    5. Utility classes for the HTTP sample programs

    (1) Message wrapper class: SimpleMessage.java

    package com.aliyun.openservice.ons.http.demo;
    public class SimpleMessage {
      private String body;
      private String msgId;
      private String bornTime;
      private String msgHandle;
      private int reconsumeTimes;
      private String tag;
      public void setTag(String tag) {
        this.tag = tag;
      }
      public String getTag() {
        return tag;
      }
      public int getReconsumeTimes() {
        return reconsumeTimes;
      }
      public void setReconsumeTimes(int reconsumeTimes) {
        this.reconsumeTimes = reconsumeTimes;
      }
      public void setMsgHandle(String msgHandle) {
        this.msgHandle = msgHandle;
      }
      public String getMsgHandle() {
        return msgHandle;
      }
      public String getBody() {
        return body;
      }
      public void setBody(String body) {
        this.body = body;
      }
      public String getMsgId() {
        return msgId;
      }
      public void setMsgId(String msgId) {
        this.msgId = msgId;
      }
      public String getBornTime() {
        return bornTime;
      }
      public void setBornTime(String bornTime) {
        this.bornTime = bornTime;
      }
    }

    (2) String digest class used for signing: MD5.java

    package com.aliyun.openservice.ons.http.demo;
    import java.io.UnsupportedEncodingException;
    import java.security.MessageDigest;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;
    import org.slf4j.LoggerFactory;
    public class MD5 {
      private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);
      private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
      private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);
      static {
        for (int i = 0; i < digits.length; ++i) {
          rDigits.put(digits[i], i);
        }
      }
      private static MD5 me = new MD5();
      private MessageDigest mHasher;
      private final ReentrantLock opLock = new ReentrantLock();
      private MD5() {
        try {
          this.mHasher = MessageDigest.getInstance("md5");
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
      public static MD5 getInstance() {
        return me;
      }
      public String getMD5String(String content) {
        return this.bytes2string(this.hash(content));
      }
      public String getMD5String(byte[] content) {
        return this.bytes2string(this.hash(content));
      }
      public byte[] getMD5Bytes(byte[] content) {
        return this.hash(content);
      }
      public byte[] hash(String str) {
        this.opLock.lock();
        try {
          byte[] bt = this.mHasher.digest(str.getBytes("utf-8"));
          if (null == bt || bt.length != 16) {
            throw new IllegalArgumentException("MD5 digest must be 16 bytes");
          }
          return bt;
        } catch (UnsupportedEncodingException e) {
          throw new RuntimeException("unsupported utf-8 encoding", e);
        } finally {
          this.opLock.unlock();
        }
      }
      public byte[] hash(byte[] data) {
        this.opLock.lock();
        try {
          byte[] bt = this.mHasher.digest(data);
          if (null == bt || bt.length != 16) {
            throw new IllegalArgumentException("MD5 digest must be 16 bytes");
          }
          return bt;
        } finally {
          this.opLock.unlock();
        }
      }
      public String bytes2string(byte[] bt) {
        int l = bt.length;
        char[] out = new char[l << 1];
        for (int i = 0, j = 0; i < l; i++) {
          out[j++] = digits[(0xF0 & bt[i]) >>> 4];
          out[j++] = digits[0x0F & bt[i]];
        }
        if (log.isDebugEnabled()) {
          log.debug("[hash]" + new String(out));
        }
        return new String(out);
      }
      public byte[] string2bytes(String str) {
        if (null == str) {
          throw new NullPointerException("Argument must not be null");
        }
        if (str.length() != 32) {
          throw new IllegalArgumentException("String length must equal 32");
        }
        byte[] data = new byte[16];
        char[] chs = str.toCharArray();
        for (int i = 0; i < 16; ++i) {
          int h = rDigits.get(chs[i * 2]).intValue();
          int l = rDigits.get(chs[i * 2 + 1]).intValue();
          data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);
        }
        return data;
      }
    }
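    The string2bytes method above looks up each character in a map that contains only lowercase hex digits, so an uppercase or non-hex character makes rDigits.get return null and the call fails with a NullPointerException. As an alternative, here is a minimal sketch of a hex encode/decode pair that uses Character.digit and reports bad input explicitly. The class name Hex is hypothetical; it is not part of the sample project.

```java
// Hypothetical helper: hex encoding and decoding with explicit input validation.
final class Hex {
    private static final char[] DIGITS = "0123456789abcdef".toCharArray();

    /** Encodes bytes as lowercase hex, two characters per byte. */
    static String encode(byte[] data) {
        char[] out = new char[data.length * 2];
        for (int i = 0; i < data.length; i++) {
            out[2 * i] = DIGITS[(data[i] >> 4) & 0x0F];
            out[2 * i + 1] = DIGITS[data[i] & 0x0F];
        }
        return new String(out);
    }

    /** Decodes hex (upper or lower case) back to bytes; rejects odd lengths and non-hex characters. */
    static byte[] decode(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Non-hex character near index " + (2 * i));
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] sample = {0x00, (byte) 0xff, 0x1a};
        System.out.println(encode(sample));
    }
}
```

    Unlike the MD5 class, this sketch is not restricted to 16-byte values, so callers that need exactly an MD5 digest should still check the length themselves.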

    We hope this article is helpful.
