12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- <?php
- declare(strict_types=1);
- namespace App\Master\Framework\Library\Mqtt;
- use function Hyperf\Coroutine\co;
/**
 * Blocking MQTT subscriber that forwards every received message to a callback.
 */
class Subscribe
{
    /**
     * Subscribe to the given topics and process incoming messages indefinitely.
     *
     * A fresh MqttClient is created, connected and subscribed to $topic. Each
     * loop iteration then:
     *   1. calls recv() on the client;
     *   2. if the result is truthy and is not the boolean true, starts a new
     *      coroutine that invokes $function($client, $message, $topic);
     *   3. once the client's keep-alive interval has elapsed since the last
     *      successful ping (or since the loop started), calls ping() and
     *      restarts the interval when ping() returns a truthy value.
     *
     * @param array $topic    Passed unchanged to MqttClient::subscribe() and
     *                        forwarded to the callback as its third argument.
     * @param mixed $function Handler invoked as $function($client, $message, $topic);
     *                        it must be callable.
     * @return mixed Declared return type only: the loop has no break or return,
     *               so the method does not return normally.
     */
    public function endlessLoop(array $topic, $function): mixed
    {
        $mqtt = new MqttClient();
        $mqtt->connect();

        // One or several topics may be subscribed in a single call.
        $mqtt->subscribe($topic);

        // Unix timestamp of the last successful ping; starts at "now".
        $lastPingAt = time();

        for (;;) {
            // Fetch the next result from the subscription.
            $received = $mqtt->recv();

            // Falsy results and the literal true are not forwarded to the handler.
            $shouldDispatch = $received && $received !== true;
            if ($shouldDispatch) {
                // Run the handler in its own coroutine.
                co(function () use ($function, $mqtt, $received, $topic) {
                    $function($mqtt, $received, $topic);
                });
            }

            // Heartbeat: a ping is due once $lastPingAt is at or before this threshold.
            // getKeepAlive() is compared against Unix timestamps, i.e. treated as seconds.
            $pingDueThreshold = time() - $mqtt->getKeepAlive();
            if ($lastPingAt <= $pingDueThreshold && $mqtt->ping()) {
                // Only a successful ping restarts the interval; a failed one is retried next iteration.
                $lastPingAt = time();
            }
        }
    }
}
|