边缘应用连边缘网关的MQTT的实践案例


  近期项目中需要将应用部署到私有化环境中,同时要与边缘网关的LE(Link IoT Edge)进行MQTT通信。本文对该通信过程做了简单梳理和总结。

Copy

一、环境准备

1、创建边缘实例;

2、在边缘实例中部署LE;

https://help.aliyun.com/document_detail/85155.html?spm=a2c4g.11186623.6.542.55b04353QpDySD

二、边缘应用连接LE的MQTT

1、客户端选择

MQTT是IBM公司牵头制定,设计目标是帮助嵌入式设备快速稳定的进行数据传输,协议本身是开源的,基于tcp协议构建。并且有大量的开源服务端和客户端实现。由于我们项目中用的是边缘网关的LE,所有MQTT的服务端实现不需要关注,这里直接选择一种合适的客户端实现Eclipse Paho。

Eclipse Paho

官方网站:http://www.eclipse.org/paho/,归属于eclipse基金会,提供了一个公开的mosquitto实现,地址:iot.eclipse.org, 端口 1883。主要提供了大量的客户端实现类库,由于使用方便,本项目中就采用的这个客户端实现了,相应的依赖包如下:

<!--paho mqtt client-->


org.eclipse.paho

org.eclipse.paho.client.mqttv3

1.2.1

true

2、MQTT的连接

这里直接贴出一个简略的mqtt连接的demo实现:

public static void main(String[] args) {
    try {
        String brokerUrl = "ssl://XXXXXXX:1883";
        String clientId = "XXX_ClientId";
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        MqttClient mqttClient = new MqttClient(brokerUrl, clientId, memoryPersistence);
        mqttClient.setTimeToWait(3000);

        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(90);
        connOpts.setAutomaticReconnect(true);
        PublishCallback publishCallback = new PublishCallback();
        mqttClient.setCallback(publishCallback);
        mqttClient.connect(connOpts);
    } catch (Exception e) {
        logger.error("connect to mqtt fail:", e);
    }
}


值得注意的是,出于通信安全的考虑,往往会要求在mqtt传输通道加上SSL加密;这就要求在mqtt服务端颁发一个CA证书,并在MQTT配置文件中补充SSL加密的配置内容,同时将CA证书给到客户端做加密使用。MQTT客户端的SSL加密可参照如下:

public static SSLSocketFactory getSSLSocktet(final String caCrtFile) throws Exception {

// CA certificate is used to authenticate server

CertificateFactory cAf = CertificateFactory.getInstance("X.509");

InputStream caIn = Resources.class.getResourceAsStream(caCrtFile);

X509Certificate ca = (X509Certificate)cAf.generateCertificate(caIn);

KeyStore caKs = KeyStore.getInstance("JKS");

caKs.load(null, null);

caKs.setCertificateEntry("ca-certificate", ca);

TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");

tmf.init(caKs);

    // finally, create SSL socket factory
    SSLContext context = SSLContext.getInstance("TLSv1");
    context.init(null, tmf.getTrustManagers(), new SecureRandom());

    return context.getSocketFactory();
}


得到的SSLSocketFactory 设置到MqttConnectOptions中,如下:

connOpts.setSocketFactory(SslUtil.getSSLSocktet("/UserProfile/ca/ca.crt"));

另,CA证书的制作需要和服务端充分沟通,配置的内容不匹配会衍生很多SSL握手上的异常,切记。

3、MQTT的消息推送和消费

MQTT客户端能正常连上服务端之后,消息推送和消费的事情就变得简单了。

//发送消息
MqttMessage reqMsg = new MqttMessage();
reqMsg.setQos(ChannelConstUtil.MQTT_QOS);
reqMsg.setPayload(message.getBytes());
mqttClient.publish(topic, reqMsg);

//订阅消息
String[] topic1 = {"/ssl"};
mqttClient.subscribe(topic1);


三、小结

MQTT是一种成熟的嵌入式设备快速稳定的进行数据传输的协议,这块资料网上也挺丰富,本文简要记录了项目中使用MQTT的情况,理解不到位处,请不吝赐教。


评论区
Rick ©2018