Subscribe.php 796 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Master\Framework\Library\Mqtt;
  4. use Hyperf\Coroutine\Coroutine;
  class Subscribe
  {
      /**
       * MQTT subscription loop.
       *
       * Creates an MqttClient, subscribes it to the given topics, and then
       * loops indefinitely. On every iteration the supplied callback is
       * invoked with the client and the topic list, a ping is sent once the
       * client's keep-alive interval has elapsed since the last successful
       * ping, and the loop pauses for 0.01 seconds.
       *
       * NOTE(review): the loop contains no break or return, so although the
       * declared return type is mixed this method does not return normally;
       * it can only be left through an exception raised by the client or
       * the callback.
       *
       * @param array $topic    Topic definition(s) passed to MqttClient::subscribe()
       *                        and forwarded unchanged to the callback.
       * @param mixed $function Callable invoked as $function($client, $topic)
       *                        on every iteration.
       * @return mixed Never actually produced (see note above).
       */
      public function endlessLoop(array $topic, $function): mixed
      {
          $client = new MqttClient();

          // Subscribe to one or more topics.
          $client->subscribe($topic);

          // Timestamp of the last successful ping (initially: loop start).
          $lastPingAt = time();

          for (;;) {
              // Let the caller do its per-iteration work first.
              $function($client, $topic);

              // Heartbeat: only ping once the keep-alive interval has elapsed.
              // The timestamp is refreshed only when ping() reports success,
              // so a failed ping is attempted again on the next iteration.
              $pingDue = (time() - $client->getKeepAlive()) >= $lastPingAt;
              if ($pingDue && $client->ping()) {
                  $lastPingAt = time();
              }

              // Short pause between iterations.
              Coroutine::sleep(0.01);
          }
      }
  }