Boost.Asio C++ 網(wǎng)絡(luò)編程之八:基于TCP的同步服務(wù)端
掃描二維碼
隨時(shí)隨地手機(jī)看文章
? ? ? ?同步服務(wù)端也是相當(dāng)簡(jiǎn)單的。它只需要兩個(gè)線程,一個(gè)負(fù)責(zé)監(jiān)聽(tīng)新的客戶端連接,另外一個(gè)負(fù)責(zé)處理已經(jīng)存在的客戶端請(qǐng)求。它不能使用單線程,因?yàn)榈却碌目蛻舳诉B接是一個(gè)阻塞操作(因?yàn)閍ccept()是阻塞的),因此我們需要另外一個(gè)線程來(lái)處理已經(jīng)存在的客戶端請(qǐng)求。
基于TCP的同步服務(wù)端
1.流程圖
2.實(shí)現(xiàn)
#ifdef?WIN32 #define?_WIN32_WINNT?0x0501 #include#endif #include#include#include#include#includeusing?namespace?boost::asio; using?namespace?boost::posix_time; io_service?service; struct?talk_to_client; typedef?boost::shared_ptrclient_ptr; typedef?std::vectorarray; array?clients; //?thread-safe?access?to?clients?array boost::recursive_mutex?cs; void?update_clients_changed(); /**?simple?connection?to?server: -?logs?in?just?with?username?(no?password) -?all?connections?are?initiated?by?the?client:?client?asks,?server?answers -?server?disconnects?any?client?that?hasn't?pinged?for?5?seconds Possible?requests: -?gets?a?list?of?all?connected?clients -?ping:?the?server?answers?either?with?"ping?ok"?or?"ping?client_list_changed" */ struct?talk_to_client?:?boost::enable_shared_from_this{ talk_to_client() :?sock_(service),?started_(false),?already_read_(0)?{ last_ping?=?microsec_clock::local_time(); } std::string?username()?const?{?return?username_;?} void?answer_to_client()?{ try?{ read_request(); process_request(); } catch?(boost::system::system_error&)?{ stop(); } if?(timed_out())?{ stop(); std::cout?<<?"stopping?"?<<?username_?<<?"?-?no?ping?in?time"?<<?std::endl; } } void?set_clients_changed()?{?clients_changed_?=?true;?} ip::tcp::socket?&?sock()?{?return?sock_;?} bool?timed_out()?const?{ ptime?now?=?microsec_clock::local_time(); long?long?ms?=?(now?-?last_ping).total_milliseconds(); return?ms?>?5000; } void?stop()?{ //?close?client?connection boost::system::error_code?err; sock_.close(err); } private: void?read_request()?{ if?(sock_.available()) already_read_?+=?sock_.read_some( buffer(buff_?+?already_read_,?max_msg?-?already_read_)); } void?process_request()?{ bool?found_enter?=?std::find(buff_,?buff_?+?already_read_,?'n') <?buff_?+?already_read_; if?(!found_enter) return;?//?message?is?not?full //?process?the?msg last_ping?=?microsec_clock::local_time(); size_t?pos?=?std::find(buff_,?buff_?+?already_read_,?'n')?-?buff_; std::string?msg(buff_,?pos); std::copy(buff_?+?already_read_,?buff_?+?max_msg,?buff_); already_read_?-=?pos?+?1; if?(msg.find("login?")?==?0)?on_login(msg); else?if?(msg.find("ping")?==?0)?on_ping(); else?if?(msg.find("ask_clients")?==?0)?on_clients(); else?std::cerr?<<?"invalid?msg?"?<<?msg?<<?std::endl; } void?on_login(const?std::string?&?msg)?{ std::istringstream?in(msg); in?>>?username_?>>?username_; std::cout?<<?username_?<<?"?logged?in"?<<?std::endl; write("login?okn"); update_clients_changed(); } void?on_ping()?{ write(clients_changed_???"ping?client_list_changedn"?:?"ping?okn"); clients_changed_?=?false; } void?on_clients()?{ std::string?msg; {?boost::recursive_mutex::scoped_lock?lk(cs); for?(array::const_iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b) msg?+=?(*b)->username()?+?"?"; } write("clients?"?+?msg?+?"n"); } void?write(const?std::string?&?msg)?{ sock_.write_some(buffer(msg)); } private: ip::tcp::socket?sock_; enum?{?max_msg?=?1024?}; int?already_read_; char?buff_[max_msg]; bool?started_; std::string?username_; bool?clients_changed_; ptime?last_ping; }; void?update_clients_changed()?{ boost::recursive_mutex::scoped_lock?lk(cs); for?(array::iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b) (*b)->set_clients_changed(); } void?accept_thread()?{ ip::tcp::acceptor?acceptor(service,?ip::tcp::endpoint(ip::tcp::v4(),?8001)); while?(true)?{ client_ptr?new_(new?talk_to_client); acceptor.accept(new_->sock()); boost::recursive_mutex::scoped_lock?lk(cs); clients.push_back(new_); } } void?handle_clients_thread()?{ while?(true)?{ boost::this_thread::sleep(millisec(1)); boost::recursive_mutex::scoped_lock?lk(cs); for?(array::iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b) (*b)->answer_to_client(); //?erase?clients?that?timed?out clients.erase(std::remove_if(clients.begin(),?clients.end(), boost::bind(&talk_to_client::timed_out,?_1)),?clients.end()); } } int?main(int?argc,?char*?argv[])?{ boost::thread_group?threads; threads.create_thread(accept_thread); threads.create_thread(handle_clients_thread); threads.join_all(); }
? ? ? ?在accept_thread中會(huì)循環(huán)接受客戶端的鏈接,因?yàn)閏lients容器中的元素在兩個(gè)線程中都要訪問(wèn),所以需要加鎖進(jìn)行同步。
? ? ? ?在handle_clients_thread線程中會(huì)處理和各客戶端的消息會(huì)話,并且把掉線的客戶端從clients容器中刪除。這里用到了std::remove_if,它通常配合std::vector::erase使用。std::remove_if定義于頭文件
templateForwardIterator?remove_if?(ForwardIterator?first,?ForwardIterator?last,UnaryPredicate?pred);
? ? ? ?函數(shù)remove_if()移除序列[first, last)中所有應(yīng)用于謂詞predict返回true的元素。
? ? ? ?remove_if()并不會(huì)實(shí)際移除序列[first, last)中的元素;如果在一個(gè)容器上應(yīng)用remove_if(), 容器的長(zhǎng)度并不會(huì)改變(remove_if()不可能僅通過(guò)迭代器改變?nèi)萜鞯膶傩?, 所有的元素都還在容器里面。實(shí)際做法是, remove_if()將所有應(yīng)該移除的元素都移動(dòng)到了容器尾部并返回一個(gè)分界的迭代器, 移除的所有元素仍然可以通過(guò)返回的迭代器訪問(wèn)到。為了實(shí)際移除元素, 你必須對(duì)容器自行調(diào)用erase()以擦除需要移除的元素。
? ? ? ?下面是std::remove_if的一個(gè)例子:
#include#includebool?IsOdd(int?i)?{?return?((i?%?2)?==?1);?} int?main()?{ int?myints[]?=?{?1,?2,?3,?4,?5,?6,?7,?8,?9?};??????? int*?pbegin?=?myints;?????????????????????????? int*?pend?=?myints?+?sizeof(myints)?/?sizeof(int);?????????????? pend?=?std::remove_if(pbegin,?pend,?IsOdd);?//?將符合要求的元素都移動(dòng)到尾部 //?^???????^ std::cout?<<?"the?range?contains:";?????????//?輸出:the?range?contains:?2?4?6?8 for?(int*?p?=?pbegin;?p?!=?pend;?++p) std::cout?<<?'?'?<<?*p; std::cout?<<?'n'; system("pause"); return?0; }