本文探讨物联网平台后端组件选择,架构设计与性能验证。

组件

  • EMQX 集群 MQTT 设备接入,通常设备均通过类似 <设备型号>/<设备ID> 的预定义主题前缀收发消息
  • MQTT 协议接入层,根据 <设备型号> 前缀消费 EMQX 消息,并且根据对应的物模型规则将 MQTT 消息转换为系统物模型消息,然后转发到消息上行 Kafka 消息队列;同时通过消费 Kafka 中的下行物模型指令,根据设备型号对应的物模型规则转换为 MQTT 消息,并发送到 EMQX 集群
  • Kafka 设备上行消息由规则引擎消费,先保存到时序数据库 InfluxDB,更新 PostgreSQL 中设备状态存储,发送到实时状态 Kafka 消息队列(供API层订阅),然后根据场景规则的触发器判断并触发场景任务,场景任务执行通过 Asynq + Redis 支持

组件性能验证

EMQX 性能测试

EMQX 性能测试对测试客户端性能要求较高,并且由于客户端端口分配问题,每个网卡支持的客户端数量不能大于可分配端口数,因此使用阿里云ECS增加虚拟网卡的方式进行测试,在此使用 4 网卡,每网卡 50000 客户端连接的方式模拟。

sysctl -w net.ipv4.ip_local_port_range='10000 60999'

./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.93.254 -i 5

./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.93.255 -i 5

./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.94.1 -i 5

./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.94.0 -i 5

初步测试在单机可支持 15 万连接,15 万 qos0 消息,超过后出现 connack_timeout 报错。考虑到 EMQX 与 MQTT 接入层的横向扩展较容易,评估成为性能瓶颈的可能性较低。

Kafka 性能测试

通常 Kafka 的性能瓶颈为磁盘 IO,docker-compose.yml 中对 Kafka 配置对标阿里云 4 核 8G,5 万 iops 实例类型:

    blkio_config:
      device_read_iops:
        - path: /dev/nvme0n1
          rate: 50000
      device_write_iops:
        - path: /dev/nvme0n1
          rate: 50000
      device_write_bps:
        - path: /dev/nvme0n1
          rate: '300m'
      device_read_bps:
        - path: /dev/nvme0n1
          rate: '300m'
    deploy:
      resources:
        limits:
          cpus: "4"
          memory: 8G

创建测试主题,并验证

./kafka-topics.sh --create --bootstrap-server localhost:29092 --topic ptest --partitions 6 --replication-factor 2
./kafka-console-producer.sh --broker-list localhost:29092 --topic ptest
./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic ptest

分别测试不限制吞吐量以及限制百万吞吐量下的性能数据:

./kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:29092 acks=1 --topic ptest --throughput -1 --record-size 100 --num-records 50000000
# 50000000 records sent, 2288643.749714 records/sec (218.26 MB/sec), 6.51 ms avg latency, 303.00 ms max latency, 0 ms 50th, 7 ms 95th, 194 ms 99th, 257 ms 99.9th.

./kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:29092 acks=1 --topic ptest --throughput 1000000 --record-size 100 --num-records 50000000

# 50000000 records sent, 999900.009999 records/sec (95.36 MB/sec), 0.72 ms avg latency, 597.00 ms max latency, 0 ms 50th, 1 ms 95th, 4 ms 99th, 43 ms 99.9th.

最大吞吐量超过200万每秒,在百万消息每秒的情况下,延时能满足业务需求。

测试消费性能:

./kafka-consumer-perf-test.sh --broker-list localhost:29092 --topic ptest --messages 50000000
# data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
# 4768.4112, 518.9260, 50000417, 5441333.8775, 3176, 6013, 793.0170, 8315386.1633