MqttClient.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Master\Framework\Library\Mqtt;
  4. use Hyperf\Config\Annotation\Value;
  5. use Simps\MQTT\Client;
  6. use Simps\MQTT\Config\ClientConfig;
  7. class MqttClient
  8. {
  9. #[Value("mqtt.client")]
  10. protected $config;
  11. protected Client $client;
  12. /**
  13. * @param bool $clean 清理会话,默认为 true
  14. * @param array $will 遗嘱消息,当客户端断线后 Broker 会自动发送遗嘱消息给其它客户端
  15. */
  16. public function connect(bool $clean = true, array $will = [])
  17. {
  18. $config = $this->config;
  19. $configObj = new ClientConfig($config['pool']);
  20. $this->client = new Client($config['host'], $config['port'], $configObj);
  21. $this->client->connect($clean,$will);
  22. }
  23. /**
  24. * 向某个主题发布一条消息
  25. *
  26. * @param string $topic 主题
  27. * @param mixed $data 内容
  28. * @param int $qos QoS 等级,默认 0
  29. * @param int $dup 重发标志,默认 0
  30. * @param int $retain retain 标记,默认 0
  31. * @param array $properties 属性,MQTT5 中需要,可选
  32. * @return array|bool
  33. */
  34. public function publish(string $topic, mixed $data, int $qos = 0, int $dup = 0, int $retain = 0, array $properties = [])
  35. {
  36. $message = is_array($data) ? json_encode($data) : $data;
  37. return $this->client->publish($topic, $message, $qos, $dup, $retain, $properties);
  38. }
  39. /**
  40. * 订阅一个主题或者多个主题
  41. *
  42. * @param array $topic $topics 的 key 是主题,值为 QoS 的数组,例如
  43. * @param array $properties 属性,MQTT5 中需要,可选
  44. * // MQTT 3.x
  45. * $topics = [
  46. * // 主题 => Qos
  47. * 'topic1' => 0,
  48. * 'topic2' => 1,
  49. * ];
  50. *
  51. * // MQTT 5.0
  52. * $topics = [
  53. * // 主题 => 选项
  54. * 'topic1' => [
  55. * 'qos' => 1,
  56. * 'no_local' => true,
  57. * 'retain_as_published' => true,
  58. * 'retain_handling' => 2,
  59. * ],
  60. * 'topic2' => [
  61. * 'qos' => 2,
  62. * 'no_local' => false,
  63. * 'retain_as_published' => true,
  64. * 'retain_handling' => 1,
  65. * ],
  66. * ];
  67. * @return array|bool
  68. */
  69. public function subscribe(array $topic, array $properties = [])
  70. {
  71. return $this->client->subscribe($topic, $properties);
  72. }
  73. /**
  74. * 接收消息
  75. *
  76. * @return array|true
  77. */
  78. public function recv()
  79. {
  80. return $this->client->recv();
  81. }
  82. /**
  83. * 发送消息
  84. *
  85. * @param array $data
  86. * @param $response
  87. * @return array|true
  88. */
  89. public function send(array $data, $response = false)
  90. {
  91. return $this->client->send($data,$response);
  92. }
  93. /**
  94. * 发送心跳包
  95. *
  96. * @return array|bool
  97. */
  98. public function ping()
  99. {
  100. return $this->client->ping();
  101. }
  102. /**
  103. * 心跳间隔
  104. * @return int
  105. */
  106. public function getKeepAlive()
  107. {
  108. return $this->client->getConfig()->getKeepAlive();
  109. }
  110. }