123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- <?php
- declare(strict_types=1);
- namespace App\Master\Framework\Library\Mqtt;
- use Hyperf\Config\Annotation\Value;
- use Simps\MQTT\Client;
- use Simps\MQTT\Config\ClientConfig;
- class MqttClient
- {
- #[Value("mqtt.client")]
- protected $config;
- protected Client $client;
- /**
- * @param bool $clean 清理会话,默认为 true
- * @param array $will 遗嘱消息,当客户端断线后 Broker 会自动发送遗嘱消息给其它客户端
- */
- public function connect(bool $clean = true, array $will = [])
- {
- $config = $this->config;
- $configObj = new ClientConfig($config['pool']);
- $this->client = new Client($config['host'], $config['port'], $configObj);
- $this->client->connect($clean,$will);
- }
- /**
- * 向某个主题发布一条消息
- *
- * @param string $topic 主题
- * @param mixed $data 内容
- * @param int $qos QoS 等级,默认 0
- * @param int $dup 重发标志,默认 0
- * @param int $retain retain 标记,默认 0
- * @param array $properties 属性,MQTT5 中需要,可选
- * @return array|bool
- */
- public function publish(string $topic, mixed $data, int $qos = 0, int $dup = 0, int $retain = 0, array $properties = [])
- {
- $message = is_array($data) ? json_encode($data) : $data;
- return $this->client->publish($topic, $message, $qos, $dup, $retain, $properties);
- }
- /**
- * 订阅一个主题或者多个主题
- *
- * @param array $topic $topics 的 key 是主题,值为 QoS 的数组,例如
- * @param array $properties 属性,MQTT5 中需要,可选
- * // MQTT 3.x
- * $topics = [
- * // 主题 => Qos
- * 'topic1' => 0,
- * 'topic2' => 1,
- * ];
- *
- * // MQTT 5.0
- * $topics = [
- * // 主题 => 选项
- * 'topic1' => [
- * 'qos' => 1,
- * 'no_local' => true,
- * 'retain_as_published' => true,
- * 'retain_handling' => 2,
- * ],
- * 'topic2' => [
- * 'qos' => 2,
- * 'no_local' => false,
- * 'retain_as_published' => true,
- * 'retain_handling' => 1,
- * ],
- * ];
- * @return array|bool
- */
- public function subscribe(array $topic, array $properties = [])
- {
- return $this->client->subscribe($topic, $properties);
- }
- /**
- * 接收消息
- *
- * @return array|true
- */
- public function recv()
- {
- return $this->client->recv();
- }
- /**
- * 发送消息
- *
- * @param array $data
- * @param $response
- * @return array|true
- */
- public function send(array $data, $response = false)
- {
- return $this->client->send($data,$response);
- }
- /**
- * 发送心跳包
- *
- * @return array|bool
- */
- public function ping()
- {
- return $this->client->ping();
- }
- /**
- * 心跳间隔
- * @return int
- */
- public function getKeepAlive()
- {
- return $this->client->getConfig()->getKeepAlive();
- }
- }
|