Subscribe.php 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Master\Framework\Library\Mqtt;
  4. use function Hyperf\Coroutine\co;
  5. class Subscribe{
  6. /**
  7. * MQTT 订阅
  8. * @param array $topic
  9. * @param $function
  10. * @return mixed
  11. */
  12. public function endlessLoop(array $topic,$function): mixed
  13. {
  14. $client = new MqttClient();
  15. $client->connect();
  16. // 订阅一个主题或者多个主题
  17. $client->subscribe($topic);
  18. //时间
  19. $timeSincePing = time();
  20. while (true){
  21. // 接收并处理消息
  22. $message = $client->recv();// 订阅消息
  23. if ($message && $message !== true) {
  24. // 携程处理
  25. co(function () use ($function,$client,$message,$topic){
  26. $function($client,$message,$topic);
  27. });
  28. }
  29. //心跳
  30. if ($timeSincePing <= (time() - $client->getKeepAlive())){
  31. if ($client->ping()) {
  32. $timeSincePing = time();
  33. }
  34. }
  35. }
  36. }
  37. }