1. JuiceFS 源碼閱讀-中

        共 39431字,需瀏覽 79分鐘

         ·

        2021-08-07 09:09

        JuiceFS鎖的實現(xiàn)

        JuiceFS的鎖實現(xiàn),目前同時實現(xiàn)了BSD locks(對應Flock)和POSIX locks(對應Setlk)。細節(jié)上最大區(qū)別就是BSD locks只能以FD為最小控制單位(簡單理解為單文件加鎖,鎖定的是文件描述符fd對應的文件),而POSIX locks可以在一個文件中以文件的offset+length的方式進行加鎖(按文件內容進行范圍加鎖)。

        BSD locks

        //pkg/vfs/vfs_unix.go

        func Flock(ctx Context, ino Ino, fh uint64, owner uint64, typ uint32, block bool) (err syscall.Errno) {
            var name string
            var reqid uint32
            defer func() { logit(ctx, "flock (%d,%d,%016X,%s,%t): %s", reqid, ino, owner, name, block, strerr(err)) }()
            switch typ {
            case syscall.F_RDLCK:
                name = "LOCKSH"
            case syscall.F_WRLCK:
                name = "LOCKEX"
            case syscall.F_UNLCK:
                name = "UNLOCK"
            default:
                err = syscall.EINVAL
                return
            }

            if IsSpecialNode(ino) {
                err = syscall.EPERM
                return
            }
            h := findHandle(ino, fh)
            if h == nil {
                err = syscall.EBADF
                return
            }
            h.addOp(ctx)
            defer h.removeOp(ctx)
            err = m.Flock(ctx, ino, owner, typ, block) //核心在元數(shù)據(jù)部分的控制,具體參考下面部分代碼注釋
            if err == 0 {
                h.Lock()
                if typ == syscall.F_UNLCK {
                    h.locks &= 2
                } else {
                    h.locks |= 1
                    h.flockOwner = owner
                }
                h.Unlock()
            }
            return
        }
        //pkg/meta/sql_unix.go
        func (m *dbMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
            if ltype == syscall.F_UNLCK {//如果為解鎖操作,則只需要刪除對應的db記錄即可
                return errno(m.txn(func(s *xorm.Session) error {
                    _, err := s.Delete(&flock{Inode: inode, Owner: owner, Sid: m.sid})
                    return err
                }))
            }
            var err syscall.Errno
            //循環(huán)處理加鎖請求,分為阻塞(block=true)和非阻塞兩種類型操作
            for {
                err = errno(m.txn(func(s *xorm.Session) error {
                //獲取inode信息,避免鎖指向的對象不存在,成為空鎖。
                    if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                        if err == nil && !exists {
                            err = syscall.ENOENT
                        }
                        return err
                    }
                    //查詢inode關聯(lián)的全部鎖信息
                    rows, err := s.Rows(&flock{Inode: inode})
                    if err != nil {
                        return err
                    }
                    type key struct {
                        sid uint64
                        o   uint64
                    }
                    var locks = make(map[key]flock)
                    var l flock
                    for rows.Next() {
                        if rows.Scan(&l) == nil {
                        //執(zhí)行迭代,將查詢結果臨時保存到locks數(shù)據(jù)結構中
                            locks[key{l.Sid, l.Owner}] = l
                        }
                    }
                    rows.Close()
        //判斷需要加鎖的類型是否為讀鎖,如果已經有寫鎖則加鎖失敗
                    if ltype == syscall.F_RDLCK {
                        for _, l := range locks {
                            if l.Ltype == 'W' {
                                return syscall.EAGAIN
                            }
                        }
                        //沒有寫鎖沖突,則通過insert記錄加上讀鎖
                        return mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'R', Sid: m.sid})
                    }

                    //加寫入鎖邏輯:先判斷是否已經有寫入鎖(判斷l(xiāng)ocks中是否有重復鍵值),如果有則更新鎖的記錄,否則直接insert插入對應的記錄
                    me := key{m.sid, owner}
                    _, ok := locks[me]
                    delete(locks, me)
                    if len(locks) > 0 {
                        return syscall.EAGAIN
                    }
                    if ok {
                        _, err = s.Cols("Ltype").Update(&flock{Ltype: 'W'}, &flock{Inode: inode, Owner: owner, Sid: m.sid})
                    } else {
                        err = mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'W', Sid: m.sid})
                    }
                    return err
                }))
            //非阻塞or報錯直接返回結果
                if !block || err != syscall.EAGAIN {
                    break
                }
                //阻塞情況下加寫鎖,則等待固定時長再進行下一輪加鎖操作
                if ltype == syscall.F_WRLCK {
                    time.Sleep(time.Millisecond * 1)
                } else {
                    time.Sleep(time.Millisecond * 10)
                }
                if ctx.Canceled() {
                    return syscall.EINTR
                }
            }
            return err
        }

        POSIX locks

        按pid進行范圍加鎖,實現(xiàn)起來相對比較復雜,核心算法在updateLocks中實現(xiàn)。

        func (m *dbMeta) Setlk(ctx Context, inode Ino, owner_ uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
            var err syscall.Errno
            lock := plockRecord{ltype, pid, start, end}//以pid為粒度,所以適合單機多進/線程模型,跨節(jié)點不太合適
            owner := int64(owner_)
            for {
                err = errno(m.txn(func(s *xorm.Session) error {
                    if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                        if err == nil && !exists {
                            err = syscall.ENOENT
                        }
                        return err
                    }
                    //unlock操作
                    if ltype == F_UNLCK {
                    //sid代表session ID,每個客戶端的數(shù)據(jù)庫連接都有一個獨立的ID實例
                        var l = plock{Inode: inode, Owner: owner, Sid: m.sid}
                        ok, err := m.engine.Get(&l) //按inode、owner、sid三個字段組合,查詢鎖列表
                        if err != nil {
                            return errno(err)
                        }
                        if !ok {
                            return nil
                        }
                        ls := loadLocks([]byte(l.Records)) //解析鎖列表信息
                        if len(ls) == 0 {
                            return nil
                        }
                        ls = updateLocks(ls, lock) //在已有所列表里面新增鎖記錄,有點復雜,之后詳細介紹
                        if len(ls) == 0 {
                            _, err = s.Delete(&plock{Inode: inode, Owner: owner, Sid: m.sid}) //刪除鎖記錄
                        } else {
                            _, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)}, l) //更新已有所記錄
                        }
                        return err
                    }
                    //以inode為關鍵字,查找已有的鎖列表
                    rows, err := s.Rows(&plock{Inode: inode})
                    if err != nil {
                        return errno(err)
                    }
                    type key struct {
                        sid   uint64
                        owner int64
                    }
                    var locks = make(map[key][]byte)
                    var l plock
                    //按查詢結果構建鎖map
                    for rows.Next() {
                        if rows.Scan(&l) == nil {
                            locks[key{l.Sid, l.Owner}] = dup(l.Records)
                        }
                    }
                    rows.Close()
                    //遍歷map,判斷是否有沖突鎖
                    lkey := key{m.sid, owner}
                    for k, d := range locks {
                        if k == lkey {
                            continue
                        }
                        ls := loadLocks([]byte(d))
                        for _, l := range ls {
                            // find conflicted locks
                            if (ltype == F_WRLCK || l.ltype == F_WRLCK) && end > l.start && start < l.end {
                                return syscall.EAGAIN
                            }
                        }
                    }
                    ls := updateLocks(loadLocks([]byte(locks[lkey])), lock) //更新鎖列表信息
                    var n int64
                    //保存鎖列表記錄到DB
                    if len(locks[lkey]) > 0 {
                        n, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)},
                            &plock{Inode: inode, Sid: m.sid, Owner: owner})
                    } else {
                        n, err = s.InsertOne(&plock{Inode: inode, Sid: m.sid, Owner: owner, Records: dumpLocks(ls)})
                    }
                    if err == nil && n == 0 {
                        err = fmt.Errorf("insert/update failed")
                    }
                    return err
                }))
            //如果加鎖失敗且不進行阻塞,則直接返回結果
                if !block || err != syscall.EAGAIN {
                    break
                }
                //加鎖失敗,阻塞,進入下一輪操作
                if ltype == F_WRLCK {
                    time.Sleep(time.Millisecond * 1)
                } else {
                    time.Sleep(time.Millisecond * 10)
                }
                if ctx.Canceled() {
                    return syscall.EINTR
                }
            }
            return err
        }

        updateLocks 的代碼邏輯如下,通過加上debug輸出,更加容易觀察其中細節(jié)

        const (
            F_UNLCK = syscall.F_UNLCK
            F_RDLCK = syscall.F_RDLCK
            F_WRLCK = syscall.F_WRLCK
        )

        type plockRecord struct {
            ltype uint32
            pid   uint32
            start uint64
            end   uint64
        }


        func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
            //fmt.Println(i,"insertLocks before ls=",ls)
            nls := make([]plockRecord, len(ls)+1)
            copy(nls[:i], ls[:i])
            nls[i] = nl
            copy(nls[i+1:], ls[i:])
            ls = nls
            //fmt.Println(i,"insertLocks after ls=",ls)
            return ls
        }

        func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
            // ls is ordered by l.start without overlap
            var i int
            for i < len(ls) && nl.end > nl.start {
                l := ls[i]
                if l.end < nl.start {
                    fmt.Println("新增鎖設定的區(qū)域超過當前鎖范圍,查找下一個")
                } else if l.start < nl.start {
                    //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                    fmt.Println("1. 當前鎖包含部分新鎖區(qū)域,拆分成兩個鎖,調整當前鎖范圍從[",ls[i].start,"->",ls[i].end,"]調整為[",ls[i].start,"->",nl.start,"],并在當前位置之后插入新鎖 [",nl.start,"->",l.end,"]")
                    //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
                    ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
                    ls[i].end = nl.start
                    i++
                    nl.start = l.end
                } else if l.end < nl.end {
                    //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
                    fmt.Println("2. 當前鎖區(qū)間屬于新鎖區(qū)間,縮小當前鎖范圍,從[",ls[i].start,"->",ls[i].end,"]調整為[",nl.start,"->",ls[i].end,"]")
                    ls[i].ltype = nl.ltype
                    ls[i].start = nl.start
                    nl.start = l.end
                } else if l.start < nl.end {
                    //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
                    ls = insertLocks(ls, i, nl)
                    fmt.Println("3. 新鎖與當前鎖有部分內容重疊,需要在當前位置插入新鎖=[",nl.start,nl.end,"],并調整下一個鎖的起始位置從[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
                    ls[i+1].start = nl.end
                    nl.start = nl.end
                } else {
                    fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                    fmt.Println("4. 新鎖右側和當前鎖沒有重疊(l.start>nl.end),僅需要在當前位置插入新鎖=[",nl.start,nl.end,"]")
                    ls = insertLocks(ls, i, nl)
                    nl.start = nl.end
                }
                i++
            }

            if nl.start < nl.end {
                ls = append(ls, nl)
                fmt.Println("5. 仍然有部分尾部內容沒有,補充末尾部分的鎖內容,補充后=",ls)
            }

            i = 0
            //再次遍歷鎖列表,進行無效內容刪除or區(qū)間合并操作。
            for i < len(ls) {
                if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
                    // remove empty one
                    //fmt.Println("刪除鎖列表從i=",i,"位置的內容,刪除前l(fā)s=",ls)
                    //fmt.Println("刪除鎖列表從i=",i,"位置的內容",ls[i:i+1])
                    copy(ls[i:], ls[i+1:]) //從i位置開始左移1個單位
                    ls = ls[:len(ls)-1] //刪除末尾
                    fmt.Println("6-1. 刪除鎖列表從i=",i,"位置的內容,刪除后ls=",ls)
                } else {
                    if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                        fmt.Println("6-2. 鎖類型相同,且首尾相接,進行區(qū)間合并操作",ls)
                        // combine continuous range
                        ls[i].end = ls[i+1].end
                        ls[i+1].start = ls[i+1].end
                        //fmt.Println("鎖類型相同,且首尾相接,進行區(qū)間合并操作2",ls)
                    }
                    i++
                }
            }
            return ls
        }

        小結

        整個POSIX locks的算法主要是通過遍歷已有的鎖列表ls(數(shù)組結構),并按照一定規(guī)則進行新增鎖記錄的插入(簡單理解為滑動窗口查找),其中nl代表窗口滑動范圍鎖,l代表當前已經有的鎖。

        -w900

        根據(jù)代碼注釋,大概分為4種類型的鎖處理。其中l(wèi)odlock代表已有的鎖記錄(對應l),Newlock是新增鎖的記錄(對應nl)

        類型1:

        -w881

        類型2:

        -w899

        類型3:

        -w899

        類型4:

        -w899

        發(fā)現(xiàn)問題

        發(fā)現(xiàn)一個pid同步的bug,當新鎖的內容覆蓋舊鎖時,并未更新對應的pid記錄,導致加鎖雖然成功,但是鎖的pid還是指向舊的pid內容。復現(xiàn)代碼如下

        package main

        import (
            "fmt"
            "syscall"
        )

        const (
            F_UNLCK = syscall.F_UNLCK
            F_RDLCK = syscall.F_RDLCK
            F_WRLCK = syscall.F_WRLCK
        )

        type plockRecord struct {
            ltype uint32
            pid   uint32
            start uint64
            end   uint64
        }


        func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
            //fmt.Println(i,"insertLocks before ls=",ls)
            nls := make([]plockRecord, len(ls)+1)
            copy(nls[:i], ls[:i])
            nls[i] = nl
            copy(nls[i+1:], ls[i:])
            ls = nls
            //fmt.Println(i,"insertLocks after ls=",ls)
            return ls
        }

        func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
            // ls is ordered by l.start without overlap
            var i int
            for i < len(ls) && nl.end > nl.start {
                l := ls[i]
                if l.end < nl.start {
                    fmt.Println("新增鎖設定的區(qū)域超過當前鎖范圍,查找下一個")
                } else if l.start < nl.start {
                    //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                    fmt.Println("1. 當前鎖包含部分新鎖區(qū)域,拆分成兩個鎖,調整當前鎖范圍從[",ls[i].start,"->",ls[i].end,"]調整為[",ls[i].start,"->",nl.start,"],并在當前位置之后插入新鎖 [",nl.start,"->",l.end,"]")
                    //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
                    ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
                    ls[i].end = nl.start
                    i++
                    nl.start = l.end
                } else if l.end < nl.end {
                    //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
                    fmt.Println("2. 當前鎖區(qū)間屬于新鎖區(qū)間,縮小當前鎖范圍,從[",ls[i].start,"->",ls[i].end,"]調整為[",nl.start,"->",ls[i].end,"]")
                    ls[i].ltype = nl.ltype
                    ls[i].start = nl.start
                    //if ls[i].pid != nl.pid { //patch
                    //  ls[i].pid = nl.pid
                    //}
                    nl.start = l.end
                } else if l.start < nl.end {
                    //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
                    ls = insertLocks(ls, i, nl)
                    fmt.Println("3. 新鎖與當前鎖有部分內容重疊,需要在當前位置插入新鎖=[",nl.start,nl.end,"],并調整下一個鎖的起始位置從[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
                    ls[i+1].start = nl.end
                    nl.start = nl.end
                } else {
                    fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                    fmt.Println("4. 新鎖右側和當前鎖沒有重疊(l.start>nl.end),僅需要在當前位置插入新鎖=[",nl.start,nl.end,"]")
                    ls = insertLocks(ls, i, nl)
                    nl.start = nl.end
                }
                i++
            }

            if nl.start < nl.end {
                ls = append(ls, nl)
                fmt.Println("5. 仍然有部分尾部內容沒有,補充末尾部分的鎖內容,補充后=",ls)
            }

            i = 0
            //再次遍歷鎖列表,進行無效內容刪除or區(qū)間合并操作。
            for i < len(ls) {
                if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
                    // remove empty one
                    //fmt.Println("刪除鎖列表從i=",i,"位置的內容,刪除前l(fā)s=",ls)
                    //fmt.Println("刪除鎖列表從i=",i,"位置的內容",ls[i:i+1])
                    copy(ls[i:], ls[i+1:]) //從i位置開始左移1個單位
                    ls = ls[:len(ls)-1] //刪除末尾
                    fmt.Println("6-1. 刪除鎖列表從i=",i,"位置的內容,刪除后ls=",ls)
                } else {
                    if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                        fmt.Println("6-2. 鎖類型相同,且首尾相接,進行區(qū)間合并操作",ls)
                        // combine continuous range
                        ls[i].end = ls[i+1].end
                        ls[i+1].start = ls[i+1].end
                        //fmt.Println("鎖類型相同,且首尾相接,進行區(qū)間合并操作2",ls)
                    }
                    i++
                }
            }
            return ls
        }


        func Setlk( ltype uint32, start, end uint64, pid uint32) {

            lock := plockRecord{ltype, pid, start, end}
            //ls := []plockRecord{plockRecord{F_WRLCK, pid, 04},{F_WRLCK, pid, 710},{F_WRLCK, pid, 1316}}
            ls := []plockRecord{plockRecord{F_WRLCK, 10004},{F_WRLCK, 102710}}
            //ls := []plockRecord{plockRecord{F_WRLCK, pid, 14}}
            fmt.Println("before updateLocks=",ls)
            ls = updateLocks(ls, lock)
            fmt.Println("after updateLocks=",ls)
        }

        func main(){
            Setlk(F_WRLCK,6,13,103) //理論上加鎖以后的記錄應該對應pid=103,上面的patch已經修復這個問題
        }

        輸出內容如下:

        before updateLocks= [{3 100 0 4} {3 102 7 10}]
        新增鎖設定的區(qū)域超過當前鎖范圍,查找下一個
        2. 當前鎖區(qū)間屬于新鎖區(qū)間,縮小當前鎖范圍,從[ 7 -> 10 ]調整為[ 6 -> 10 ]
        5. 仍然有部分尾部內容沒有,補充末尾部分的鎖內容,補充后= [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
        6-2. 鎖類型相同,且首尾相接,進行區(qū)間合并操作 [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
        6-1. 刪除鎖列表從i= 2 位置的內容,刪除后ls= [{3 100 0 4} {3 102 6 13}]
        after updateLocks= [{3 100 0 4} {3 102 6 13}] //理論上這里的pid=102是對應的是舊鎖內容,應該被新增加的鎖記錄pid=103覆蓋


        瀏覽 75
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 办公室秘书被狂啪嗯啊视频 | 免费a在线观看 | 亚洲AV性爱| 国产18一19sex性护士 | 欧老太做爱亚洲性猛交 |