ローファイ日記

出てくるコード片、ぼくが書いたものは断りがない場合 MIT License としています http://udzura.mit-license.org/

自作 LSM-Tree その1

Log-Structured Merge Tree というデータ構造があって、データ指向アプリケーションデザインを読んでいるとかなり最初の方に出てくる。Wikipediaの記事の通りLevelDBを始めきょうびのさまざまなデータベース製品で使われている。

特徴はめちゃくちゃざっくり*1

  • 追記型のログを使うことで書き込みの性能を保つ
  • インデックスはキー名とログ内のオフセットzを持っておき、読み出しも速度を出す
  • ログを SSTable というデータ構造に退避することで、インデックスはある程度疎にしつつ(めちゃくちゃキーが多くなってインデックスの時点でデカくなるなどを防ぐ)速度を保ってアクセスできる
  • 必要に応じてマージなどをしてインデックスは最新のデータだけに、小さくする

今回お仕事とか色々でLSMを使うため、コードの理解を助けるためにまずは自分で実装しようと思ったので、久しぶりの連作ブログ記事を始めた。

自分で実装と言いつつ細かいところをオミットしていたり、勘違いもあると思うので、優しくご指摘いただければ...。

そんな感じです。

やること

LSM-Tree indexを持つログ追記型のファイルベースDBを作る。

細かいチューニングは置いておく。

言語はGoです。

参考になるもの

まず、僕が後半を読めていないことは置いておいて、データ指向アプリケーションデザインは素晴らしい本なので書いましょう、要約で満足しない方がいい。

www.oreilly.co.jp

そして実装内容は正直、以下の実装記事がわかりやすい上に、実装の間の細かい実装まで補完してくれていて助かった。今のところほぼこの解説の通りに実装する感じになっているのであった。

dasshshsd.hatenablog.com

追記型のログを実装する

最初はこれ。理想的なインタフェースとしては

以下のように逐一追記ができ、

foo.NewEncoder(r).Encode(log1)
foo.NewEncoder(r).Encode(log2)
foo.NewEncoder(r).Encode(log3)

以下のようにちょっとずつシークして読み出せてほしい。

for notFound {
    log := foo.NewDecoder(r).Decode()
    if log.Key == key { ...; notFound = false }
}

最初、既存の gob や encoding/json を使おうとしたんだけれど、微妙にDecode()の挙動が合致しない(gob 最初に型の情報をヘッダに持つのでシークしづらい、encoding/jsonはバッファにまとめて読み込んでからパースするので、データの区切りとシーク位置が合わないことがある)ため結局自作した*2

バイナリフォーマットは以下にした... が後述するように変えるかもしれない。

<- High                                                                    Low ->
| klen: int32 | key: []byte | \0 | vlen: int32 | value: []byte | \0 | deleted: byte | \0\0 |

数値はlittle endianで突っ込むのと、最後に \0\0 で番兵を置いている。なんとなく。

実装はこういう感じ...

type Encoder struct {
    w io.Writer
}

func NewEncoder(w io.Writer) *Encoder {
    return &Encoder{w}
}

func (e *Encoder) Encode(log *log.Log) error {
    klen := int32(len(log.Key) + 1)
    vlen := int32(len(log.Value) + 1)
    deleted := int8(0)
    if log.Deleted {
        deleted = 1
    }
    buf := make([]byte, 0)
    w := bytes.NewBuffer(buf)

    _ = binary.Write(w, binary.LittleEndian, klen)
    _, _ = w.Write([]byte(log.Key))
    _, _ = w.Write(nullByte)
    _ = binary.Write(w, binary.LittleEndian, vlen)
    _, _ = w.Write([]byte(log.Value))
    _, _ = w.Write(nullByte)
    _ = binary.Write(w, binary.LittleEndian, deleted)
    _, _ = w.Write(nullByte)
    _, _ = w.Write(nullByte)

    if _, err := e.w.Write(w.Bytes()); err != nil {
        return err
    }

    return nil
}

デコードの際は軽くフォーマットも見ておく。

type Decoder struct {
    r io.Reader
}

func NewDecoder(r io.Reader) *Decoder {
    return &Decoder{r}
}

func (d *Decoder) Decode() (*log.Log, error) {
    var klen, vlen int32
    var deleted int8 = -1
    err := binary.Read(d.r, binary.LittleEndian, &klen)
    if err != nil {
        return nil, err // 以下errのガードは省略
    }
    key := make([]byte, klen)
    _, _ = d.r.Read(key)
    if key[len(key)-1] != '\000' {
        return nil, InvalidLogFormat
    }

    _ = binary.Read(d.r, binary.LittleEndian, &vlen)
    value := make([]byte, vlen)
    _, _ = d.r.Read(value)

    if value[len(value)-1] != '\000' {
        return nil, InvalidLogFormat
    }

    _ = binary.Read(d.r, binary.LittleEndian, &deleted)
    if deleted != 0 && deleted != 1 {
        return nil, InvalidLogFormat
    }

    sentinel := make([]byte, 2)
    _, _ = d.r.Read(sentinel)
    if sentinel[0] != '\000' || sentinel[1] != '\000' {
        return nil, InvalidLogFormat
    }

    valueDeleted := false
    if deleted == 1 {
        valueDeleted = true
    }
    log := &log.Log{
        Key:     string(key[0 : len(key)-1]),
        Value:   string(value[0 : len(value)-1]),
        Deleted: valueDeleted,
    }

    return log, nil
}

Decode の実装をしてみて、最初に長さがわからないとあまりに細かくSeekするハメになると分かってきたので、リファクタのネタとしてヘッダー部を用意するかもしれない。長さがわからないので SectionReader と相性が悪い。

MemTable, SSTableを含むデータ構造

コンパクションを考える前の実装。

import rbt "github.com/emirpasic/gods/trees/redblacktree"

type LSM struct {
    mu sync.RWMutex

    memTable        *MemTable
    memtableWorking *MemTable
    sstables        []*SSTable

    logdir string
    // TODO: compaction
    // inCompaction bool
}

type MemTable struct {
    tree *rbt.Tree
}

type SSTable struct {
    prefix string
    index  *rbt.Tree
    file   *os.File
}

赤黒木は github.com/emirpasic/gods を使うことにした。

Get/Put/Delete 操作を実装する。まずPutは簡単。memTableに値が入ればいいだけなので。

func (lsm *LSM) Put(key, value string) {
    lsm.mu.Lock()
    defer lsm.mu.Unlock()
    log := &slog.Log{
        Key:   key,
        Value: value,
    }
    lsm.memTable.tree.Put(key, log)
}

Log は deleted フラグを明示的に持たせる

type Log struct {
    Key     string
    Value   string
    Deleted bool
}

これでDeleteはほぼPutと同じようになる

func (lsm *LSM) Delete(key string) {
    lsm.mu.Lock()
    defer lsm.mu.Unlock()
    log := &slog.Log{
        Key:     key,
        Value:   "",
        Deleted: true,
    }
    lsm.memTable.tree.Put(key, log)
}

ここで、memTable から SSTableにマイグレする操作を実装。

// エラー処理は省略
func (lsm *LSM) MigrateToSSTable() error {
    lsm.mu.Lock()
    // 作業用memTableにコピーし、memTableは一旦初期化
    lsm.memtableWorking = lsm.memTable
    lsm.memTable = &MemTable{
        tree: rbt.NewWithStringComparator(),
    }
    lsm.mu.Unlock()

    // 大体動くだろうと、タイムスタンプNano秒でファイルを作る、ソートすれば時間順になる
    prefix := fmt.Sprintf("%024d", time.Now().UnixNano())
    sst := &SSTable{
        prefix: prefix,
        index:  rbt.NewWithStringComparator(),
    }
    logf, _ := os.OpenFile(
        filepath.Join(lsm.logdir, prefix+".log"),
        os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
        0o0600,
    )
    defer logf.Close()
    logWriter := codec.NewEncoder(logf)
    indexf, _ := os.OpenFile(
        filepath.Join(lsm.logdir, prefix+".index"),
        os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
        0o0600,
    )
    defer indexf.Close()

    // 作業用memTableのKeys() は勝手に辞書順に並んでいるので
    // ただeachして順番にEncodeすればOK、その際オフセット情報を記録
    for i, key := range lsm.memtableWorking.tree.Keys() {
        key := key.(string)
        data, ok := lsm.memtableWorking.tree.Get(key)
        if !ok {
            continue
        }
        log := data.(*slog.Log)
        offset, err := logf.Seek(0, 1)
        if err != nil {
            return err
        }
        if err := logWriter.Encode(log); err != nil {
            return err
        }

        if i%indexInterval == 0 {
            sst.index.Put(key, offset)
        }
    }

    // オフセットもファイルに書き出す必要がある
    indexData, _ := sst.index.ToJSON()
    if _, _ := indexf.Write(indexData); err != nil {

    _ = logf.Close()

    // reopen
    f, _ := os.Open(filepath.Join(lsm.logdir, prefix+".log"))
    sst.file = f

    lsm.mu.Lock()
    // sstables に追加し、memtableWorkingは空にする
    lsm.sstables = append([]*SSTable{sst}, lsm.sstables...)
    lsm.memtableWorking = nil
    lsm.mu.Unlock()

    return nil
}

これでGetのロジックを実装できる。

  • まずmemTableから探す
  • もし作業中memTableがあれば次にそれを探す
  • 最後に、時間が新しい順にSSTableを探す
func (lsm *LSM) Get(key string) (string, bool) {
    lsm.mu.RLock()
    defer lsm.mu.RUnlock()
    var log *slog.Log = nil
    data, ok := lsm.memTable.tree.Get(key)
    if !ok {
        if lsm.memtableWorking != nil {
            data, ok := lsm.memtableWorking.tree.Get(key)
            if ok {
                log = data.(*slog.Log)
            }
        }

        if log == nil {
            for _, sst := range lsm.sstables {
                data, ok := sst.Get(key)
                if ok {
                    log = data
                    break
                }
            }
        }
    } else {
        log = data.(*slog.Log)
    }
    if log == nil {
        return "", false
    }

    if log.Deleted {
        return "", false
    }

    return log.Value, true
}

SSTableのGetは

func (sst *SSTable) Get(key string) (*slog.Log, bool) {
    // 対象keyが含まれている範囲を探す、始点が prevkey に入るはず
    prevkey := ""
    for _, currentkey := range sst.index.Keys() {
        if key < currentkey.(string) {
            break
        }
        prevkey = currentkey.(string)
    }

    if prevkey == "" {
        return nil, false
    }

    off, _ := sst.index.Get(prevkey)
    _, _ := sst.file.Seek(off, 0)

    for true {
    // off からひたすら探す。辞書順に並んでいるはずなので
        log, err := codec.NewDecoder(sst.file).Decode()
        if err != nil {
            if errors.Is(err, io.EOF) {
                return nil, false
            }
            panic(err)
        }
        if log.Key == key {
            return log, true
        }
    }

    return nil, false // unreachableなはず
}

実装

github.com

使い方

github.com

次回

コンパクションを実装する。その後、ベンチを取ったりKVSに組み込んだりしたいですね。

*1:この時点で誤解があったら指摘してください...

*2:バイナリフォーマットを考えるのって、上がりますね