當(dāng)前位置:首頁(yè) > 芯聞號(hào) > 充電吧
[導(dǎo)讀]? ? ? ?同步服務(wù)端也是相當(dāng)簡(jiǎn)單的。它只需要兩個(gè)線程,一個(gè)負(fù)責(zé)監(jiān)聽(tīng)新的客戶端連接,另外一個(gè)負(fù)責(zé)處理已經(jīng)存在的客戶端請(qǐng)求。它不能使用單線程,因?yàn)榈却碌目蛻舳诉B接是一個(gè)阻塞操作(因?yàn)閍ccept()

? ? ? ?同步服務(wù)端也是相當(dāng)簡(jiǎn)單的。它只需要兩個(gè)線程,一個(gè)負(fù)責(zé)監(jiān)聽(tīng)新的客戶端連接,另外一個(gè)負(fù)責(zé)處理已經(jīng)存在的客戶端請(qǐng)求。它不能使用單線程,因?yàn)榈却碌目蛻舳诉B接是一個(gè)阻塞操作(因?yàn)閍ccept()是阻塞的),因此我們需要另外一個(gè)線程來(lái)處理已經(jīng)存在的客戶端請(qǐng)求。

基于TCP的同步服務(wù)端

1.流程圖


2.實(shí)現(xiàn)

#ifdef?WIN32
#define?_WIN32_WINNT?0x0501
#include#endif

#include#include#include#include#includeusing?namespace?boost::asio;
using?namespace?boost::posix_time;
io_service?service;

struct?talk_to_client;
typedef?boost::shared_ptrclient_ptr;
typedef?std::vectorarray;
array?clients;
//?thread-safe?access?to?clients?array
boost::recursive_mutex?cs;

void?update_clients_changed();

/**?simple?connection?to?server:
-?logs?in?just?with?username?(no?password)
-?all?connections?are?initiated?by?the?client:?client?asks,?server?answers
-?server?disconnects?any?client?that?hasn't?pinged?for?5?seconds

Possible?requests:
-?gets?a?list?of?all?connected?clients
-?ping:?the?server?answers?either?with?"ping?ok"?or?"ping?client_list_changed"
*/
struct?talk_to_client?:?boost::enable_shared_from_this{
	talk_to_client()
		:?sock_(service),?started_(false),?already_read_(0)?{
		last_ping?=?microsec_clock::local_time();
	}
	std::string?username()?const?{?return?username_;?}

	void?answer_to_client()?{
		try?{
			read_request();
			process_request();
		}
		catch?(boost::system::system_error&)?{
			stop();
		}
		if?(timed_out())?{
			stop();
			std::cout?<<?"stopping?"?<<?username_?<<?"?-?no?ping?in?time"?<<?std::endl;
		}
	}
	void?set_clients_changed()?{?clients_changed_?=?true;?}
	ip::tcp::socket?&?sock()?{?return?sock_;?}
	bool?timed_out()?const?{
		ptime?now?=?microsec_clock::local_time();
		long?long?ms?=?(now?-?last_ping).total_milliseconds();
		return?ms?>?5000;
	}
	void?stop()?{
		//?close?client?connection
		boost::system::error_code?err;
		sock_.close(err);
	}
private:
	void?read_request()?{
		if?(sock_.available())
			already_read_?+=?sock_.read_some(
			buffer(buff_?+?already_read_,?max_msg?-?already_read_));
	}
	void?process_request()?{
		bool?found_enter?=?std::find(buff_,?buff_?+?already_read_,?'n')
			<?buff_?+?already_read_;
		if?(!found_enter)
			return;?//?message?is?not?full
		//?process?the?msg
		last_ping?=?microsec_clock::local_time();
		size_t?pos?=?std::find(buff_,?buff_?+?already_read_,?'n')?-?buff_;
		std::string?msg(buff_,?pos);
		std::copy(buff_?+?already_read_,?buff_?+?max_msg,?buff_);
		already_read_?-=?pos?+?1;

		if?(msg.find("login?")?==?0)?on_login(msg);
		else?if?(msg.find("ping")?==?0)?on_ping();
		else?if?(msg.find("ask_clients")?==?0)?on_clients();
		else?std::cerr?<<?"invalid?msg?"?<<?msg?<<?std::endl;
	}

	void?on_login(const?std::string?&?msg)?{
		std::istringstream?in(msg);
		in?>>?username_?>>?username_;
		std::cout?<<?username_?<<?"?logged?in"?<<?std::endl;
		write("login?okn");
		update_clients_changed();
	}
	void?on_ping()?{
		write(clients_changed_???"ping?client_list_changedn"?:?"ping?okn");
		clients_changed_?=?false;
	}
	void?on_clients()?{
		std::string?msg;
		{?boost::recursive_mutex::scoped_lock?lk(cs);
		for?(array::const_iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b)
			msg?+=?(*b)->username()?+?"?";
		}
		write("clients?"?+?msg?+?"n");
	}


	void?write(const?std::string?&?msg)?{
		sock_.write_some(buffer(msg));
	}
private:
	ip::tcp::socket?sock_;
	enum?{?max_msg?=?1024?};
	int?already_read_;
	char?buff_[max_msg];
	bool?started_;
	std::string?username_;
	bool?clients_changed_;
	ptime?last_ping;
};

void?update_clients_changed()?{
	boost::recursive_mutex::scoped_lock?lk(cs);
	for?(array::iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b)
		(*b)->set_clients_changed();
}

void?accept_thread()?{
	ip::tcp::acceptor?acceptor(service,?ip::tcp::endpoint(ip::tcp::v4(),?8001));
	while?(true)?{
		client_ptr?new_(new?talk_to_client);
		acceptor.accept(new_->sock());

		boost::recursive_mutex::scoped_lock?lk(cs);
		clients.push_back(new_);
	}
}

void?handle_clients_thread()?{
	while?(true)?{
		boost::this_thread::sleep(millisec(1));
		boost::recursive_mutex::scoped_lock?lk(cs);
		for?(array::iterator?b?=?clients.begin(),?e?=?clients.end();?b?!=?e;?++b)
			(*b)->answer_to_client();
		//?erase?clients?that?timed?out
		clients.erase(std::remove_if(clients.begin(),?clients.end(),
			boost::bind(&talk_to_client::timed_out,?_1)),?clients.end());
	}
}

int?main(int?argc,?char*?argv[])?{
	boost::thread_group?threads;
	threads.create_thread(accept_thread);
	threads.create_thread(handle_clients_thread);
	threads.join_all();
}

? ? ? ?在accept_thread中會(huì)循環(huán)接受客戶端的鏈接,因?yàn)閏lients容器中的元素在兩個(gè)線程中都要訪問(wèn),所以需要加鎖進(jìn)行同步。

? ? ? ?在handle_clients_thread線程中會(huì)處理和各客戶端的消息會(huì)話,并且把掉線的客戶端從clients容器中刪除。這里用到了std::remove_if,它通常配合std::vector::erase使用。std::remove_if定義于頭文件

templateForwardIterator?remove_if?(ForwardIterator?first,?ForwardIterator?last,UnaryPredicate?pred);

? ? ? ?函數(shù)remove_if()移除序列[first, last)中所有應(yīng)用于謂詞predict返回true的元素。
? ? ? ?remove_if()并不會(huì)實(shí)際移除序列[first, last)中的元素;如果在一個(gè)容器上應(yīng)用remove_if(), 容器的長(zhǎng)度并不會(huì)改變(remove_if()不可能僅通過(guò)迭代器改變?nèi)萜鞯膶傩?, 所有的元素都還在容器里面。實(shí)際做法是, remove_if()將所有應(yīng)該移除的元素都移動(dòng)到了容器尾部并返回一個(gè)分界的迭代器, 移除的所有元素仍然可以通過(guò)返回的迭代器訪問(wèn)到。為了實(shí)際移除元素, 你必須對(duì)容器自行調(diào)用erase()以擦除需要移除的元素。
? ? ? ?下面是std::remove_if的一個(gè)例子:

#include#includebool?IsOdd(int?i)?{?return?((i?%?2)?==?1);?}

int?main()?{
	int?myints[]?=?{?1,?2,?3,?4,?5,?6,?7,?8,?9?};???????

	int*?pbegin?=?myints;??????????????????????????
	int*?pend?=?myints?+?sizeof(myints)?/?sizeof(int);??????????????

	pend?=?std::remove_if(pbegin,?pend,?IsOdd);?//?將符合要求的元素都移動(dòng)到尾部
	//?^???????^
	std::cout?<<?"the?range?contains:";?????????//?輸出:the?range?contains:?2?4?6?8
	for?(int*?p?=?pbegin;?p?!=?pend;?++p)
		std::cout?<<?'?'?<<?*p;
	std::cout?<<?'n';
	system("pause");
	return?0;
}


本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專(zhuān)欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車(chē)的華為或?qū)⒋呱龈蟮莫?dú)角獸公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

倫敦2024年8月29日 /美通社/ -- 英國(guó)汽車(chē)技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車(chē)工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車(chē)。 SODA V工具的開(kāi)發(fā)耗時(shí)1.5...

關(guān)鍵字: 汽車(chē) 人工智能 智能驅(qū)動(dòng) BSP

北京2024年8月28日 /美通社/ -- 越來(lái)越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運(yùn)行,同時(shí)企業(yè)卻面臨越來(lái)越多業(yè)務(wù)中斷的風(fēng)險(xiǎn),如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報(bào)道,騰訊和網(wǎng)易近期正在縮減他們對(duì)日本游戲市場(chǎng)的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)開(kāi)幕式在貴陽(yáng)舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語(yǔ)權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點(diǎn): 有效應(yīng)對(duì)環(huán)境變化,經(jīng)營(yíng)業(yè)績(jī)穩(wěn)中有升 落實(shí)提質(zhì)增效舉措,毛利潤(rùn)率延續(xù)升勢(shì) 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長(zhǎng) 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競(jìng)爭(zhēng)力 堅(jiān)持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競(jìng)爭(zhēng)優(yōu)勢(shì)...

關(guān)鍵字: 通信 BSP 電信運(yùn)營(yíng)商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺(tái)與中國(guó)電影電視技術(shù)學(xué)會(huì)聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會(huì)上宣布正式成立。 活動(dòng)現(xiàn)場(chǎng) NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長(zhǎng)三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會(huì)上,軟通動(dòng)力信息技術(shù)(集團(tuán))股份有限公司(以下簡(jiǎn)稱"軟通動(dòng)力")與長(zhǎng)三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉