1. 鎖、CAS操作和無鎖隊列的實現(xiàn)

        共 6967字,需瀏覽 14分鐘

         ·

        2021-01-18 18:09


        來源:yishizuofei

        blog.csdn.net/yishizuofei/article/details/78353722

        鎖的機制


        鎖和人很像,有的人樂觀,總會想到好的一方面,所以只要越努力,就會越幸運;有的人悲觀,總會想到不好的一方面,患得患失,所以經(jīng)常會做不好事。

        我一直把前一個當作為我前進的動力和方向,快樂充實的過好每一天。

        常用的鎖機制也有兩種:
        1、樂觀鎖:假設不會發(fā)生并發(fā)沖突,每次不加鎖而去完成某項操作,只在提交操作時,檢查是否違反數(shù)據(jù)完整性。如果因為沖突失敗就繼續(xù)重試,直到成功為止。而樂觀鎖用到的機制就是CAS。

        樂觀鎖大多是基于數(shù)據(jù)版本記錄機制實現(xiàn)。?為數(shù)據(jù)增加一個版本標識,比如在基于數(shù)據(jù)庫表的版本解決方案中,一般是通過微數(shù)據(jù)庫表增加一個“version”字段來實現(xiàn)。讀取數(shù)據(jù)時,將此版本號一同讀出,之后更新時,對此版本號加一。

        此時,?將提交數(shù)據(jù)的版本數(shù)據(jù)與數(shù)據(jù)庫表對應記錄的當前版本信息進行比對,如果提交的數(shù)據(jù)版本號大于數(shù)據(jù)庫表當前版本號,則予以更新,否則認為是過期數(shù)據(jù)。樂觀鎖的缺點是不能解決臟讀的問題

        注意:?在實際生產(chǎn)環(huán)境里邊,如果并發(fā)量不大且不允許臟讀,可以使用悲觀鎖解決并發(fā)問題。但如果系統(tǒng)的并發(fā)非常大的話,悲觀鎖定會帶來非常大的性能問題,所以我們就要選擇樂觀鎖。

        2、悲觀鎖:假定會發(fā)生并發(fā)沖突,屏蔽一切可能違反數(shù)據(jù)完整性的操作。悲觀鎖的實現(xiàn),往往依靠底層提供的鎖機制。

        悲觀鎖會導致其他所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。如果所有線程都在等待其他線程釋放鎖,而不能主動釋放鎖資源,那么也會造成死鎖問題。

        鎖的機制存在以下問題:
        (1)在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調(diào)度延時,引起性能問題。
        (2)一個線程持有鎖會導致其他所有需要次所的線程掛起。
        (3)如果一個優(yōu)先級搞得線程等待一個優(yōu)先級低的線程釋放鎖會導致優(yōu)先級倒置,引起性能風險。

        CAS操作

        Compare And Set(或Compare And Swap),CAS是解決多線程并行情況下使用鎖造成性能損耗的一種機制,CAS操作包含三個操作數(shù)——內(nèi)存位置(V)、預期原值(A)、新值(B)

        如果內(nèi)存位置的值與預期原值相同,那么處理器會自動將內(nèi)存的值更新為新值。否則,處理器不做任何操作。無論哪種情況,處理器都會在CAS指令之前返回該位置的值。

        CAS有效地說明了 “我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更新該位置,只告訴我這個位置現(xiàn)在的值即可?!?/span>

        現(xiàn)在幾乎所有的CPU指令都支持CAS的原子操作,X86下對應的是CMPXCHG匯編指令。有了這個操作,我們就可以用其來實現(xiàn)各種無鎖的數(shù)據(jù)結構。

        這個操作可以用以下的例子來描述:
        意思是,看一看內(nèi)存*reg里的值是不是oldval,如果是的話,則對其賦值newval,并返回true,表示更新成功,如果返回false,則表示修改失敗。

        bool?compare_and_swap(int?*reg,int?oldval,int?newval)
        {
        ????int?reg_val?=?*reg;
        ????if(reg_val?==?oldval)
        ????{
        ????????*reg?=?newval;
        ????????return?true;
        ????}
        ????return?false;
        }

        CAS操作無鎖隊列的實現(xiàn)(參考)

        Q:CAS的實現(xiàn)
        A:gcc提供了兩個函數(shù)

        bool?__sync_bool_compare_and_swap?(type?*ptr,?type?oldval,?
        ????????????????????????????????????type?newval, ...);
        type?__sync_val_compare_and_swap?(type?*ptr,?type?oldval,?
        ????????????????????????????type?newval, ...);

        這兩個函數(shù)提供原子的比較和交換,如果*ptr == oldval,就將newval寫入*ptr,
        第一個函數(shù)在相等并寫入的情況下返回true,這個函數(shù)比第二個好在,返回bool值可以知道有沒有更新成功。
        第二個函數(shù)在返回操作之前的值。

        第二個函數(shù)用c語言描述:

        type?__sync_val_compare_and_swap?(type?*ptr,?type?oldval,?
        ????????????????????????????type?newval,?...)
        {
        ????type?cur?=?*ptr;
        ????if?(cur?==?oldval)
        ????{
        ????????*ptr?=?newval;
        ????}
        ????return?cur;//?返回操作之前的值
        }

        type只能是1,2,4或8字節(jié)長度的int類型,否則會發(fā)生下面的錯誤
        https://img-blog.csdn.net/20180812085312545?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lpc2hpenVvZmVp/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70

        Q: 操作系統(tǒng)級別是如何實現(xiàn)的
        A: X86中有一個CMPXCHG的匯編指令

        Q: CAS指令有什么缺點
        A: 1.存在ABA問題因為CAS需要在操作值的時候檢查下值有沒有發(fā)生變化,如果沒有發(fā)生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發(fā)現(xiàn)它的值沒有發(fā)生變化,但是實際上卻變化了。

        ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就會變成1A-2B-3A。


        2.循環(huán)時間長開銷大自旋CAS如果長時間不成功,會給CPU帶來非常大的執(zhí)行開銷。

        3.只能保證一個共享變量的原子操作對一個共享變量執(zhí)行操作時,我們可以使用循環(huán)CAS的方式來保證原子操作,但是對多個共享變量操作時,循環(huán)CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合并成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合并一下ij=2a,然后用CAS來操作ij。

        gcc從4.1.2提供了__sync_*系列的built-in函數(shù),用于提供加減和邏輯運算的原子操作。

        其聲明如下:

        原子操作的后置加加 type __sync_fetch_and_add (type *ptr, type value, …)
        原子操作的前置加加 type __sync_add_and_fetch (type *ptr, type value, …)
        其他類比

        type?__sync_fetch_and_sub(type?*ptr,?type?value,?…)??
        type?__sync_fetch_and_or(type?*ptr,?type?value,?…)??
        type?__sync_fetch_and_and(type?*ptr,?type?value,?…)??
        type?__sync_fetch_and_xor(type?*ptr,?type?value,?…)??
        type?__sync_fetch_and_nand(type?*ptr,?type?value,?…)

        type?__sync_sub_and_fetch(type?\*ptr,?type?value,?…)??
        type?__sync_or_and_fetch(type?*ptr,?type?value,?…)??
        type?__sync_and_and_fetch(type?*ptr,?type?value,?…)??
        type?__sync_xor_and_fetch(type?*ptr,?type?value,?…)??
        type?__sync_nand_and_fetch(type?*ptr,?type?value,?…)?

        這兩組函數(shù)的區(qū)別在于第一組返回更新前的值,第二組返回更新后的值。

        關于CAS函數(shù),參考:?

        http://blog.csdn.net/youfuchen/article/details/23179799

        在多線程環(huán)境下有以下情景:

        比如對同一鏈隊列進行入隊操作時一個線程正在將新的隊列節(jié)點 掛載到 隊尾節(jié)點的next上,可是還沒來的及更新隊尾節(jié)點 但同一時刻另一個線程也在進行入隊操作將新的隊列節(jié)點也掛在到了沒更新的隊尾節(jié)點那么先掛載的節(jié)點就丟失了。

        為了解決多線程環(huán)境下的這些問題,我們第一時間肯定想到了加上互斥鎖控制同一時刻只能有一個線程可以對隊列進行寫操作,但是加鎖的操作太消耗系統(tǒng)資源了很繁重 。

        因為對臨界區(qū)的操作只有一步 就是對隊列的尾節(jié)點進行更新,只要讓這一步進行的是原子操作就可以了,所以使用到了CAS操作。

        為了有一個對比 寫了一份thread_queue.c是用鎖對臨界區(qū)進行控制訪問的
        另一份是lock_free_queue.c是用CAS確保對臨界區(qū)的操作是原子操作

        “queue.h”

        #ifndef?QUEUE_H_
        #define?QUEUE_H_

        #include?
        #include?

        /*
        普通的
        鏈式隊列
        */
        typedef?struct?QNode
        {
        ????int?data;
        ????struct?QNode?*next;
        }QNode,?*QueuePtr;

        typedef?struct?LinkQueue
        {
        ????QueuePtr?front;
        ????QueuePtr?rear;
        }LinkQueue;

        void?init_Queue(LinkQueue?*q);//初始化隊列
        void?push_Queue(LinkQueue?*q,?int?e);//隊尾入隊
        int?pop_Queue(LinkQueue?*q,?int?*e);//隊頭出隊
        int?is_Empty(LinkQueue?*q);
        void?show(LinkQueue?*q);

        #endif?/*?QUEUE_H_?*/

        “queue.c”

        #include?"queue.h"

        /*
        初始化
        為隊列構建一個頭結點
        讓front和rear都指向這個頭結點
        */
        void?init_Queue(LinkQueue?*q)
        {
        ????q->front?=?q->rear?=?(QNode?*)malloc(sizeof(QNode));
        ????q->front->next?=?NULL;
        }

        /*
        普通的入隊操作
        */
        void?push_Queue(LinkQueue?*q,?int?e)
        {
        ????QueuePtr?newNode?=?(QueuePtr)malloc(sizeof(QNode));
        ????newNode->data?=?e;
        ????newNode->next?=?NULL;
        ????q->rear->next?=?newNode;
        ????q->rear?=?newNode;
        }

        /*
        cas的入隊操作
        和普通的入隊操作一樣
        新建節(jié)點后
        要將新節(jié)點掛在隊尾時需要進行cas操作
        因為官方文檔:The definition given in?the?Intel?documentation?allows?only?for?the?use?of?the?types?int,?long,?long?long?as?well?as?their?unsigned?counterparts
        只能用?int,?long,?long?long
        所以要把指針類型?QueuePtr?變成?long
        用long的另一個原因就是:屏蔽32位和64位的差異 long在32位是4字節(jié) 64位是8字節(jié)
        */
        void?cas_push(LinkQueue?*q,?int?e)
        {
        ????QueuePtr?newNode?=?(QueuePtr)malloc(sizeof(QNode));
        ????newNode->data?=?e;
        ????newNode->next?=?NULL;

        ????QueuePtr?tmp;
        ????do
        ????{
        ????????tmp?=?q->rear;
        ????}while?(!__sync_bool_compare_and_swap((long?*)(&(tmp->next)),?NULL,?(long)newNode));

        ????q->rear?=?newNode;
        }

        /*
        以前的判空是?q->front?==?q->rear
        但是這樣子會增加出隊的操作?當出的是最后一個元素時,?q->rear需要指向?q->front
        我把這一步省了?暫時沒有發(fā)現(xiàn)有什么副作用
        所以我改成了?q->front->next?==?NULL
        */
        int?is_Empty(LinkQueue?*q)
        {
        ????if?(q->front->next?==?NULL)
        ????{
        ????????return(1);
        ????}
        ????return(0);
        }

        /*
        普通的出隊操作
        如果隊空?返回0?也就是false
        e作為接受元素的緩沖
        */
        int?pop_Queue(LinkQueue?*q,?int?*e)
        {
        ????if?(is_Empty(q))
        ????{
        ????????return(0);
        ????}
        ????QueuePtr?tmp;
        ????tmp?=?q->front->next;
        ????q->front->next?=?tmp->next;

        ????*e?=?tmp->data;
        ????free(tmp);
        ????return(1);
        }

        /*
        cas的出隊操作
        每一次都要判斷這個隊列是不是空
        然后執(zhí)行cas的出隊操作:
        (1)tmp = q->rear 把舊的隊頭存起來
        (2)執(zhí)行原子操作:看?舊的隊頭?是否等于?現(xiàn)在的隊頭 tmp ==?*(&(q->front))?如果相等執(zhí)行?*(&(q->front))?= tmp->next 返回true?
        ????否則,即執(zhí)行這一步原子操作的時候,別的線程修改了隊列,導致隊尾指向改變了,返回false?,while(!false)回到第一步重新執(zhí)行
        */
        int?cas_pop(LinkQueue?*q,?int?*e)
        {
        ????QueuePtr?tmp;
        ????do?{
        ????????if?(is_Empty(q))
        ????????{
        ????????????return(0);
        ????????}
        ????????//printf("cas_pop...\n");
        ????????tmp?=?q->front->next;
        ????}?while?(!__sync_bool_compare_and_swap((long?*)(&(q->front->next)),?(long)tmp,?(long)tmp->next));

        ????*e?=?tmp->data;
        ????free(tmp);
        ????return(1);
        }

        /*
        遍歷隊列?打印里面的元素?為了求證隊列里面的元素
        */
        void?show(LinkQueue?*q)
        {
        ????printf("void?show(LinkQueue?*q)\n");
        ????QueuePtr?tmp?=?q->front->next;
        ????while?(tmp)
        ????{
        ????????printf("%d?",?tmp->data);
        ????????tmp?=?tmp->next;
        ????}
        ????printf("\n");
        }

        “thread_queue.c”

        #include?"queue.h"
        #include?
        #include?
        #include?
        #include?

        #define?THREAD_NUMBER?4//開啟的線程數(shù),電腦是4核,所以用4

        //sem_t?queue_sem;//信號量
        pthread_mutex_t?mutex;//互斥鎖

        void?*thread_push(void?*arg);
        void?*thread_pop(void?*arg);

        int?main()
        {
        ????LinkQueue?que;
        ????init_Queue(&que);

        ????/*初始化二進制信號量?初始值為1?代表每一次只有1個線程可以訪問?
        ????本來更加應該用互斥量?比較貼合情景?但是不太熟?就用了信號量
        ????*/
        ????//int?res?=?sem_init(&queue_sem,?0,?1);
        ????//assert(res?!=?-1);

        ????int?i;
        ????pthread_t?threadArr[THREAD_NUMBER];
        ????for?(i?=?0;?i?????{
        ????????pthread_create(&threadArr[i],?NULL,?thread_push,?(void?*)&que);
        ????}

        ????for?(i?=?0;?i?????{
        ????????pthread_join(threadArr[i],?NULL);
        ????}

        ????show(&que);

        ????for?(i?=?0;?i?????{
        ????????pthread_create(&threadArr[i],?NULL,?thread_pop,?(void?*)&que);
        ????}

        ????for?(i?=?0;?i?????{
        ????????pthread_join(threadArr[i],?NULL);
        ????}

        ????//sem_destroy(&queue_sem);

        ????exit(EXIT_SUCCESS);
        }

        void?*thread_push(void?*arg)
        {
        ????printf("start?push\n");
        ????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
        ????int?i;
        ????for?(i?=?0;?i?????{
        ????????//sem_wait(&queue_sem);
        ????????pthread_mutex_lock(&mutex);
        ????????push_Queue(quePtr,?i);
        ????????pthread_mutex_unlock(&mutex);
        ????????//sem_post(&queue_sem);
        ????}
        ????printf("finish?push\n");
        ????pthread_exit(NULL);
        }

        void?*thread_pop(void?*arg)
        {
        ????printf("start?pop\n");
        ????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
        ????int?tmp;
        ????int?res;
        ????while?(1)
        ????{
        ????????//sem_wait(&queue_sem);
        ????????pthread_mutex_lock(&mutex);
        ????????res?=?pop_Queue(quePtr,?&tmp);
        ????????pthread_mutex_unlock(&mutex);
        ????????//sem_post(&queue_sem);
        ????????if?(!res)
        ????????{
        ????????????break;
        ????????}
        ????????printf("%d?",?tmp);
        ????}
        ????printf("finish?pop\n");
        ????pthread_exit(NULL);
        }

        “l(fā)ock_free_queue.c”

        #include?"queue.h"
        #include?
        #include?
        #include?

        #define?THREAD_NUMBER?4//開啟的線程數(shù),電腦是4核,所以用4

        void?*thread_push(void?*arg);
        void?*thread_pop(void?*arg);

        /*
        初始化空隊列

        為了模擬線程對資源的搶占
        開啟4個線程?每個線程push?20個元素?0~19
        等待4個線程結束
        打印隊列元素?驗證push
        開啟四個線程?每個線程都對隊列進行?pop操作
        */
        int?main()
        {
        ????LinkQueue?que;
        ????init_Queue(&que);

        ????int?i;
        ????/*
        ????創(chuàng)造四個新線程?每個線程都執(zhí)行?thread_push(&que)
        ????*/
        ????pthread_t?threadArr[THREAD_NUMBER];
        ????for?(i?=?0;?i?????{
        ????????pthread_create(&threadArr[i],?NULL,?thread_push,?(void?*)&que);
        ????}

        ????/*
        ????等待四個線程都執(zhí)行完
        ????要不然主線程一下子就跑完了?程序就結束了
        ????還有就是?為了show函數(shù)?可以驗證元素是不是都push進去了
        ????*/
        ????for?(i?=?0;?i?????{
        ????????pthread_join(threadArr[i],?NULL);
        ????}

        ????show(&que);

        ????/*
        ????創(chuàng)造四個新線程?每個線程都執(zhí)行?thread_pop(&que)
        ????*/
        ????for?(i?=?0;?i?????{
        ????????pthread_create(&threadArr[i],?NULL,?thread_pop,?(void?*)&que);
        ????}

        ????for?(i?=?0;?i?????{
        ????????pthread_join(threadArr[i],?NULL);
        ????}

        ????exit(EXIT_SUCCESS);
        }

        void?*thread_push(void?*arg)
        {
        ????printf("start?push\n");
        ????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
        ????int?i;
        ????for?(i?=?0;?i?????{
        ????????cas_push(quePtr,?i);
        ????}
        ????printf("finish?push\n");
        ????pthread_exit(NULL);
        }

        void?*thread_pop(void?*arg)
        {
        ????printf("start?pop\n");
        ????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
        ????int?tmp;
        ????int?res;
        ????while?(1)
        ????{
        ????????res?=?cas_pop(quePtr,?&tmp);
        ????????if?(!res)
        ????????{
        ????????????break;
        ????????}
        ????????printf("%d?",?tmp);
        ????????//sleep(1);
        ????}
        ????printf("finish?pop\n");
        ????pthread_exit(NULL);
        }
        瀏覽 28
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 女生摸男生的鸡鸡 | 一级黄色录像大片 | 亚洲www | 处女被强行糟蹋bd | 亚洲AV成人精品一区二区三区四区 |