电子说
本篇分享一下 HarmonyOS 中的Socket使用方法
将从2个方面实践:
通过循序渐进的方式,全面了解实践HarmonyOS中的Socket用法
学习本章前,建议先熟悉开发文档;知识更新库见 gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md,可前往查看。
注意:编译时 IDE 会给出如下警告,提示 @ohos.net.socket 这个包尚未经过验证。
或者+mau123789熟悉文档是v喔!
Currently module for '@ohos.net.socket' is not verified. If you're importing napi, its verification will be enabled in later SDK version. Please make sure the corresponding .d.ts file is provided and the napis are correctly declared.
import socket from '@ohos.net.socket';
// Note: declare `tcp` with an explicit type; without it the IDE cannot offer API completion.
let tcp: socket.TCPSocket = socket.constructTCPSocketInstance();

// Local address and port the client socket binds to (placeholders - substitute real values).
let bindAddress = {
  address: '192.168.xx.xx',
  port: x
};

// Remote server address and port (placeholders - substitute real values).
let connectAddress = {
  address: '192.168.xx.xx',
  port: x
};

// 1. Bind the local address and port.
tcp.bind(bindAddress, err => {
  if (err) {
    console.log('1-' + JSON.stringify(err))
    return
  }
  // 2. Connect to the remote server only after the bind has succeeded.
  tcp.connect({
    address: connectAddress,
    timeout: 30000
  }).then(r => {
    console.log('2-' + JSON.stringify(r))
  }).catch((e) => {
    console.log('3-' + JSON.stringify(e))
  })
})
复制
// Subscribe to the socket's 'connect' event.
tcp.on('connect', () => {
});
复制
// Unsubscribe from the socket's 'connect' event.
// NOTE(review): per the @ohos.net.socket docs, off() removes the listener matching the callback
// passed in, or every listener when the callback is omitted. A fresh arrow function like this one
// presumably matches nothing - pass the original callback reference, or omit it. Confirm.
tcp.off('connect', () => {
});
复制
// Subscribe to the socket's 'message' event.
tcp.on('message', () => {
  // i.e. messages sent by the server can be received here at any time
});
复制
// Unsubscribe from the socket's 'message' event.
// NOTE(review): off() presumably needs the same callback reference that was passed to on(),
// or no callback at all to clear every listener - confirm against the @ohos.net.socket docs.
tcp.off('message', () => {
});
复制
// Send a text message to the server and log whether the send succeeded.
let msg: string = '我是HarmonyOS客户端'
tcp.send({ data: msg }, (error) => {
  if (error) {
    console.log('消息没有发送成功: ' + JSON.stringify(error))
  } else {
    console.log('消息发送成功')
  }
})
复制
这里使用IntelliJ IDEA创建一个Java工程,然后运行在自己电脑上即可使用
如果你实在没法自己实现ServerSocket,可以在网上找一个调试Socket的工具
package com.harvey.socketserver;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * Entry point of the demo socket server.
 *
 * Opens a ServerSocket on port 6666 and, on a background thread, accepts client
 * connections forever, handing each accepted socket to its own {@link Server} thread.
 */
public class Main {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(6666);
            // 1. Listen for clients on a dedicated thread so main() returns immediately.
            new Thread(() -> {
                while (true) {
                    try {
                        Socket clientSocket = serverSocket.accept();
                        // Log the CLIENT's address. The original chained the static
                        // InetAddress.getLocalHost() here, which reports the server's own host.
                        System.out.println("客户端:" + clientSocket.getInetAddress() + "已连接到服务器");
                        new Server(clientSocket).start();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // Pause between accepts; this also throttles the loop if accept() keeps failing.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
复制
package com.harvey.socketserver;
import java.io.*;
import java.net.Socket;
/**
 * Per-client worker thread: reads one line from the client, replies with an
 * acknowledgement, sleeps one second, and repeats until the time budget
 * (LIVE_TIME, decremented by 1000 per round) is used up or the socket closes.
 */
public class Server extends Thread {
    Socket clientSocket;
    InputStream is;
    OutputStream os;
    String lastReceiverMessage;
    int LIVE_TIME = 60 * 1000;

    // Created once per connection: a new BufferedReader per read would discard
    // any bytes the previous reader had already buffered.
    private BufferedReader reader;
    private BufferedWriter writer;

    public Server(Socket clientSocket) {
        this.clientSocket = clientSocket;
        try {
            is = this.clientSocket.getInputStream();
            os = this.clientSocket.getOutputStream();
            // Decode/encode explicitly as UTF-8 instead of relying on the platform default charset.
            reader = new BufferedReader(new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
            writer = new BufferedWriter(new OutputStreamWriter(os, java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (LIVE_TIME != 0) {
            boolean usable = this.clientSocket != null
                    && reader != null
                    && writer != null
                    && !this.clientSocket.isClosed()
                    && !this.clientSocket.isInputShutdown()
                    && !this.clientSocket.isOutputShutdown();
            if (!usable) {
                LIVE_TIME = 0;
                continue;
            }
            if (!readMessage()) {
                // Peer closed the stream or the read failed: stop serving this client.
                LIVE_TIME = 0;
                continue;
            }
            responseMessage();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LIVE_TIME -= 1000;
        }
        closeAllChannel();
    }

    // Release resources. Each close is attempted independently so one failure
    // does not prevent the others from running.
    private void closeAllChannel() {
        if (clientSocket != null) {
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            clientSocket = null;
        }
        if (is != null) {
            try {
                is.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            is = null;
        }
        if (os != null) {
            try {
                os.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            os = null;
        }
        reader = null;
        writer = null;
    }

    // Read one client message. For simplicity this uses readLine(), so a message
    // that itself contains a line break is split across reads.
    // Returns false when the stream has ended or the read failed.
    private boolean readMessage() {
        try {
            String line = reader.readLine();
            if (line == null) {
                // End of stream: the client disconnected.
                return false;
            }
            lastReceiverMessage = line;
            System.out.println("已接收到端消息:【" + lastReceiverMessage + "】");
            return true;
        } catch (IOException e) {
            System.err.println("接收消息失败:" + e.getMessage());
            return false;
        }
    }

    // Automatically acknowledge the client's message.
    private void responseMessage() {
        try {
            // Terminate the reply with a real line break (the original wrote a literal "n").
            writer.write(System.currentTimeMillis() + "亲爱的客户端,已收到您的来信。" + lastReceiverMessage + "\n");
            writer.flush();
            System.out.println("回复消息成功");
        } catch (IOException e) {
            System.err.println("回复消息失败:" + e.getMessage());
        }
    }
}
复制
为了验证Socket,交互页面包含如下功能:
import socket from '@ohos.net.socket'
import util from '@ohos.util';
import Prompt from '@system.prompt';
// Module-level TCP socket shared by the page and the free functions in this file.
// It stays null until a connect function assigns a freshly constructed instance.
let tcpSocket: socket.TCPSocket = null

/**
 * Demo page: a connect/disconnect button, the last sent and received messages,
 * and a text area with a send button.
 */
@Entry
@Component
struct SocketPage {
  @State isConnected: boolean = false
  @State sendMessage: string = ''
  @State inputMessage: string = ''
  @State receiveMessage: string = ''
  @State connectButtonBGColor: Color = Color.Gray
  @State connectButtonTextColor: Color = Color.White

  // On page creation, register the custom events carrying incoming messages and connection status.
  aboutToAppear() {
    getContext().eventHub.on('remotemsg', (value) => {
      this.receiveMessage = value
    })
    getContext().eventHub.on('connectStatus', (value) => {
      if (value === 'connect') {
        // Track the state so the button toggles between connect and close
        // (the original never updated isConnected, so every click reconnected).
        this.isConnected = true
        this.connectButtonBGColor = Color.Green
      } else if (value === 'close') {
        this.isConnected = false
        this.connectButtonBGColor = Color.Gray
      }
    })
  }

  // On page destruction, drop every subscription and close the socket.
  aboutToDisappear() {
    getContext().eventHub.off('remotemsg')
    getContext().eventHub.off('connectStatus')
    // The socket only exists after the user has pressed the connect button.
    if (tcpSocket) {
      tcpSocket.off("message")
      tcpSocket.off("connect")
      tcpSocket.off("close")
      tcpSocket.close()
      tcpSocket = null
    }
  }

  build() {
    Column({ space: 20 }) {
      Row({ space: 20 }) {
        Button('连接').width(60).height(60).fontSize(12)
          .backgroundColor(this.connectButtonBGColor)
          .fontColor(this.connectButtonTextColor)
          .onClick(() => {
            if (this.isConnected) {
              tcpSocket.close()
            } else {
              connectServer()
            }
          })
      }

      Column({ space: 30 }) {
        Text('发送:' + this.sendMessage).fontSize(20).width('100%')
        Text('接收:' + this.receiveMessage).fontSize(20).width('100%')
      }.backgroundColor(Color.Pink)
      .padding({ top: 20, bottom: 20 })

      Row({ space: 10 }) {
        TextArea({ placeholder: '输入文字', text: this.inputMessage })
          .onChange((value) => {
            this.inputMessage = value
          })
          .fontSize(20)
          .width('75%')
        Button('发送').fontColor(29).onClick(() => {
          sendMessage(this.inputMessage)
          this.sendMessage = this.inputMessage
          this.inputMessage = ''
        }).width('20%')
      }
      .width('100%')
      .justifyContent(FlexAlign.Center)
    }
    .width('100%')
    .height('100%')
    .alignItems(HorizontalAlign.Start)
    .justifyContent(FlexAlign.Start)
    .padding({ top: px2vp(120) })
  }
}
/**
 * Send one text message over the shared TCP socket.
 *
 * A line feed is appended because the demo Java server reads with readLine()
 * and would otherwise wait forever for the end of the line. (The original
 * appended a literal 'n'; the backslash had been lost.)
 * Shows a toast instead when no socket has been created yet.
 *
 * @param msg text to send, without a trailing line break
 */
function sendMessage(msg: string) {
  if (tcpSocket) {
    tcpSocket.send({ data: msg + '\n' }, (error) => {
      if (error) {
        console.log('消息没有发送成功: ' + JSON.stringify(error))
      } else {
        // NOTE(review): emits an event with an empty name; no subscriber for it is visible here - confirm intent.
        getContext().eventHub.emit('')
        console.log('消息发送成功')
      }
    })
  } else {
    Prompt.showToast({ message: '还没有连接服务器' })
  }
}
/**
 * Create the shared TCP socket, register its close/connect/message listeners,
 * bind the local address and then connect to the demo server.
 *
 * Connection status and decoded incoming messages are forwarded to the page
 * through the context event hub ('connectStatus' and 'remotemsg').
 */
function connectServer() {
  let bindAddress = {
    address: '192.168.71.66',
    port: 1983,
    family: 1
  };
  let connectAddress = {
    address: '192.168.71.23',
    port: 6666 // must match the port the ServerSocket listens on
  };
  tcpSocket = socket.constructTCPSocketInstance()
  tcpSocket.on('close', () => {
    console.log("on close")
    getContext().eventHub.emit('connectStatus', 'close')
  });
  tcpSocket.on('connect', () => {
    console.log("on connect")
    getContext().eventHub.emit('connectStatus', 'connect')
  });
  tcpSocket.on('message', (value: { message: ArrayBuffer, remoteInfo: socket.SocketRemoteInfo }) => {
    // Decode the received bytes to text before handing them to the page.
    let view = new Uint8Array(value.message);
    let textDecoder = util.TextDecoder.create()
    let str = textDecoder.decodeWithStream(view);
    getContext().eventHub.emit('remotemsg', str)
  })
  tcpSocket.bind(bindAddress, err => {
    if (err) {
      console.log('1-' + JSON.stringify(err))
      return
    }
    // Connect only after the bind has succeeded.
    tcpSocket.connect({
      address: connectAddress,
      timeout: 30000
    }).then(r => {
      console.log('2-' + JSON.stringify(r))
    }).catch((e) => {
      console.log('3-' + JSON.stringify(e))
    })
  })
}
复制
MQTT使用的是 mosquitto,官网下载地址
关于MQTT的入门使用,可参见“MQTT试用”
注意:mosquitto 安装完成后,需要打开匿名访问设置,并且监听自己电脑的 IP 和 1883 端口。mosquitto 的配置文件名为 mosquitto.conf。
https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.pdf
客户端使用Socket 连接mosquitto, 并且响应连接ACK
这部分需要分为两步:1. Socket连接 2. MQTT连接请求(即发送连接请求命令)
/**
 * Open the TCP connection to the MQTT broker: create the shared socket,
 * register its listeners via listenerStatus(), bind the local address and connect.
 */
function connectMQTTServer() {
  let bindAddress = {
    address: '192.168.71.66',
    port: 1983,
    family: 1
  };
  let connectAddress = {
    address: '192.168.71.23', // MQTT broker IP
    port: 1883 // MQTT broker port
  };
  tcpSocket = socket.constructTCPSocketInstance()
  listenerStatus()
  tcpSocket.bind(bindAddress, err => {
    if (err) {
      console.log('1-' + JSON.stringify(err))
      return
    }
    // Connect only after the bind has succeeded.
    tcpSocket.connect({
      address: connectAddress,
      timeout: 30000
    }).then(r => {
      console.log('2-' + JSON.stringify(r))
    }).catch((e) => {
      console.log('3-' + JSON.stringify(e))
    })
  })
}
复制
// When the TCP connection is reported, send the MQTT CONNECT packet and notify the page.
tcpSocket.on('connect', () => {
  console.log("on connect")
  // Assemble the MQTT CONNECT request bytes.
  let connectCMD = MQTTConnect.getFixHeader()
  // Send the MQTT CONNECT request.
  tcpSocket.send({ data: connectCMD.buffer }, (error) => {
    if (error) {
      console.log('消息没有发送成功: ' + JSON.stringify(error))
    } else {
      console.log('消息发送成功')
    }
  })
  getContext().eventHub.emit('connectStatus', 'connect')
});
复制
代码有点枯燥,都是要对着MQTT 5.0规范文档来实现的
import util from '@ohos.util'
/**
 * Builds an MQTT 5.0 CONNECT packet.
 *
 * Packet outline, with section numbers from the MQTT 5.0 specification
 * (fields marked * are the ones this class emits):
 *   3.1.1  CONNECT Fixed Header *            (packet type + Remaining Length)
 *   3.1.2  CONNECT Variable Header
 *     3.1.2.1  Protocol Name *               (6 bytes)
 *     3.1.2.2  Protocol Version *            (1 byte)
 *     3.1.2.3  Connect Flags *               (1 byte)
 *     3.1.2.10 Keep Alive *                  (2 bytes)
 *     3.1.2.11 CONNECT Properties
 *       3.1.2.11.1  Property Length *        (written as 0: no properties)
 *       3.1.2.11.2  Session Expiry Interval  (4 bytes)
 *       3.1.2.11.3  Receive Maximum          (2 bytes)
 *       3.1.2.11.4  Maximum Packet Size      (4 bytes)
 *       3.1.2.11.5  Topic Alias Maximum      (2 bytes)
 *       3.1.2.11.6  Request Response Information (1 byte)
 *       3.1.2.11.7  Request Problem Information  (1 byte)
 *       3.1.2.11.8  User Property            (UTF-8 String Pair)
 *       3.1.2.11.9  Authentication Method    (UTF-8 String)
 *       3.1.2.11.10 Authentication Data      (Binary Data)
 *   3.1.3  CONNECT Payload
 *     3.1.3.1  Client Identifier (ClientID) * (UTF-8 String)
 *     3.1.3.2  Will Properties
 *       3.1.3.2.1  Property Length           (Variable Byte Integer)
 *       3.1.3.2.2  Will Delay Interval       (4 bytes)
 *       3.1.3.2.3  Payload Format Indicator  (1 byte)
 *       3.1.3.2.4  Message Expiry Interval   (4 bytes)
 *       3.1.3.2.5  Content Type              (UTF-8 String)
 *       3.1.3.2.6  Response Topic            (UTF-8 String)
 *       3.1.3.2.7  Correlation Data          (Binary Data)
 *       3.1.3.2.8  User Property             (UTF-8 String Pair)
 *     3.1.3.3  Will Topic                    (UTF-8 String)
 *     3.1.3.4  Will Payload                  (Binary Data)
 *     3.1.3.5  User Name                     (UTF-8 String)
 *     3.1.3.6  Password                      (Binary Data)
 *   3.1.4  CONNECT Actions
 */
export default class MQTTConnect {
  /**
   * Assemble the complete CONNECT packet (fixed header, variable header and payload).
   *
   * @param clientId  client identifier placed in the payload
   * @param keepAlive keep-alive interval in seconds, 0..65535
   * @returns the packet bytes, ready to be sent on the socket
   * @throws RangeError when keepAlive or an encoded length is out of range
   */
  public static getFixHeader(clientId: string = 'Harvey鸿蒙', keepAlive: number = 60): Uint8Array {
    if (!Number.isInteger(keepAlive) || keepAlive < 0 || keepAlive > 0xffff) {
      throw new RangeError('keepAlive must be an integer between 0 and 65535')
    }

    // 3.1.1 CONNECT Fixed Header - packet type
    const packetType = 0x10

    // 3.1.2.3 Connect Flags - bit masks; combine with bitwise OR as more options get exposed.
    const ConnectFlag = {
      UserName: 0x80,
      Password: 0x40,
      WillRetain: 0x20,
      WillQoS0: 0x00,
      WillQoS1: 0x08,
      WillQoS2: 0x10,
      WillQoS3: 0x18,
      Will: 0x04,
      CleanStart: 0x02
    }

    // Variable header and payload fields, in wire order.
    const fields: Uint8Array[] = [
      MQTTConnect.utf8String('MQTT'),                                    // 3.1.2.1 Protocol Name
      new Uint8Array([5]),                                               // 3.1.2.2 Protocol Version
      new Uint8Array([ConnectFlag.CleanStart]),                          // 3.1.2.3 Connect Flags
      new Uint8Array([(keepAlive >> 8) & 0xff, keepAlive & 0xff]),       // 3.1.2.10 Keep Alive (big-endian)
      new Uint8Array([0]),                                               // 3.1.2.11.1 Property Length
      MQTTConnect.utf8String(clientId)                                   // 3.1.3.1 Client Identifier
    ]

    // 3.1.1 CONNECT Fixed Header - Remaining Length counts every byte after the fixed header.
    const remainLength = fields.reduce((total, field) => total + field.length, 0)
    // It is a Variable Byte Integer (1-4 bytes); a single raw byte is only valid up to 127.
    const remainLengthBytes = MQTTConnect.variableByteInteger(remainLength)

    const packet = new Uint8Array(1 + remainLengthBytes.length + remainLength)
    let offset = 0
    packet[offset++] = packetType
    packet.set(remainLengthBytes, offset)
    offset += remainLengthBytes.length
    for (const field of fields) {
      packet.set(field, offset)
      offset += field.length
    }

    // Hex dump of the packet for comparison against the specification.
    console.log(Array.from(packet, (value) => value.toString(16).padStart(2, '0')).join(' '))
    return packet
  }

  /**
   * Encode a number as an MQTT Variable Byte Integer: 7 data bits per byte,
   * least-significant group first, top bit set on every byte except the last.
   */
  private static variableByteInteger(value: number): Uint8Array {
    if (!Number.isInteger(value) || value < 0 || value > 268435455) {
      throw new RangeError('Variable Byte Integer out of range: ' + value)
    }
    const bytes: number[] = []
    let rest = value
    do {
      let encoded = rest % 128
      rest = Math.floor(rest / 128)
      if (rest > 0) {
        encoded |= 0x80
      }
      bytes.push(encoded)
    } while (rest > 0)
    return new Uint8Array(bytes)
  }

  /**
   * Encode text as an MQTT "UTF-8 Encoded String": a two-byte big-endian
   * byte length followed by the UTF-8 bytes.
   */
  private static utf8String(content: string): Uint8Array {
    const encoder = new util.TextEncoder()
    const encoded = encoder.encodeInto(content)
    if (encoded.length > 0xffff) {
      throw new RangeError('UTF-8 string longer than 65535 bytes')
    }
    const result = new Uint8Array(encoded.length + 2)
    result[0] = (encoded.length >> 8) & 0xff
    result[1] = encoded.length & 0xff
    result.set(encoded, 2)
    return result
  }
}
复制
DevEco IDE 日志控制台
// Log every incoming packet as hex; when it is a CONNACK, parse it into readable log lines.
tcpSocket.on('message', (value: { message: ArrayBuffer, remoteInfo: socket.SocketRemoteInfo }) => {
  let uint8Array = new Uint8Array(value.message)
  let str = Array.from(uint8Array)
    .map(x => x.toString(16).padStart(2, '0'))
    .join(' ')
  console.log(str)
  let index: number = 0
  let cfh = uint8Array[index]
  index++
  // 3.2.1 CONNACK Fixed Header
  // Parse the MQTT acknowledgement and turn it into log output (32 = 0x20 = CONNACK).
  if (cfh == 32) {
    console.log('Fixed Header format:CONNACK(' + cfh + ')')
    MQTTConnectACK.parse(index, uint8Array)
  }
  getContext().eventHub.emit('remotemsg', str)
})
复制
import MQTTTool from './MQTTTool'
/**
 * Parses an MQTT 5.0 CONNACK packet and writes each decoded field to the console.
 */
export default class MQTTConnectACK {
  /**
   * Decode a CONNACK packet starting at its Remaining Length field.
   *
   * Parsing stops quietly at the end of the packet or of the buffer, so a
   * truncated packet never reads past the data that was actually received.
   *
   * @param index      offset of the Remaining Length field (the byte after the packet type)
   * @param uint8Array the received bytes
   */
  public static parse(index: number, uint8Array: Uint8Array): void {
    // 3.2.1 Fixed Header - Remaining Length (Variable Byte Integer)
    const remaining = MQTTConnectACK.readVariableByteInteger(uint8Array, index, uint8Array.length)
    if (remaining === undefined) {
      return
    }
    console.log('Remaining Length:' + remaining.value.toString(16))
    index += remaining.size
    if (remaining.value === 0) {
      return
    }
    // Never read beyond the packet nor beyond the bytes actually received.
    const packetEnd = Math.min(uint8Array.length, index + remaining.value)
    if (index >= packetEnd) {
      return
    }

    // 3.2.2 CONNACK Variable Header
    // 3.2.2.1 Connect Acknowledge Flags
    const flags = uint8Array[index]
    console.log('Connect Acknowledge Flags:' + flags.toString(16))
    index++
    if (index >= packetEnd) {
      return
    }

    // 3.2.2.2 Connect Reason Code
    const reasonCode = uint8Array[index]
    console.log('Connect Reason Code:' + MQTTConnectACK.describeReasonCode(reasonCode))
    index++
    if (index >= packetEnd) {
      return
    }

    // 3.2.2.3 CONNACK Properties
    // 3.2.2.3.1 Property Length (Variable Byte Integer)
    const propertyLength = MQTTConnectACK.readVariableByteInteger(uint8Array, index, packetEnd)
    if (propertyLength === undefined) {
      return
    }
    console.log('Property Length:' + propertyLength.value.toString(16))
    index += propertyLength.size
    const propertiesEnd = Math.min(packetEnd, index + propertyLength.value)

    while (index < propertiesEnd) {
      const identifier = uint8Array[index]
      index++
      if (index >= propertiesEnd) {
        // Identifier without a value: nothing left to decode.
        return
      }
      switch (identifier) {
        case 17: // 3.2.2.3.2 Session Expiry Interval (4 bytes)
          index += MQTTTool.parseFourByte(uint8Array, index, "Session Expiry Interval:")
          break
        case 33: // 3.2.2.3.3 Receive Maximum (2 bytes)
          index += MQTTTool.parseTwoByte(uint8Array, index, "Receive Maximum:")
          break
        case 36: // 3.2.2.3.4 Maximum QoS (1 byte)
          console.log('Maximum QoS:' + uint8Array[index].toString(16))
          index++
          break
        case 37: // 3.2.2.3.5 Retain Available (1 byte)
          console.log('Retain Available:' + uint8Array[index].toString(16))
          index++
          break
        case 39: // 3.2.2.3.6 Maximum Packet Size (4 bytes)
          index += MQTTTool.parseFourByte(uint8Array, index, "Maximum Packet Size:")
          break
        case 18: // 3.2.2.3.7 Assigned Client Identifier (UTF-8 String: 2-byte length + bytes)
          index += MQTTTool.parseUTF8String(uint8Array, index, "Assigned Client Identifier:")
          break
        case 34: // 3.2.2.3.8 Topic Alias Maximum (2 bytes)
          index += MQTTTool.parseTwoByte(uint8Array, index, "Topic Alias Maximum:")
          break
        case 31: // 3.2.2.3.9 Reason String (UTF-8 String)
          index += MQTTTool.parseUTF8String(uint8Array, index, "Reason String:")
          break
        case 38: // 3.2.2.3.10 User Property (UTF-8 String Pair: name, then value)
          // Both strings must be consumed, otherwise the value's length bytes would be
          // misread as the next property identifier.
          // NOTE(review): assumes MQTTTool.parseUTF8String consumes exactly one string - confirm.
          index += MQTTTool.parseUTF8String(uint8Array, index, "User Property Name:")
          if (index >= propertiesEnd) {
            return
          }
          index += MQTTTool.parseUTF8String(uint8Array, index, "User Property Value:")
          break
        case 40: // 3.2.2.3.11 Wildcard Subscription Available (1 byte)
          console.log('Wildcard Subscription Available:' + uint8Array[index].toString(16))
          index++
          break
        case 41: // 3.2.2.3.12 Subscription Identifiers Available (1 byte)
          console.log('Subscription Identifiers Available:' + uint8Array[index].toString(16))
          index++
          break
        case 42: // 3.2.2.3.13 Shared Subscription Available (1 byte)
          console.log('Shared Subscription Available:' + uint8Array[index].toString(16))
          index++
          break
        case 19: // 3.2.2.3.14 Server Keep Alive (2 bytes)
          index += MQTTTool.parseTwoByte(uint8Array, index, "Server Keep Alive:")
          break
        case 26: // 3.2.2.3.15 Response Information (UTF-8 String)
          index += MQTTTool.parseUTF8String(uint8Array, index, "Response Information:")
          break
        case 28: // 3.2.2.3.16 Server Reference (UTF-8 String)
          index += MQTTTool.parseUTF8String(uint8Array, index, "Server Reference:")
          break
        case 21: // 3.2.2.3.17 Authentication Method (UTF-8 String)
          index += MQTTTool.parseUTF8String(uint8Array, index, "Authentication Method:")
          break
        case 22: // 3.2.2.3.18 Authentication Data (Binary Data: 2-byte length + bytes)
          index += MQTTTool.parseBinaryData(uint8Array, index, "Authentication Data:")
          break
        default:
          // The value length of an unknown property cannot be known, so continuing
          // would misinterpret the remaining bytes.
          console.log('Unknown CONNACK property identifier:' + identifier.toString(16))
          return
      }
    }
  }

  /** Map a Connect Reason Code (3.2.2.2) to its name from the specification. */
  private static describeReasonCode(code: number): string {
    switch (code) {
      case 0: return 'Success'
      case 128: return 'Unspecified error'
      case 129: return 'Malformed Packet'
      case 130: return 'Protocol Error'
      case 131: return 'Implementation specific error'
      case 132: return 'Unsupported Protocol Version'
      case 133: return 'Client Identifier not valid'
      case 134: return 'Bad User Name or Password'
      case 135: return 'Not authorized'
      case 136: return 'Server unavailable'
      case 137: return 'Server busy'
      case 138: return 'Banned'
      case 140: return 'Bad authentication method'
      case 144: return 'Topic Name invalid'
      case 149: return 'Packet too large'
      case 151: return 'Quota exceeded'
      case 153: return 'Payload format invalid'
      case 154: return 'Retain not supported'
      case 155: return 'QoS not supported'
      case 156: return 'Use another server'
      case 157: return 'Server moved'
      case 158: return 'Connection rate exceeded'
      default: return 'Unknown reason code 0x' + code.toString(16)
    }
  }

  /**
   * Read an MQTT Variable Byte Integer (1-4 bytes, 7 data bits each, top bit = "more").
   *
   * @returns the decoded value and the number of bytes consumed, or undefined
   *          when the bytes before `limit` do not hold a complete integer
   */
  private static readVariableByteInteger(
    bytes: Uint8Array, offset: number, limit: number
  ): { value: number, size: number } | undefined {
    let value = 0
    let multiplier = 1
    for (let size = 0; size < 4; size++) {
      const position = offset + size
      if (position < 0 || position >= limit) {
        return undefined
      }
      const current = bytes[position]
      value += (current & 0x7f) * multiplier
      if ((current & 0x80) === 0) {
        return { value: value, size: size + 1 }
      }
      multiplier *= 128
    }
    return undefined
  }
}
复制
全部0条评论
快来发表一下你的评论吧 !