一、主要目的

       介绍用Java语言开发的对WebSocket协议的服务器的数据包解析和生成的代码。

 

二、主要代码

 (一)数据包的封装

   Data类是封装WebSocket的数据包。

public class Data{

 

    /**
     * 最终码标志
     */
    private int fin;

 

    /**
     * 操作码
     */
    private int opcode;

 

    /**
     * 掩码标志码,有掩码值为1,没有值为0
     */
    private int hasmask;

 

    /**
     * 掩码
     */
    private int mask;

 

    /**
     * 报文体的数据大小
     */
    private long length;

 

    /**
     * 负载数据
     */
    private byte[] body;

 

    public Data() {
    }

 

    public int getFin() {
        return fin;
    }

 

    public void setFin(int fin) {
        this.fin = fin;
    }

 

    public int getOpcode() {
        return opcode;
    }

 

    public void setOpcode(int opcode) {
        this.opcode = opcode;
    }

 

    public int getHasmask() {
        return hasmask;
    }

 

    public void setHasmask(int hasmask) {
        this.hasmask = hasmask;
    }

 

    public int getMask() {
        return mask;
    }

 

    public void setMask(int mask) {
        this.mask = mask;
    }

 

    public long getLength() {
        return length;
    }

 

    public void setLength(long length) {
        this.length = length;
    }

 

    public byte[] getBody(){
       return this.body;
   }

 

    public void setBody(byte[] data) {
        this.body = data;
    }

 

    /**
     * 编码WebSocket协议数据包的头部
     *
     * @return
     */
    public byte[] encodeHead() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();

 

        int temp1 = 0x00;

 

        // 如果是最终包,最高比特位设为1
        if (fin == 1) {
            temp1 = temp1 | 0x80;
        }

 

        // 设置opcode
        temp1 = temp1 | (opcode & 0x0F);
        baos.write(temp1);

 

        int temp2 = 0x00;

 

        // 如果有掩码,最高比特位设为1
        if (hasmask == 1) {
            temp2 = temp2 | 0x80;
        }

 

        // 设置长度
        if (length < 126) {
            int temp3 = (int) (length & 0x7F);
            temp2 = temp2 | temp3;

 

            // 长度只有7比特,没有扩展长度

            baos.write(temp2);
        } else if (length >= 126 && length <= 0xFFFF) {
            temp2 = temp2 | 126;


            baos.write(temp2);

 

            // 输出2个字节的扩展长度
            baos.write((int) (length >> 8));
            baos.write((int) (length & 0xFF));
        } else if (length > 0xFFFF) {
            temp2 = temp2 | 127;


            baos.write(temp2);

 

            // 输出8个字节的扩展长度
            baos.write((int) ((length >> 56) & 0xFF));
            baos.write((int) (length >> 48));
            baos.write((int) (length >> 40));
            baos.write((int) (length >> 32));
            baos.write((int) (length >> 24));
            baos.write((int) (length >> 16));
            baos.write((int) (length >> 8));
            baos.write((int) (length & 0xFF));
        }

 

        // 含有掩码
        if (hasmask == 1) {
            baos.write(mask >> 24);
            baos.write(mask >> 16);
            baos.write(mask >> 8);
            baos.write(mask & 0xFF);
        }

 

        return baos.toByteArray();
    }


}

 (二)工具类

public class Utils{


   /**
     * 读满字节数组的方法
     * @param input  输入流
     * @param data  字节数组

     * @param len  读取字节个数

     * return 读取个数

     */

 public static int readFull(InputStream input, byte[] data, int len) throws IOException{
    // 记录是否到末尾
    boolean isEOF = false;

 

    // 总数
    int total = 0;
    while (total < len) {
        int count = input.read(data, total, len - total);


        if (count == -1) {
            isEOF = true;
            break;
        }
        total = total + count;
    }

 

     // 如果到末尾了,抛出异常

    if (isEOF){
        throw new EOFException("input stream is close.");
    }


    return total;
  }

 

   /**
     * 读取一个WebSocket数据包
     * @param input  输入流
     * return  数据包 Data对象

     */

public static Data readData(InputStream input) throws IOException {
        Data data = new Data();

 

        // 先读第一个字节
        int temp = input.read();

 

        if (temp == -1){
            throw new EOFException("input stream is close.");
        }

 

        // 用 0x80进行位与判断是否为0x80,是设置fin为1,否则设置为0。
        int result = temp & 0x80;


        if (result == 0x80) {
            data.setFin(1);
        } else {
            data.setFin(0);
        }

 

        // 忽略3比特保留。
        // 获取右四位比特,设置为操作码。
        data.setOpcode(temp & 0x0F);

 

        // 读取第二个字节:
        temp = input.read();

 

        if (temp == -1){
            throw new EOFException("input stream is close.");
        }

 

        // 0x80和它比较,如果为0x80,设置hasmask为1,否则为0。
        result = temp & 0x80;


        if (result == 0x80) {
            data.setHasmask(1);
        } else {
            data.setHasmask(0);
        }

 

        // 用 0x7F进行位与运算获得值len;
        long length = temp & 0x7F;

 

        // 说明只有该7位比特指定的字节数
        if (length == 126) {
            //  读取后面连续的2个字节作为长度。
            int temp1 = input.read();

            if (temp1 == -1){
                throw new EOFException("input stream is close.");
            }

 

            length = temp1 << 8;

 

            temp1 = input.read();
            if (temp1 == -1){
                throw new EOFException("input stream is close.");
            }

 

            length = length | temp1;
        } else if (length == 127) {


            // 如果等于127,后面8个字节都是长度。
            byte[] buff = new byte[8];

            int size = input.read(buff, 0, 8);

 

            if (size == -1){
                throw new EOFException("input stream is close.");
            }

            if (size < 8) {
                // 记录剩余没有读取的字节数
                int stay = 8 - size;

 

                // 记录是否到末尾
                boolean isEOF = false;

 

                // 总数
                int total = size;


                while (stay > 0) {
                    int count = input.read(buff, total, stay);
                    if (count == -1) {
                        isEOF = true;
                        break;
                    }


                    total = total + count;
                    stay = 8 - total;
                }

 

                if (isEOF){
                    throw new EOFException("input stream is close.");
                }

 

                length = (buff[0] & 0xFFL) << 56;
                length = ((buff[1] & 0xFFL) << 48) | length;
                length = ((buff[2] & 0xFFL) << 40) | length;
                length = ((buff[3] & 0xFFL) << 32) | length;
                length = ((buff[4] & 0xFFL) << 24) | length;
                length = ((buff[5] & 0xFFL) << 16) | length;
                length = ((buff[6] & 0xFFL) << 8) | length;
                length = (buff[7] & 0xFFL) | length;
            }
        }

 

        // 设置长度
        data.setLength(length);

 

        //如果hasmask为1,读取连续的4个字节。
        if (data.getHasmask() == 1) {
            int temp4 = input.read();


            if (temp4 == -1){
                throw new EOFException();
            }


            int mask = (temp4 & 0xFF) << 24;

 

            temp4 = input.read();


            if (temp4 == -1){
                throw new EOFException();
            }


            mask = mask | ((temp4 & 0xFF) << 16);

            temp4 = input.read();
            if (temp4 == -1){
                throw new EOFException();
            }


            mask = mask | ((temp4 & 0xFF) << 8);

            temp4 = input.read();


            if (temp4 == -1){
                throw new EOFException("input stream is close.");
            }

 

            mask = mask | (temp4 & 0xFF);
            data.setMask(mask);
        }

 

        // 存放负载数据
        byte[] body = new byte[(int) data.getLength()];

 

        // 读满指定个数的字节
        readFull(input, body, body.length);

 

        // 掩码计算
        if(data.getHasmask() == 1){
            mask_data(data.getMask(), body, 0, body.length);
        }

 

        // 设置负载数据     
        data.setBody(body);

 

        // 返回
        return data;
}

 

   /**
     * 计算掩码的方法
     * @param mask
     * @param data
     */
    public static void mask_data(int mask, byte[] data, int offset, int length){
         // 定义4字节的数组
        byte[] masks = new byte[4];

 

        // 把mask的四字节获取,存入数组中
        masks[0] = (byte) (mask >> 24);
        masks[1] = (byte) (mask >> 16);
        masks[2] = (byte) (mask >> 8);
        masks[3] = (byte) (mask);

 

        for (int k = offset; k < length; k++) {
            if (k % 4 == 0){
                data[k] = (byte) (data[k] ^ masks[0]);
            } else if (k % 4 == 1){
                data[k] = (byte) (data[k] ^ masks[1]);
            } else if (k % 4 == 2){
                data[k] = (byte) (data[k] ^ masks[2]);
            } else if (k % 4 == 3){
                data[k] = (byte) (data[k] ^ masks[3]);
            }
        }
    }

}

 

 

(三)发送数据类

     Sender是自定义的数据发送类。

public class Sender{

 

   // 输出流

   private OutputStream out;

 

   public(OutputStream out){
      this.out = out;
   }

 

   /**
     * 发送文本数据的方法

     *     把文本数据打包到一个WebSocket数据帧
     * @param text  文本数据
     */

   public void sendText(String text) throws IOException{
        // 1、创建默认的对象
        Data unit = new Data();

 

        // 2、设置为最终包
        unit.setFin(1);
      
        // 3、服务端没有掩码
        unit.setHasmask(0);

 

        // 4、设置操作码为1
        unit.setOpcode(1);

 

        // 5、设置长度
        byte[] data = text.getBytes("UTF-8");
        unit.setLength(data.length);

 

        // 6、发送协议头
        output.write(unit.encodeHead());

 

        // 7、发送负载数据
        output.write(data);

 

        // 8、刷新缓冲
        output.flush();
   }

 

   /**
     * 发送二进制数据的方法

     *     把二进制数据打包到一个WebSocket数据帧
     * @param data  字节序列数据
     */

   public void sendBinary(byte[] data) throws IOException{
        // 1、创建默认的对象
        Data unit = new Data();

 

        // 2、设置为最终包
        unit.setFin(1);
      
        // 3、服务端没有掩码
        unit.setHasmask(0);

 

        // 4、设置操作码为2
        unit.setOpcode(2);

 

        // 5、设置长度
        unit.setLength(data.length);

 

        // 6、发送协议头
        output.write(unit.encodeHead());

 

        // 7、发送负载数据
        output.write(data);

 

        // 8、刷新缓冲
        output.flush();
   }


}

 

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐