久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Linux中常見同步機制設計原理

 蘭寶888 2018-07-17

引言

今天談談linux中常見并發(fā)訪問的保護機制設計原理。為什么要寫這篇文章呢,?其實想幫助自己及讀者更深入的了解背后的原理(據可靠消息,,鎖的實現經常出現在筆試環(huán)節(jié)。既可以考察面試者對鎖的原理的理解,,又可以考察面試者編程技能),。我們拋開linux中匯編代碼,。用C語言為大家呈現背后實現的原理。同時,,文章中的代碼都沒有考慮并發(fā)情況(例如某些操作需要原子性,,或者數據需要保護等)。

注:部分代碼都是根據ARM64架構匯編代碼翻譯成C語言并經過精簡(例如:spin lock,、read-write lock),。也有部分代碼實現是為了呈現背后設計的原理自己編寫的,而不是精簡linux中實現的代碼(例如mutex),。

自旋鎖(spin lock)

自旋鎖是linux中使用非常頻繁的鎖,,原理簡單。當進程A申請鎖成功后,,進程B申請鎖就會失敗,,但是不會調度,原地自旋,。就在原地轉到天昏地暗只為等到進程A釋放鎖,。由于不會睡眠和調度的特性,在中斷上下文中,,數據的保護一般都是選擇自旋鎖,。如果有多個進程去申請鎖。當第一個申請鎖成功的線程在釋放的時候,,其他進程是競爭的關系,。因此是一種不公平。所以現在的linux采用的是排隊機制,。先到先得,。誰先申請,誰就先得到鎖,。

原理

舉個例子,,大家應該都去過銀行辦業(yè)務吧。銀行的辦事大廳一般會有幾個窗口同步進行,。今天很不巧,,只有一個窗口提供服務。現在的銀行服務都是采用取號排隊,,叫號服務的方式,。當你去銀行辦理業(yè)務的時候,首先會去取號機器領取小票,,上面寫著你排多少號,。然后你就可以排隊等待了。一般還會有個顯示屏,上面會顯示一個數字(例如:"請xxx號到1號窗口辦理"),,代表當前可以被服務顧客的排隊號碼,。每辦理完一個顧客的業(yè)務,顯示屏上面的數字都會增加1,。等待的顧客都會對比自己手上寫的編號和顯示屏上面是否一致,,如果一致的話,就可以去前臺辦理業(yè)務了?,F在早上剛開業(yè),,顧客A是今天的第一個顧客,去取號機器領取0號(next計數)小票,,然后看到顯示屏上顯示0(owner計數),,顧客A就知道現在輪到自己辦理業(yè)務了。顧客A到前臺辦理業(yè)務(持有鎖)中,,顧客B來了,。同樣,顧客B去取號機器拿到1號(next計數)小票,。然后乖乖的坐在旁邊等候,。顧客A依然在辦理業(yè)務中,此時顧客C也來了,。顧客C去取號機器拿到2號(next計數)小票,。顧客C也乖乖的找個座位繼續(xù)等待。終于,,顧客A的業(yè)務辦完了(釋放鎖),。然后,顯示屏上面顯示1(owner計數),。顧客B和C都對比顯示屏上面的數字和自己手中小票的數字是否相等,。顧客B終于可以辦理業(yè)務了(持有鎖)。顧客C依然等待中,。顧客B的業(yè)務辦完了(釋放鎖),。然后,顯示屏上面顯示2(owner計數),。顧客C終于開始辦理業(yè)務(持有鎖),。顧客C的業(yè)務辦完了(釋放鎖)。3個顧客都辦完了業(yè)務離開了,。只留下一個銀行柜臺服務員,。最終,顯示屏上面顯示3(owner計數),。取號機器的下一個排隊號也是3號(next計數),。無人辦理業(yè)務(鎖是釋放狀態(tài)),。

流程圖.png

linux中針對每一個spin lock會有兩個計數,。分別是next和owner(初始值為0),。進程A申請鎖時,會判斷next和owner的值是否相等,。如果相等就代表鎖可以申請成功,,否則原地自旋。直到owner和next的值相等才會退出自旋,。假設進程A申請鎖成功,,然后會next加1。此時owner值為0,,next值為1,。進程B也申請鎖,保存next得值到局部變量tmp(tmp = 1)中,。由于next和owner值不相等,,因此原地自旋讀取owner的值,判斷owner和tmp是否相等,,直到相等退出自旋狀態(tài),。當然next的值還是加1,變成2,。進程A釋放鎖,,此時會將owner的值加1,那么此時B進程的owner和tmp的值都是1,,因此B進程獲得鎖,。當B進程釋放鎖后,同樣會將owner的值加1。最后owner和next都等于2,代表沒有進程持有鎖,。next就是一個記錄申請鎖的次數,,而owner是持有鎖進程的計數值。

實現

我們首先定義描述自旋鎖的結構體arch_spinlock_t,。

  1. typedef struct {
  2. union {
  3. unsigned int slock;
  4. struct __raw_tickets {
  5. unsigned short owner;
  6. unsigned short next;
  7. } tickets;
  8. };
  9. } arch_spinlock_t;

如上面的原理描述,我們需要兩個計數,分別是owner和next,。slock所占內存區(qū)域覆蓋owner和next(據說C語言學好的都能看得懂)。下面實現申請鎖操作 arch_spin_lock,。

  1. static inline void arch_spin_lock(arch_spinlock_t *lock)
  2. {
  3. arch_spinlock_t old_lock;
  4.  
  5. old_lock.slock = lock->slock; /* 1 */
  6. lock->tickets.next++; /* 2 */
  7. while (old_lock.tickets.next != old_lock.tickets.owner) { /* 3 */
  8. wfe(); /* 4 */
  9. old_lock.tickets.owner = lock->tickets.owner; /* 5 */
  10. }
  11. }
  1. 繼續(xù)上面的舉例,。顧客從取號機器得到排隊號。
  2. 取號機器更新下個顧客將要拿到的排隊號,。
  3. 看一下顯示屏,,判斷是否輪到自己了。
  4. wfe()函數是指ARM64架構的WFE(wait for event)匯編指令。WFE是讓ARM核進入低功耗模式的指令,。當進程拿不到鎖的時候,,原地自旋不如cpu睡眠。節(jié)能,。睡下去之后,,什么時候醒來呢?就是等到持有鎖的進程釋放的時候,,醒過來判斷是否可以持有鎖,。如果不能獲得鎖,繼續(xù)睡眠即可,。這里就相當于顧客先小憩一會,,等到廣播下一位排隊者的時候,醒來看看是不是自己,。
  5. 前臺已經為上一個顧客辦理完成業(yè)務,,剩下排隊的顧客都要抬頭看一下顯示屏是不是輪到自己了。

釋放鎖的操作就非常簡單了,。還記得上面銀行辦理業(yè)務的例子嗎,?釋放鎖的操作僅僅是顯示屏上面的排隊號加1。我們僅僅需要將owner計數加1即可,。arch_spin_unlock實現如下,。

  1. static inline void arch_spin_unlock(arch_spinlock_t *lock)
  2. {
  3. lock->tickets.owner++;
  4. sev();
  5. }

sev()函數是指ARM64架構的SEV匯編指令。當進程無法獲取鎖的時候會使用WFE指令使CPU睡眠?,F在釋放鎖了,,自然要喚醒所有睡眠的CPU醒來檢查自己是不是可以獲取鎖。

信號量(semaphore)

信號量(semaphore)是進程間通信處理同步互斥的機制,。是在多線程環(huán)境下使用的一種措施,,它負責協調各個進程,以保證他們能夠正確,、合理的使用公共資源,。 它和spin lock最大的不同之處就是:無法獲取信號量的進程可以睡眠,因此會導致系統調度,。

原理

信號量一般可以用來標記可用資源的個數,。老規(guī)矩,還是舉個例子,。假設圖書館有2本《C語言從入門到放棄》書籍,。A同學想學C語言,于是發(fā)現這本書特別的好,。于是就去學校的圖書館借書,,A同學成功的從圖書館借走一本,。這時,A同學室友B同學發(fā)現A同學竟然在偷偷的學習武功秘籍(C語言),。于是,,B同學也去借一本。此時,,圖書館已經沒有書了,。C同學也想借這本書,,可能是這本書太火了,。圖書館管理員告訴C同學,圖書館這本書都被借走了,。如果有同學換回來,,會第一時間通知你。于是,,管理員就把C同學的信息登記先來,,以備后續(xù)通知C同學來借書。所以,,C同學只能悲傷的走了(如果是自旋鎖的原理的話,,那么C同學將會端個小板凳坐在圖書館,一直要等到A同學或者B同學還書并借走),。

實現

為了記錄可用資源的數量,,我們肯定需要一個count計數,標記當前可用資源數量,。當然還要一個可以像圖書管理員一樣的筆記本功能,。用來記錄等待借書的同學。所以,,一個雙向鏈表即可,。因此只需要一個count計數和等待進程的鏈表頭即可。描述信號量的結構體如下,。

  1. struct semaphore {
  2. unsigned int count;
  3. struct list_head wait_list;
  4. };

在linux中,,每個進程就相當于是每個借書的同學。通知一個同學,,就相當于喚醒這個進程,。因此,我們還需要一個結構體記錄當前的進程信息(task_struct),。

  1. struct semaphore_waiter {
  2. struct list_head list;
  3. struct task_struct *task;
  4. };

struct semaphore_waiter的list成員是當進程無法獲取信號量的時候掛入semaphore的wait_list成員,。task成員就是記錄后續(xù)被喚醒的進程信息。

一切準備就緒,,現在就可以實現信號量的申請函數,。

  1. void down(struct semaphore *sem)
  2. {
  3. struct semaphore_waiter waiter;
  4.  
  5. if (sem->count > 0) {
  6. sem->count--; /* 1 */
  7. return;
  8. }
  9.  
  10. waiter.task = current; /* 2 */
  11. list_add_tail(&waiter.list, &sem->wait_list); /* 2 */
  12. schedule(); /* 3 */
  13. }
  1. 如果信號量標記的資源還有剩余,,自然可以成功獲取信號量。只需要遞減可用資源計數,。
  2. 既然無法獲取信號量,,就需要將當前進程掛入信號量的等待隊列鏈表上。
  3. schedule()主要是觸發(fā)任務調度的示意函數,,主動讓出CPU使用權,。在讓出之前,需要將當前進程從運行隊列上移除,。

釋放信號的實現也是比較簡單,。實現如下。

  1. void up(struct semaphore *sem)
  2. {
  3. struct semaphore_waiter waiter;
  4.  
  5. if (list_empty(&sem->wait_list)) {
  6. sem->count++; /* 1 */
  7. return;
  8. }
  9.  
  10. waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
  11. list_del(&waiter->list); /* 2 */
  12. wake_up_process(waiter->task); /* 2 */
  13. }
  1. 如果等待鏈表沒有進程,,那么自然只需要增加資源計數,。
  2. 從等待進程鏈表頭取出第一個進程,并從鏈表上移除,。然后就是喚醒該進程,。

讀寫鎖(read-write lock)

不管是自旋鎖還是信號量在同一時間只能有一個進程進入臨界區(qū)。對于有些情況,,我們是可以區(qū)分讀寫操作的,。因此,我們希望對于讀操作的進程可以并發(fā)進行,。對于寫操作只限于一個進程進入臨界區(qū),。而這種同步機制就是讀寫鎖。讀寫鎖一般具有以下幾種性質,。
  • 同一時間有且僅有一個寫進程進入臨界區(qū),。
  • 在沒有寫進程進入臨界區(qū)的時候,同時可以有多個讀進程進入臨界區(qū),。
  • 讀進程和寫進程不可以同時進入臨界區(qū),。

讀寫鎖有兩種,一種是信號量類型,,另一種是spin lock類型,。下面以spin lock類型講解。

原理

老規(guī)矩,,還是舉個例子理解讀寫鎖,。我絞盡腦汁才想到一個比較貼切的例子。這個例子來源于生活,。我發(fā)現公司一般都會有保潔阿姨打掃廁所,。如果以男廁所為例的話,我覺得男士進入廁所就相當于讀者進入臨界區(qū),。因為可以有多個男士進廁所,。而保潔阿姨進入男士廁所就相當于寫者進入臨界區(qū),。假設A男士發(fā)現保潔阿姨不在打掃廁所,就進入廁所,。隨后B和C同時也進入廁所,。然后保潔阿姨準備打掃廁所,發(fā)現有男士在廁所里面,,因此只能在門口等待,。ABC都離開了廁所。保潔阿姨迅速進入廁所打掃,。然后D男士去上廁所,,發(fā)現保潔阿姨在里面?;伊锪锏某鰜砹嗽陂T口等著?,F在體會到了寫者(保潔阿姨)具有排他性,,讀者(男士)可以并發(fā)進入臨界區(qū)了吧,。

既然我們允許多個讀者進入臨界區(qū),因此我們需要一個計數統計讀者的個數,。同時,,由于寫者永遠只存在一個進入臨界區(qū),因此只需要一個bit標記是否有寫進程進入臨界區(qū),。所以,,我們可以將兩個計數合二為一。只需要1個unsigned int類型即可,。最高位(bit31)代表是否有寫者進入臨界區(qū),,低31位(0~30bit)統計讀者個數。


  1. +----+-------------------------------------------------+
  2. | 31 | 30 0 |
  3. +----+-------------------------------------------------+
  4. | |
  5. | +----> [0:30] Read Thread Counter
  6. +-------------------------> [31] Write Thread Counter

實現

描述讀寫鎖只需要1個變量即可,,因此我們可以定義讀寫鎖的結構體如下,。

  1. typedef struct {
  2. volatile unsigned int lock;
  3. } arch_rwlock_t;

既然區(qū)分讀寫操作,因此肯定會有兩個申請鎖函數,,分別是讀和寫,。首先,我們看一下read_lock操作的實現,。

  1. static inline void arch_read_lock(arch_rwlock_t *rw)
  2. {
  3. unsigned int tmp;
  4.  
  5. sevl(); /* 1 */
  6. do {
  7. wfe();
  8. tmp = rw->lock;
  9. tmp++; /* 2 */
  10. } while(tmp & (1 << 31)); /* 3 */
  11. rw->lock = tmp;
  12. }
  1. sevl()函數是ARM64架構中SEVL匯編指令,。SEVL和SEV的區(qū)別是,SEVL僅僅修改本地CPU的PE寄存器值,,這樣下面的WFE指令第一次執(zhí)行的時候不會睡眠,。
  2. 增加讀者計數,最后會更新到rw->lock中,。
  3. 更新rw->lock前提是沒有寫者,,因此這里會判斷是否有寫者已經進入臨界區(qū)(判斷方法是rw->lock變量bit31的值),。如果,有寫者已經進入臨界區(qū),,就在這里循環(huán),,并WFE指令睡眠。類似上面介紹的spin lock實現,。

當讀進程離開臨界區(qū)的時候會調用read_unlock釋放鎖,。read_unlock實現如下。

  1. static inline void arch_read_unlock(arch_rwlock_t *rw)
  2. {
  3. rw->lock--;
  4. sev();
  5. }

實現很簡單,,和spin_unlock如出一轍,。遞減讀者計數,然后使用SEV指令喚醒所有的CPU,,檢查等待狀態(tài)的進程是否可以獲取鎖,。

讀操作看完了,我們看看寫操作是如何實現的,。arch_write_lock實現如下,。

  1. static inline void arch_write_lock(arch_rwlock_t *rw)
  2. {
  3. unsigned int tmp;
  4.  
  5. sevl();
  6. do {
  7. wfe();
  8. tmp = rw->lock;
  9. } while(tmp); /* 1 */
  10. rw->lock = 1 << 31; /* 2 */
  11. }
  1. 由于寫者是排他的(讀者和寫者都不能有),因此這里只有rw->lock的值為0,,當前的寫者才可以進入臨界區(qū),。
  2. 置位rw->lock的bit31,代表有寫者進入臨界區(qū),。

當寫進程離開臨界區(qū)的時候會調用write_unlock釋放鎖,。write_unlock實現如下。

  1. static inline void arch_write_unlock(arch_rwlock_t *rw)
  2. {
  3. rw->lock = 0; /* 1 */
  4. sev(); /* 2 */
  5. }
  1. 同樣由于寫者是排他的,,因此只需要將rw->lock置0即可,。代表沒有任何進程進入臨界區(qū)。畢竟是因為同一時間只能有一個寫者進入臨界區(qū),,當這個寫者離開臨界區(qū)的時候,,肯定是意味著現在沒有任何進程進入臨界區(qū)。
  2. 使用SEV指令喚醒所有的CPU,,檢查等待狀態(tài)的進程是否可以獲取鎖,。

以上的代碼實現其實會導致寫進程餓死現象。例如,,A,、B、C三個進程進入讀臨界區(qū),,D進程嘗試獲得寫鎖,,此時只能等待A、B,、C三個進程退出臨界區(qū),。如果在推出之前又有F,、G進程進入讀臨界區(qū),那么將出現D進程餓死現象,。

互斥量(mutex)

前文提到的semaphore在初始化count計數的時候,,可以分為計數信號量和互斥信號量(二值信號量)。mutex和初始化計數為1的二值信號量有很大的相似之處,。他們都可以用做資源互斥,。但是mutex卻有一個特殊的地方:只有持鎖者才能解鎖。但是,,二值信號量卻可以在一個進程中獲取信號量,,在另一個進程中釋放信號量。如果是應用在嵌入式應用的RTOS,,針對mutex的實現還會考慮優(yōu)先級反轉問題,。

原理

既然mutex是一種二值信號量,因此就不需要像semaphore那樣需要一個count計數,。由于mutex具有“持鎖者才能解鎖”的特點,,所以我們需要一個變量owner記錄持鎖進程。釋放鎖的時候必須是同一個進程才能釋放,。當然也需要一個鏈表頭,,主要用來便利睡眠等待的進程。原理和semaphore及其相似,,因此在代碼上也有體現。

實現

mutex的實現代碼和linux中實現會有差異,,但是依然可以為你呈現設計的原理,。下面的設計代碼更像是部分RTOS中的代碼。mutex和semaphore一樣,,我們需要兩個類似的結構體分別描述mutex,。

  1. struct mutex_waiter {
  2. struct list_head list;
  3. struct task_struct *task;
  4. };
  5.  
  6. struct mutex {
  7. long owner;
  8. struct list_head wait_list;
  9. };

struct mutex_waiter的list成員是當進程無法獲取互斥量的時候掛入mutex的wait_list鏈表。

首先實現申請互斥量的函數,。

  1. void mutex_take(struct mutex *mutex)
  2. {
  3. struct mutex_waiter waiter;
  4.  
  5. if (!mutex->owner) {
  6. mutex->owner = (long)current; /* 1 */
  7. return;
  8. }
  9.  
  10. waiter.task = current;
  11. list_add_tail(&waiter.list, &mutex->wait_list); /* 2 */
  12. schedule(); /* 2 */
  13. }
  1. 當mutex->owner的值為0的時候,,代表沒有任何進程持有鎖。因此可以直接申請成功,。然后,,記錄當前申請鎖進程的task_struct。
  2. 既然不能獲取互斥量,,自然就需要睡眠等待,,掛入等待鏈表。

互斥量的釋放代碼實現也同樣和semaphore有很多相似之處,。不信,,你看,。

 
  1. int mutex_release(struct mutex *mutex)
  2. {
  3. struct mutex_waiter waiter;
  4.  
  5. if (mutex->owner != (long)current) /* 1 */
  6. return -1;
  7.  
  8. if (list_empty(&mutex->wait_list)) {
  9. mutex->owner = 0; /* 2 */
  10. return 0;
  11. }
  12.  
  13. waiter = list_first_entry(&mutex->wait_list, struct mutex_waiter, list);
  14. list_del(&waiter->list);
  15. mutex->owner = (long)waiter->task; /* 3 */
  16. wake_up_process(waiter->task); /* 4 */
  17.  
  18. return 0;
  19. }
  1. mutex具有“持鎖者才能解鎖”的特點就是在這行代碼體現。
  2. 如果等待鏈表沒有進程,,那么自然只需要將mutex->owner置0,,代表沒有鎖是釋放狀態(tài)。
  3. mutex->owner的值改成當前可以持鎖進程的task_struct,。
  4. 從等待進程鏈表取出第一個進程,,并從鏈表上移除。然后就是喚醒該進程,。

    本站是提供個人知識管理的網絡存儲空間,,所有內容均由用戶發(fā)布,不代表本站觀點,。請注意甄別內容中的聯系方式,、誘導購買等信息,謹防詐騙,。如發(fā)現有害或侵權內容,,請點擊一鍵舉報。
    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多