近期项目中需要将应用部署到私有化环境中,同时要与边缘网关的LE(Link IoT Edge)进行MQTT通信。本文对该通信过程做了简单梳理和总结。
Copy
一、环境准备
1、创建边缘实例;
2、在边缘实例中部署LE;
https://help.aliyun.com/document_detail/85155.html?spm=a2c4g.11186623.6.542.55b04353QpDySD
二、边缘应用连接LE的MQTT
1、客户端选择
MQTT是IBM公司牵头制定,设计目标是帮助嵌入式设备快速稳定的进行数据传输,协议本身是开源的,基于tcp协议构建。并且有大量的开源服务端和客户端实现。由于我们项目中用的是边缘网关的LE,所有MQTT的服务端实现不需要关注,这里直接选择一种合适的客户端实现Eclipse Paho。
Eclipse Paho
官方网站:http://www.eclipse.org/paho/,归属于eclipse基金会,提供了一个公开的mosquitto实现,地址:iot.eclipse.org, 端口 1883。主要提供了大量的客户端实现类库,由于使用方便,本项目中就采用的这个客户端实现了,相应的依赖包如下:
<!--paho mqtt client-->
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.1
true
2、MQTT的连接
这里直接贴出一个简略的mqtt连接的demo实现:
public static void main(String[] args) {
try {
String brokerUrl = "ssl://XXXXXXX:1883";
String clientId = "XXX_ClientId";
MemoryPersistence memoryPersistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(brokerUrl, clientId, memoryPersistence);
mqttClient.setTimeToWait(3000);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(90);
connOpts.setAutomaticReconnect(true);
PublishCallback publishCallback = new PublishCallback();
mqttClient.setCallback(publishCallback);
mqttClient.connect(connOpts);
} catch (Exception e) {
logger.error("connect to mqtt fail:", e);
}
}
值得注意的是,出于通信安全的考虑,往往会要求在mqtt传输通道加上SSL加密;这就要求在mqtt服务端颁发一个CA证书,并在MQTT配置文件中补充SSL加密的配置内容,同时将CA证书给到客户端做加密使用。MQTT客户端的SSL加密可参照如下:
public static SSLSocketFactory getSSLSocktet(final String caCrtFile) throws Exception {
// CA certificate is used to authenticate server
CertificateFactory cAf = CertificateFactory.getInstance("X.509");
InputStream caIn = Resources.class.getResourceAsStream(caCrtFile);
X509Certificate ca = (X509Certificate)cAf.generateCertificate(caIn);
KeyStore caKs = KeyStore.getInstance("JKS");
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", ca);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
tmf.init(caKs);
// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1");
context.init(null, tmf.getTrustManagers(), new SecureRandom());
return context.getSocketFactory();
}
得到的SSLSocketFactory 设置到MqttConnectOptions中,如下:
connOpts.setSocketFactory(SslUtil.getSSLSocktet("/UserProfile/ca/ca.crt"));
另,CA证书的制作需要和服务端充分沟通,配置的内容不匹配会衍生很多SSL握手上的异常,切记。
3、MQTT的消息推送和消费
MQTT客户端能正常连上服务端之后,消息推送和消费的事情就变得简单了。
//发送消息
MqttMessage reqMsg = new MqttMessage();
reqMsg.setQos(ChannelConstUtil.MQTT_QOS);
reqMsg.setPayload(message.getBytes());
mqttClient.publish(topic, reqMsg);
//订阅消息
String[] topic1 = {"/ssl"};
mqttClient.subscribe(topic1);
三、小结
MQTT是一种成熟的嵌入式设备快速稳定的进行数据传输的协议,这块资料网上也挺丰富,本文简要记录了项目中使用MQTT的情况,理解不到位处,请不吝赐教。