????:場(chǎng)景:一個(gè)多線程的C++程序,24h x 5.5d運(yùn)行。有幾個(gè)工作線程ThreadW{0,1,2,3},處理客戶發(fā)過(guò)來(lái)的交易請(qǐng)求,另外有一個(gè)背景線程ThreadB,不定期更新程序內(nèi)部的參考數(shù)據(jù)。這些線程都跟一個(gè)hash表打交道,工作線程只讀,背景線程讀寫(xiě),必然要用到一些同步機(jī)制,防止數(shù)據(jù)損壞。這里的示例代碼用std::map代替hash表,意思是一樣的:typedef map<string,
場(chǎng)景:
一個(gè)多線程的C++程序,24h x 5.5d運(yùn)行。有幾個(gè)工作線程ThreadW{0,1,2,3},處理客戶發(fā)過(guò)來(lái)的交易請(qǐng)求,另外有一個(gè)背景線程ThreadB,不定期更新程序內(nèi)部的參考數(shù)據(jù)。這些線程都跟一個(gè)hash表打交道,工作線程只讀,背景線程讀寫(xiě),必然要用到一些同步機(jī)制,防止數(shù)據(jù)損壞。
這里的示例代碼用std::map代替hash表,意思是一樣的:
typedef map<string, vector<pair<string, int> > > Map;
map 的 key 是用戶名,value 是一個(gè)vector,里邊存的是不同stock的最小交易間隔,vector已經(jīng)排好序,可以用二分查找。
我們的系統(tǒng)要求工作線程的延遲盡可能小,可以容忍背景線程的延遲略大。一天之內(nèi),背景線程對(duì)數(shù)據(jù)更新的次數(shù)屈指可數(shù),最多一小時(shí)一次,更新的數(shù)據(jù)來(lái)自于網(wǎng)絡(luò),所以對(duì)更新的及時(shí)性不敏感。Map的數(shù)據(jù)量也不大,大約一千多條數(shù)據(jù)。
最簡(jiǎn)單的同步辦法,用讀寫(xiě)鎖,工作線程加讀鎖,背景線程加寫(xiě)鎖。但是讀寫(xiě)鎖的開(kāi)銷比普通mutex要大,如果工作線程能用最普通的非重入Mutex實(shí)現(xiàn)同步,就不必用讀寫(xiě)鎖,性能較高。我們借助shared_ptr實(shí)現(xiàn)了這一點(diǎn):
class Mutex; class MutexLockGuard; class CustomerData { public: CustomerData() : data_(new Map) { } ~CustomerData(); int query(const string& customer, const string& stock) const { MapPtr data = getData(); // data 一旦拿到,就不再需要鎖了。取數(shù)據(jù)的時(shí)候只有g(shù)etData()內(nèi)部有鎖,多線程并發(fā)讀的性能很好。 // 假設(shè)用戶肯定存在 const EntryList& entries = (*data)[customer]; return findEntry(entries, stock); } void update(const string& customer, const EntryList& entries ); private: typedef vector<string, int> EntryList; typedef map<string, EntryList> Map; typedef tr1::shared_ptr<Map> MapPtr; static int findEntry(const EntryList& entries, const string& key) const { /* 用 lower_bound 在 entries 里找 key */ } MapPtr getData() const { MutexLockGuard lock(dataMutex_); return data_; } MapPtr data_; mutable Mutex dataMutex_; };
關(guān)鍵看CustomerData::update()怎么寫(xiě)。既然要更新數(shù)據(jù),那肯定得加鎖,如果這時(shí)候其他線程正在讀,那么不能在原來(lái)的數(shù)據(jù)上修改,得創(chuàng)建一個(gè)副本,在副本上修改,修改完了再替換。如果沒(méi)有用戶在讀,那么就能直接修改,節(jié)約一次拷貝。
void CustomerData::update(const string& customer, const EntryList& entries ) { MutexLockGuard lock(dataMutex_); if (!data_.unique()) { MapPtr newData(new Map(*data_)); data_.swap(newData); } assert(data_.unique()); (*data_)[customer] = entries; }
注意其中用了shared_ptr::unique()來(lái)判斷是不是有人在讀,如果有人在讀,那么我們不能直接修改,因?yàn)閝uery()并沒(méi)有全程加鎖,只在getData()內(nèi)部有鎖。shared_ptr::swap()把data_替換為新副本,而且我們還在鎖里,不會(huì)有別的線程來(lái)讀,可以放心地更新。
據(jù)我們測(cè)試,大多數(shù)情況下更新都是在原來(lái)數(shù)據(jù)上進(jìn)行的,拷貝的比例還不到1%,很高效。更準(zhǔn)確的說(shuō),這不是copy-on-write,而是copy-on-other-reading。
我們將來(lái)可能會(huì)采用無(wú)鎖數(shù)據(jù)結(jié)構(gòu),不過(guò)目前這個(gè)實(shí)現(xiàn)已經(jīng)非常好,滿足我們的要求。