Log-Structured Merge Tree というデータ構造があって、データ指向アプリケーションデザインを読んでいるとかなり最初の方に出てくる。Wikipediaの記事の通りLevelDBを始めきょうびのさまざまなデータベース製品で使われている。
特徴はめちゃくちゃざっくり*1
- 追記型のログを使うことで書き込みの性能を保つ
- インデックスはキー名とログ内のオフセットzを持っておき、読み出しも速度を出す
- ログを SSTable というデータ構造に退避することで、インデックスはある程度疎にしつつ(めちゃくちゃキーが多くなってインデックスの時点でデカくなるなどを防ぐ)速度を保ってアクセスできる
- 必要に応じてマージなどをしてインデックスは最新のデータだけに、小さくする
今回お仕事とか色々でLSMを使うため、コードの理解を助けるためにまずは自分で実装しようと思ったので、久しぶりの連作ブログ記事を始めた。
自分で実装と言いつつ細かいところをオミットしていたり、勘違いもあると思うので、優しくご指摘いただければ...。
そんな感じです。
やること
LSM-Tree indexを持つログ追記型のファイルベースDBを作る。
細かいチューニングは置いておく。
言語はGoです。
参考になるもの
まず、僕が後半を読めていないことは置いておいて、データ指向アプリケーションデザインは素晴らしい本なので書いましょう、要約で満足しない方がいい。
そして実装内容は正直、以下の実装記事がわかりやすい上に、実装の間の細かい実装まで補完してくれていて助かった。今のところほぼこの解説の通りに実装する感じになっているのであった。
追記型のログを実装する
最初はこれ。理想的なインタフェースとしては
以下のように逐一追記ができ、
foo.NewEncoder(r).Encode(log1) foo.NewEncoder(r).Encode(log2) foo.NewEncoder(r).Encode(log3)
以下のようにちょっとずつシークして読み出せてほしい。
for notFound { log := foo.NewDecoder(r).Decode() if log.Key == key { ...; notFound = false } }
最初、既存の gob や encoding/json を使おうとしたんだけれど、微妙にDecode()の挙動が合致しない(gob 最初に型の情報をヘッダに持つのでシークしづらい、encoding/jsonはバッファにまとめて読み込んでからパースするので、データの区切りとシーク位置が合わないことがある)ため結局自作した*2。
バイナリフォーマットは以下にした... が後述するように変えるかもしれない。
<- High Low -> | klen: int32 | key: []byte | \0 | vlen: int32 | value: []byte | \0 | deleted: byte | \0\0 |
数値はlittle endianで突っ込むのと、最後に \0\0
で番兵を置いている。なんとなく。
実装はこういう感じ...
type Encoder struct { w io.Writer } func NewEncoder(w io.Writer) *Encoder { return &Encoder{w} } func (e *Encoder) Encode(log *log.Log) error { klen := int32(len(log.Key) + 1) vlen := int32(len(log.Value) + 1) deleted := int8(0) if log.Deleted { deleted = 1 } buf := make([]byte, 0) w := bytes.NewBuffer(buf) _ = binary.Write(w, binary.LittleEndian, klen) _, _ = w.Write([]byte(log.Key)) _, _ = w.Write(nullByte) _ = binary.Write(w, binary.LittleEndian, vlen) _, _ = w.Write([]byte(log.Value)) _, _ = w.Write(nullByte) _ = binary.Write(w, binary.LittleEndian, deleted) _, _ = w.Write(nullByte) _, _ = w.Write(nullByte) if _, err := e.w.Write(w.Bytes()); err != nil { return err } return nil }
デコードの際は軽くフォーマットも見ておく。
type Decoder struct { r io.Reader } func NewDecoder(r io.Reader) *Decoder { return &Decoder{r} } func (d *Decoder) Decode() (*log.Log, error) { var klen, vlen int32 var deleted int8 = -1 err := binary.Read(d.r, binary.LittleEndian, &klen) if err != nil { return nil, err // 以下errのガードは省略 } key := make([]byte, klen) _, _ = d.r.Read(key) if key[len(key)-1] != '\000' { return nil, InvalidLogFormat } _ = binary.Read(d.r, binary.LittleEndian, &vlen) value := make([]byte, vlen) _, _ = d.r.Read(value) if value[len(value)-1] != '\000' { return nil, InvalidLogFormat } _ = binary.Read(d.r, binary.LittleEndian, &deleted) if deleted != 0 && deleted != 1 { return nil, InvalidLogFormat } sentinel := make([]byte, 2) _, _ = d.r.Read(sentinel) if sentinel[0] != '\000' || sentinel[1] != '\000' { return nil, InvalidLogFormat } valueDeleted := false if deleted == 1 { valueDeleted = true } log := &log.Log{ Key: string(key[0 : len(key)-1]), Value: string(value[0 : len(value)-1]), Deleted: valueDeleted, } return log, nil }
Decode の実装をしてみて、最初に長さがわからないとあまりに細かくSeekするハメになると分かってきたので、リファクタのネタとしてヘッダー部を用意するかもしれない。長さがわからないので SectionReader と相性が悪い。
MemTable, SSTableを含むデータ構造
コンパクションを考える前の実装。
import rbt "github.com/emirpasic/gods/trees/redblacktree" type LSM struct { mu sync.RWMutex memTable *MemTable memtableWorking *MemTable sstables []*SSTable logdir string // TODO: compaction // inCompaction bool } type MemTable struct { tree *rbt.Tree } type SSTable struct { prefix string index *rbt.Tree file *os.File }
赤黒木は github.com/emirpasic/gods を使うことにした。
Get/Put/Delete 操作を実装する。まずPutは簡単。memTableに値が入ればいいだけなので。
func (lsm *LSM) Put(key, value string) { lsm.mu.Lock() defer lsm.mu.Unlock() log := &slog.Log{ Key: key, Value: value, } lsm.memTable.tree.Put(key, log) }
Log は deleted フラグを明示的に持たせる
type Log struct { Key string Value string Deleted bool }
これでDeleteはほぼPutと同じようになる
func (lsm *LSM) Delete(key string) { lsm.mu.Lock() defer lsm.mu.Unlock() log := &slog.Log{ Key: key, Value: "", Deleted: true, } lsm.memTable.tree.Put(key, log) }
ここで、memTable から SSTableにマイグレする操作を実装。
// エラー処理は省略 func (lsm *LSM) MigrateToSSTable() error { lsm.mu.Lock() // 作業用memTableにコピーし、memTableは一旦初期化 lsm.memtableWorking = lsm.memTable lsm.memTable = &MemTable{ tree: rbt.NewWithStringComparator(), } lsm.mu.Unlock() // 大体動くだろうと、タイムスタンプNano秒でファイルを作る、ソートすれば時間順になる prefix := fmt.Sprintf("%024d", time.Now().UnixNano()) sst := &SSTable{ prefix: prefix, index: rbt.NewWithStringComparator(), } logf, _ := os.OpenFile( filepath.Join(lsm.logdir, prefix+".log"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o0600, ) defer logf.Close() logWriter := codec.NewEncoder(logf) indexf, _ := os.OpenFile( filepath.Join(lsm.logdir, prefix+".index"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o0600, ) defer indexf.Close() // 作業用memTableのKeys() は勝手に辞書順に並んでいるので // ただeachして順番にEncodeすればOK、その際オフセット情報を記録 for i, key := range lsm.memtableWorking.tree.Keys() { key := key.(string) data, ok := lsm.memtableWorking.tree.Get(key) if !ok { continue } log := data.(*slog.Log) offset, err := logf.Seek(0, 1) if err != nil { return err } if err := logWriter.Encode(log); err != nil { return err } if i%indexInterval == 0 { sst.index.Put(key, offset) } } // オフセットもファイルに書き出す必要がある indexData, _ := sst.index.ToJSON() if _, _ := indexf.Write(indexData); err != nil { _ = logf.Close() // reopen f, _ := os.Open(filepath.Join(lsm.logdir, prefix+".log")) sst.file = f lsm.mu.Lock() // sstables に追加し、memtableWorkingは空にする lsm.sstables = append([]*SSTable{sst}, lsm.sstables...) lsm.memtableWorking = nil lsm.mu.Unlock() return nil }
これでGetのロジックを実装できる。
- まずmemTableから探す
- もし作業中memTableがあれば次にそれを探す
- 最後に、時間が新しい順にSSTableを探す
func (lsm *LSM) Get(key string) (string, bool) { lsm.mu.RLock() defer lsm.mu.RUnlock() var log *slog.Log = nil data, ok := lsm.memTable.tree.Get(key) if !ok { if lsm.memtableWorking != nil { data, ok := lsm.memtableWorking.tree.Get(key) if ok { log = data.(*slog.Log) } } if log == nil { for _, sst := range lsm.sstables { data, ok := sst.Get(key) if ok { log = data break } } } } else { log = data.(*slog.Log) } if log == nil { return "", false } if log.Deleted { return "", false } return log.Value, true }
SSTableのGetは
func (sst *SSTable) Get(key string) (*slog.Log, bool) { // 対象keyが含まれている範囲を探す、始点が prevkey に入るはず prevkey := "" for _, currentkey := range sst.index.Keys() { if key < currentkey.(string) { break } prevkey = currentkey.(string) } if prevkey == "" { return nil, false } off, _ := sst.index.Get(prevkey) _, _ := sst.file.Seek(off, 0) for true { // off からひたすら探す。辞書順に並んでいるはずなので log, err := codec.NewDecoder(sst.file).Decode() if err != nil { if errors.Is(err, io.EOF) { return nil, false } panic(err) } if log.Key == key { return log, true } } return nil, false // unreachableなはず }
実装
使い方
次回
コンパクションを実装する。その後、ベンチを取ったりKVSに組み込んだりしたいですね。