Skip to content

Commit f5f57a2

Browse files
authored
Fix DecodeDatabaseTo() with lock page (#51)
1 parent 9a8893c commit f5f57a2

File tree

3 files changed

+74
-7
lines changed

3 files changed

+74
-7
lines changed

decoder.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ func (dec *Decoder) Close() error {
7272
return fmt.Errorf("unmarshal trailer: %w", err)
7373
}
7474

75+
// TODO: Ensure last read page is equal to the commit for snapshot LTX files
76+
7577
dec.writeToHash(b[:TrailerChecksumOffset])
7678

7779
// Compare checksum with checksum in trailer.
@@ -203,24 +205,44 @@ func (dec *Decoder) Verify() error {
203205
func (dec *Decoder) DecodeDatabaseTo(w io.Writer) error {
204206
if err := dec.DecodeHeader(); err != nil {
205207
return fmt.Errorf("decode header: %w", err)
206-
} else if !dec.header.IsSnapshot() {
208+
}
209+
210+
hdr := dec.Header()
211+
lockPgno := hdr.LockPgno()
212+
if !dec.header.IsSnapshot() {
207213
return fmt.Errorf("cannot decode non-snapshot LTX file to SQLite database")
208214
}
209215

210216
var pageHeader PageHeader
211217
data := make([]byte, dec.header.PageSize)
212-
for i := 0; ; i++ {
213-
if err := dec.DecodePage(&pageHeader, data); err == io.EOF {
214-
break
215-
} else if err != nil {
216-
return fmt.Errorf("decode page %d: %w", i, err)
218+
for pgno := uint32(1); pgno <= hdr.Commit; pgno++ {
219+
if pgno == lockPgno {
220+
// Write empty page for lock page.
221+
for i := range data {
222+
data[i] = 0
223+
}
224+
} else {
225+
// Otherwise read the page from the LTX decoder.
226+
if err := dec.DecodePage(&pageHeader, data); err != nil {
227+
return fmt.Errorf("decode page %d: %w", pgno, err)
228+
} else if pageHeader.Pgno != pgno {
229+
return fmt.Errorf("unexpected pgno while decoding page: read %d, expected %d", pageHeader.Pgno, pgno)
230+
}
217231
}
218232

219233
if _, err := w.Write(data); err != nil {
220-
return fmt.Errorf("write page %d: %w", i, err)
234+
return fmt.Errorf("write page %d: %w", pgno, err)
221235
}
222236
}
223237

238+
// Issue one more final read and expect to see an EOF. This is required so
239+
// that the decoder can successfully close and validate.
240+
if err := dec.DecodePage(&pageHeader, data); err == nil {
241+
return fmt.Errorf("unexpected page %d after commit %d", pageHeader.Pgno, hdr.Commit)
242+
} else if err != io.EOF {
243+
return fmt.Errorf("unexpected error decoding after end of database: %w", err)
244+
}
245+
224246
if err := dec.Close(); err != nil {
225247
return fmt.Errorf("close decoder: %w", err)
226248
}

decoder_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,46 @@ func TestDecoder_DecodeDatabaseTo(t *testing.T) {
147147
}
148148
})
149149

150+
t.Run("WithLockPage", func(t *testing.T) {
151+
lockPgno := ltx.LockPgno(4096)
152+
commit := lockPgno + 10
153+
154+
var want bytes.Buffer
155+
var buf bytes.Buffer
156+
enc := ltx.NewEncoder(&buf)
157+
if err := enc.EncodeHeader(ltx.Header{Version: 1, Flags: 0, PageSize: 4096, Commit: commit, MinTXID: 1, MaxTXID: 2, Timestamp: 1000}); err != nil {
158+
t.Fatal(err)
159+
}
160+
161+
pageData := bytes.Repeat([]byte("x"), 4096)
162+
for pgno := uint32(1); pgno <= commit; pgno++ {
163+
if pgno == lockPgno {
164+
_, _ = want.Write(make([]byte, 4096))
165+
continue
166+
}
167+
168+
_, _ = want.Write(pageData)
169+
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, pageData); err != nil {
170+
t.Fatal(err)
171+
}
172+
}
173+
174+
enc.SetPostApplyChecksum(0xc19b668c376662c7)
175+
if err := enc.Close(); err != nil {
176+
t.Fatal(err)
177+
}
178+
179+
// Decode serialized LTX file.
180+
dec := ltx.NewDecoder(&buf)
181+
182+
var out bytes.Buffer
183+
if err := dec.DecodeDatabaseTo(&out); err != nil {
184+
t.Fatal(err)
185+
} else if got, want := out.Bytes(), want.Bytes(); !bytes.Equal(got, want) {
186+
t.Fatal("output mismatch")
187+
}
188+
})
189+
150190
t.Run("ErrNonSnapshot", func(t *testing.T) {
151191
spec := &ltx.FileSpec{
152192
Header: ltx.Header{Version: 1, Flags: 0, PageSize: 512, Commit: 2, MinTXID: 2, MaxTXID: 2, Timestamp: 1000, PreApplyChecksum: ltx.ChecksumFlag | 1},

ltx.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ func (h *Header) IsSnapshot() bool {
223223
return h.MinTXID == 1
224224
}
225225

226+
// LockPgno returns the lock page number based on the header's page size.
227+
func (h *Header) LockPgno() uint32 {
228+
return LockPgno(h.PageSize)
229+
}
230+
226231
// Validate returns an error if h is invalid.
227232
func (h *Header) Validate() error {
228233
if h.Version != Version {

0 commit comments

Comments
 (0)