本文探讨物联网平台后端组件选择,架构设计与性能验证。
组件
- EMQX 集群 MQTT 设备接入,通常设备均通过类似 <设备型号>/<设备ID> 的预定义主题前缀收发消息
- MQTT 协议接入层,根据 <设备型号> 前缀消费 EMQX 消息,并且根据对应的物模型规则将 MQTT 消息转换为系统物模型消息,然后转发到消息上行 Kafka 消息队列;同时通过消费 Kafka 中的下行物模型指令,根据设备型号对应的物模型规则转换为 MQTT 消息,并发送到 EMQX 集群
- Kafka 设备上行消息由规则引擎消费,先保存到时序数据库 InfluxDB,更新 PostgreSQL 中设备状态存储,发送到实时状态 Kafka 消息队列(供API层订阅),然后根据场景规则的触发器判断并触发场景任务,场景任务执行通过 Asynq + Redis 支持
组件性能验证
EMQX 性能测试
EMQX 性能测试对测试客户端性能要求较高,并且由于客户端端口分配问题,每个网卡支持的客户端数量不能大于可分配端口数,因此使用阿里云ECS增加虚拟网卡的方式进行测试,在此使用 4 网卡,每网卡 50000 客户端连接的方式模拟。
sysctl -w net.ipv4.ip_local_port_range='10000 60999'
./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.93.254 -i 5
./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.93.255 -i 5
./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.94.1 -i 5
./bin/emqtt_bench pub -t t -h 172.18.93.253 -s 16 -q 0 -c 50000 --ifaddr 172.18.94.0 -i 5
初步测试在单机可支持 15 万连接,15 万 qos0 消息,超过后出现 connack_timeout 报错。考虑到 EMQX 与 MQTT 接入层的横向扩展较容易,评估成为性能瓶颈的可能性较低。
Kafka 性能测试
通常 Kafka 的性能瓶颈为磁盘 IO,docker-compose.yml 中对 Kafka 配置对标阿里云 4 核 8G,5 万 iops 实例类型:
blkio_config:
device_read_iops:
- path: /dev/nvme0n1
rate: 50000
device_write_iops:
- path: /dev/nvme0n1
rate: 50000
device_write_bps:
- path: /dev/nvme0n1
rate: '300m'
device_read_bps:
- path: /dev/nvme0n1
rate: '300m'
deploy:
resources:
limits:
cpus: "4"
memory: 8G
创建测试主题,并验证
./kafka-topics.sh --create --bootstrap-server localhost:29092 --topic ptest --partitions 6 --replication-factor 2
./kafka-console-producer.sh --broker-list localhost:29092 --topic ptest
./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic ptest
分别测试不限制吞吐量以及限制百万吞吐量下的性能数据:
./kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:29092 acks=1 --topic ptest --throughput -1 --record-size 100 --num-records 50000000
# 50000000 records sent, 2288643.749714 records/sec (218.26 MB/sec), 6.51 ms avg latency, 303.00 ms max latency, 0 ms 50th, 7 ms 95th, 194 ms 99th, 257 ms 99.9th.
./kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:29092 acks=1 --topic ptest --throughput 1000000 --record-size 100 --num-records 50000000
# 50000000 records sent, 999900.009999 records/sec (95.36 MB/sec), 0.72 ms avg latency, 597.00 ms max latency, 0 ms 50th, 1 ms 95th, 4 ms 99th, 43 ms 99.9th.
最大吞吐量超过200万每秒,在百万消息每秒的情况下,延时能满足业务需求。
测试消费性能:
./kafka-consumer-perf-test.sh --broker-list localhost:29092 --topic ptest --messages 50000000
# data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
# 4768.4112, 518.9260, 50000417, 5441333.8775, 3176, 6013, 793.0170, 8315386.1633