count = 4; // number of threads
    }

    /**
     * Connection-established hook. Intentionally does nothing.
     *
     * @param mixed $connection Connection object supplied by the server framework.
     */
    public function onConnect($connection)
    {
    }

    /**
     * Worker start-up hook: installs the three periodic jobs of this worker.
     *
     *  1. every 1s  - push queued outbound messages to the connected devices;
     *  2. every 10s - parse messages waiting in the receive queue;
     *  3. every 10s - run the offline check.
     *
     * @param mixed $worker Worker object; its `connections` collection is iterated
     *                      by the outbound job.
     */
    public function onWorkerStart($worker)
    {
        // Guarded so that a second invocation in the same process cannot raise
        // a "constant already defined" warning.
        defined('HEARTBEAT_TIME') || define('HEARTBEAT_TIME', 1000);

        $server_sa = new SouhuanAnalysis();
        $server_sc = new ShouhuanCommand();

        // --- Outbound: send queued messages -----------------------------------
        Timer::add(1, function () use ($worker, $server_sa) {
            foreach ($worker->connections as $connection) {
                // The device id is attached to the connection by onMessage().
                // A connection that has not identified itself yet has nothing
                // addressed to it, so it is skipped. Looking the id up per
                // connection guarantees a device's messages are written to that
                // device's own socket and not to another client of this worker.
                if (!isset($connection->device_id_code)) {
                    continue;
                }
                $device_id_code = $connection->device_id_code;

                $info = $server_sa->getsendqueuelist($device_id_code);
                $totalpage = $info['totalpage'];
                if ($totalpage <= 0) {
                    continue;
                }

                $size = $info['size'];
                for ($page = 1; $page <= $totalpage; $page++) {
                    // Page 1 is requested on every pass, exactly as before.
                    // NOTE(review): this drains the queue only if
                    // sendmsgsuccess() removes the sent rows - confirm.
                    $list = (array) $server_sa->getsendqueuelist($device_id_code, 1, $size);
                    foreach ($list as $value) {
                        // A `false` result means the write was rejected. Leave
                        // the message (and everything after it) queued for the
                        // next tick instead of marking it as delivered.
                        if (false === $connection->send($value['msg'])) {
                            break 2;
                        }
                        $server_sa->showfortest($value['msg']);
                        $server_sa->sendmsgsuccess($value);
                    }
                }
            }
        });

        // --- Inbound: parse the receive queue ---------------------------------
        $server_sa->initanalysisShoneReceiveMsg();
        Timer::add(10, function () use ($server_sa, $server_sc) {
            // At most 30 queued messages (first page) are handled per tick.
            $list = $server_sa->getreceivequeuelist(1, 30);
            if (!$list) {
                return;
            }

            foreach ((array) $list as $value) {
                try {
                    $msg = $value['msg'];
                    $linedata = $server_sa->getline($msg);
                    if (!is_array($linedata)) {
                        // Unparseable payload: log it and acknowledge it so it
                        // is not picked up again.
                        Log::write("不可解析_onWorkerStart:" . $msg, 'shouhuan');
                        $server_sa->receivemsgsuccess($value);
                        continue;
                    }

                    $device_id_code = $linedata['device_id_code'];
                    $fid = $server_sa->getfacilityid($device_id_code);
                    $result = $server_sa->saveshinfo($fid, $device_id_code, $linedata, $value);

                    // Messages whose config kind is 'tsend' get a reply created
                    // once the payload has been stored successfully.
                    $config = $linedata['config'];
                    if ($result && 'tsend' == $config['kind']) {
                        $server_sc->createsendmsg($fid, $device_id_code, $linedata);
                    }

                    $server_sa->receivemsgsuccess($value);
                } catch (\Exception $e) {
                    // The message is NOT acknowledged here, so it stays in the
                    // receive queue.
                    $msg = $e->getMessage();
                    echo $msg . "\n";
                    Log::write($msg, 'shouhuan');
                }
            }
        });

        // --- Offline check, every 10 seconds ----------------------------------
        // A per-connection heartbeat check used to be sketched here (remember
        // `$connection->lastMessageTime`, and treat the client as offline once
        // `time() - lastMessageTime > HEARTBEAT_TIME`). It is disabled; only
        // setofflineinfo() runs.
        Timer::add(10, function () use ($server_sa) {
            $server_sa->setofflineinfo();
        });
    }

    /**
     * Incoming-data hook: splits the raw payload into packets, stores each
     * parseable packet in the receive queue and marks its device as online.
     *
     * @param mixed $connection Connection the data arrived on; the device id of
     *                          the last parseable packet is stored on it as
     *                          `device_id_code` for the outbound job.
     * @param mixed $data       Raw payload as received.
     */
    public function onMessage($connection, $data)
    {
        Log::write($data, 'shouhuan');

        $server_sa = new SouhuanAnalysis();
        $packets = (array) $server_sa->unpack($data);

        foreach ($packets as $packet) {
            // Test/diagnostic output.
            $server_sa->showfortest($packet);

            try {
                $device_id_code = $server_sa->getline($packet, 'device_id_code');
                if (!$device_id_code) {
                    Log::write('不可解析_onMessage:' . $packet, 'shouhuan');
                    continue;
                }

                $result = $server_sa->savereceivequeue($packet, $device_id_code);
                if (empty($result)) {
                    Log::write("手环接收信息队列,数据保存失败", 'shouhuan');
                }

                // Mark the device as online.
                $server_sa->setonlineinfo($device_id_code);

                // Remember which device sits behind this socket so queued
                // outbound messages can be routed to the right connection.
                $connection->device_id_code = $device_id_code;
                // Kept for backward compatibility with any code that still
                // reads the id from this object.
                $this->device_id_code = $device_id_code;
            } catch (\Exception $e) {
                Log::write($packet, 'shouhuan');
                Log::write($e->getMessage(), 'shouhuan');
                Log::write($e->getTraceAsString(), 'shouhuan');
                var_dump($e->getMessage());
            }
        }
    }

    /**
     * Connection-closed hook: logs the time at which the connection closed.
     *
     * @param mixed $connection Connection that was closed (unused).
     */
    public function onClose($connection)
    {
        // The log text (including its spelling) is kept unchanged so existing
        // log searches keep matching.
        $msg = 'colse time' . date('Y-m-d H:i:s');
        Log::write($msg, 'shouhuan');
    }
}