count = 4; // number of threads
    }

    /**
     * Connection-established callback. Intentionally does nothing.
     *
     * @param mixed $connection The client connection that was just opened.
     */
    public function onConnect($connection)
    {
    }

    /**
     * Worker start-up callback: registers two recurring timers (interval argument 1).
     *
     *  1. Send-queue flush: once a device id code is known, pushes every queued
     *     message for that device to every connection of the worker and marks
     *     each message as sent.
     *  2. Heartbeat scan: stamps connections that have no lastMessageTime yet and
     *     detects those that exceeded HEARTBEAT_TIME (the reaction to a stale
     *     connection is currently disabled).
     *
     * @param mixed $worker The worker whose connections are served by the timers.
     */
    public function onWorkerStart($worker)
    {
        // Guarded so a repeated call in the same process cannot trigger an
        // "already defined" error.
        if (!defined('HEARTBEAT_TIME')) {
            define('HEARTBEAT_TIME', 1000);
        }

        // Asynchronous processing
        $server_sa = new ShOneanalysis();

        Timer::add(1, function () use ($worker, $server_sa) {
            // onMessage() stores the device id on $this, while the original check
            // only looked at $worker. Look at $this first and fall back to $worker
            // so the flush also works when the two are different objects.
            if (isset($this->device_id_code)) {
                $device_id_code = $this->device_id_code;
            } elseif (isset($worker->device_id_code)) {
                $device_id_code = $worker->device_id_code;
            } else {
                // No device has reported in yet; nothing to send.
                return;
            }

            foreach ($worker->connections as $connection) {
                // Called with only the device id, the helper's result is read for
                // the keys 'totalpage' and 'size'.
                $info = $server_sa->getsendqueuelist($device_id_code);
                $totalpage = $info['totalpage'];
                if ($totalpage <= 0) {
                    continue;
                }

                $size = $info['size'];
                for ($page = 1; $page <= $totalpage; $page++) {
                    // NOTE(review): page 1 is requested on every iteration;
                    // presumably sendmsgsuccess() removes delivered rows so the
                    // first page keeps advancing - confirm against ShOneanalysis.
                    $list = $server_sa->getsendqueuelist($device_id_code, 1, $size);
                    foreach ($list as $key => $value) {
                        $connection->send(json_encode($value['msg']));
                        $server_sa->sendmsgsuccess($value);
                    }
                }
            }
        });

        Timer::add(1, function () use ($worker) {
            $time_now = time();
            foreach ($worker->connections as $connection) {
                // The connection may not have received any message yet; in that
                // case initialise lastMessageTime to the current time.
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // If the time since the last communication exceeds the heartbeat
                // interval, the client is considered offline.
                if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                    // TODO: closing is currently disabled.
                    //$connection->close();
                    // TODO: mark the corresponding device as offline.
                }
            }
        });
    }

    /**
     * Message callback: logs the raw payload, stores it through
     * ShOneanalysis::savereceivequeue() and remembers the device id code that
     * the send-queue timer uses.
     *
     * @param mixed $connection The connection the data arrived on.
     * @param mixed $data       The received payload.
     */
    public function onMessage($connection, $data)
    {
        // Refresh the heartbeat timestamp; without this the heartbeat timer would
        // measure time since its first tick instead of since the last message.
        $connection->lastMessageTime = time();

        Log::write("ShTcp", 'shouhuan');
        Log::write($data, 'shouhuan');

        $server_sa = new ShOneanalysis();
        $result = $server_sa->savereceivequeue($data);
        if (empty($result)) {
            Log::write("手环接收信息队列,数据保存失败", 'shouhuan');
        }

        // TODO(review): the device id is hard-coded; the parsing below is disabled.
        //$line = $server_sa->getline($data);
        //$this->device_id_code = $line['device_id_code'];
        $this->device_id_code = "358800006072996";
    }

    /**
     * Connection-closed callback. Intentionally does nothing.
     *
     * @param mixed $connection The connection that was closed.
     */
    public function onClose($connection)
    {
    }
}