count = 4; //线程数 } public function onConnect($connection) { } public function onWorkerStart($worker) { define('HEARTBEAT_TIME', 1000); //异步处理 //发送 $server_sa = new ShOneanalysis(); Timer::add(1, function () use ($worker, $server_sa) { if (isset($worker->device_id_code)) { $device_id_code = $worker->device_id_code; foreach ($worker->connections as $connection) { $info = $server_sa->getsendqueuelist($device_id_code); $totalpage = $info['totalpage']; if ($totalpage > 0) { $size = $info['size']; for ($page = 1; $page <= $totalpage; $page++) { $list = $server_sa->getsendqueuelist($device_id_code, 1, $size); foreach ($list as $key => $value) { $connection->send($value['msg']); $server_sa->showfortest($value['msg']); $server_sa->sendmsgsuccess($value); } } } } } }); //解析 $server_sa->initanalysisShoneReceiveMsg(); Timer::add(10, function () use ($worker, $server_sa) { $page = 1; $size = 30; $list = $server_sa->getreceivequeuelist($page, $size); if ($list) { $list = (array) $list; foreach ($list as $key => $value) { try { $linedata = $server_sa->getline($value['msg']); //长度相等可解析 $content = $linedata['content_str']; $device_id_code = $linedata['device_id_code']; $fid = $server_sa->getfacilityid($device_id_code); $content = $server_sa->getcommandcontent($content); $result = $server_sa->saveshinfo($fid, $device_id_code, $content, $value['msg']); if ($result) { $command = $content[0]; $data = $server_sa->createbacksendmsg($fid, $device_id_code, $content, $command); } $server_sa->receivemsgsuccess($value); } catch (\Exception $e) { $msg = $e->getMessage(); echo $msg . "\n"; Log::write($msg, 'shouhuan'); } } } }); Timer::add(1, function () use ($worker) { $time_now = time(); foreach ($worker->connections as $connection) { // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间 if (empty($connection->lastMessageTime)) { $connection->lastMessageTime = $time_now; continue; } // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接 if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) { //$connection->close(); //设置对应设备下线 } } }); } public function onMessage($connection, $data) { //Log::write($data, 'shouhuan'); $server_sa = new ShOneanalysis(); $datas = (array) $server_sa->unpack($data); foreach ($datas as $key => $data) { $server_sa->showfortest($data); try { $result = $server_sa->savereceivequeue($data); if (empty($result)) { Log::write("手环接收信息队列,数据保存失败", 'shouhuan'); } $line = $server_sa->getline($data); $this->device_id_code = $line['device_id_code']; //$this->device_id_code = "358800006072996"; } catch (\Exception $e) { Log::write($data, 'shouhuan'); Log::write($e->getMessage(), 'shouhuan'); var_dump($e->getMessage()); } } } public function onClose($connection) { } }