Rebuild indexes

This commit is contained in:
Nicolas Dextraze 2018-03-08 14:59:07 -08:00
parent 6713362868
commit 84e8e469ad
4 changed files with 650 additions and 539 deletions

View File

@ -14,6 +14,7 @@ import (
var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to")
var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage")
var buildTypeIndexes = flag.Bool("buildTypeIndexes", false, "Build type indexes") var buildTypeIndexes = flag.Bool("buildTypeIndexes", false, "Build type indexes")
var buildIndexes = flag.Bool("buildIndexes", false, "Build indexes")
func PathIsAbsolute(s string) bool { func PathIsAbsolute(s string) bool {
if len(s) > 1 && s[1] == ':' { if len(s) > 1 && s[1] == ':' {
@ -36,6 +37,10 @@ func main() {
} }
diskStorage := storage.NewDailyDiskStorage(storagePath) diskStorage := storage.NewDailyDiskStorage(storagePath)
if *buildIndexes {
diskStorage.RebuildIndexes()
return
}
if *buildTypeIndexes { if *buildTypeIndexes {
diskStorage.RebuildTypeIndexes() diskStorage.RebuildTypeIndexes()
return return

View File

@ -1,280 +1,381 @@
package storage package storage
import ( import (
"path" "path"
"os" "os"
"time" "time"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"fmt" "fmt"
"errors" "errors"
"io/ioutil" "io/ioutil"
"bytes" "bytes"
) "path/filepath"
"encoding/json"
const EMPTY_STREAM = uint32(0) "strings"
var CRLF = []byte("\r\n") "strconv"
)
type DailyDiskStorage struct {
storagePath string const EMPTY_STREAM = uint32(0)
indexesPath string var CRLF = []byte("\r\n")
typesIndexesPath string
globalIndexFilename string type DailyDiskStorage struct {
} storagePath string
indexesPath string
func NewDailyDiskStorage(storagePath string) Storage { typesIndexesPath string
fmt.Println("Using DailyDiskStorage path:", storagePath) globalIndexFilename string
indexesPath := path.Join(storagePath, "indexes") }
globalIndexPath := path.Join(indexesPath, "global")
typesIndexesPath := path.Join(indexesPath, "types") func NewDailyDiskStorage(storagePath string) Storage {
if err := os.MkdirAll(typesIndexesPath, 0777); err != nil { fmt.Println("Using DailyDiskStorage path:", storagePath)
panic(err) indexesPath := path.Join(storagePath, "indexes")
} globalIndexPath := path.Join(indexesPath, "global")
return &DailyDiskStorage{storagePath, indexesPath, typesIndexesPath, globalIndexPath}; typesIndexesPath := path.Join(indexesPath, "types")
} if err := os.MkdirAll(typesIndexesPath, 0777); err != nil {
panic(err)
func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { }
return path.Join(me.indexesPath, streamId.String()) return &DailyDiskStorage{storagePath, indexesPath, typesIndexesPath, globalIndexPath};
} }
func (me DailyDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string {
yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month()) return path.Join(me.indexesPath, streamId.String())
day := fmt.Sprintf("%02d", creationTime.Day()) }
eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId)
return path.Join(me.storagePath, yearMonth, day, eventFilename) func (me DailyDiskStorage) getEventFilename(creationTime time.Time, typeId string) string {
} yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month())
day := fmt.Sprintf("%02d", creationTime.Day())
type IndexEntry struct { eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId)
streamId uuid.UUID return path.Join(me.storagePath, yearMonth, day, eventFilename)
creationTime time.Time }
typeId string
} type IndexEntry struct {
streamId uuid.UUID
func appendIndex(filename string, entry *IndexEntry) error { creationTime time.Time
indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) typeId string
if err != nil { }
return err
} func appendIndex(filename string, entry *IndexEntry) error {
defer indexFile.Close() indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644)
if err != nil {
written, err := indexFile.Write(entry.streamId.Bytes()) return err
if err != nil { }
return err defer indexFile.Close()
}
if written != 16 { written, err := indexFile.Write(entry.streamId.Bytes())
return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", 16, written)) if err != nil {
} return err
}
creationTimeBytes, err := entry.creationTime.MarshalBinary() if written != 16 {
if err != nil { return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", 16, written))
return err }
}
writeSizeAndBytes(indexFile, creationTimeBytes) creationTimeBytes, err := entry.creationTime.MarshalBinary()
writeSizeAndBytes(indexFile, []byte(entry.typeId)) if err != nil {
return err
return nil }
} writeSizeAndBytes(indexFile, creationTimeBytes)
writeSizeAndBytes(indexFile, []byte(entry.typeId))
func (me DailyDiskStorage) appendTypeIndex(entry *IndexEntry) error {
filename := path.Join(me.typesIndexesPath, entry.typeId) return nil
indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644 ) }
if err != nil {
return err func (me DailyDiskStorage) appendTypeIndex(entry *IndexEntry) error {
} filename := path.Join(me.typesIndexesPath, entry.typeId)
defer indexFile.Close() indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644 )
if err != nil {
value := me.getEventFilename(entry.creationTime, entry.typeId) return err
start := len(me.storagePath) + 1 }
_, err = indexFile.WriteString(value[start:] + "\r\n") defer indexFile.Close()
return err value := me.getEventFilename(entry.creationTime, entry.typeId)
} start := len(me.storagePath) + 1
_, err = indexFile.WriteString(value[start:] + "\r\n")
func readIndexNextEntry(f *os.File) (*IndexEntry, error) {
index := IndexEntry{} return err
}
uuidBytes := make([]byte, 16)
read, err := f.Read(uuidBytes) func readIndexNextEntry(f *os.File) (*IndexEntry, error) {
if err != nil { index := IndexEntry{}
return nil, err
} uuidBytes := make([]byte, 16)
if read != 16 { read, err := f.Read(uuidBytes)
return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %v bytes, got only %v bytes.", 16, read)) if err != nil {
} return nil, err
index.streamId = uuid.FromBytesOrNil(uuidBytes) }
if read != 16 {
creationTimeBytes, err := readSizedBytes(f) return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %v bytes, got only %v bytes.", 16, read))
if err != nil { }
return nil, err index.streamId = uuid.FromBytesOrNil(uuidBytes)
}
if err = index.creationTime.UnmarshalBinary(creationTimeBytes); err != nil { creationTimeBytes, err := readSizedBytes(f)
return nil, err if err != nil {
} return nil, err
}
typeIdBytes, err := readSizedBytes(f) if err = index.creationTime.UnmarshalBinary(creationTimeBytes); err != nil {
index.typeId = string(typeIdBytes) return nil, err
}
return &index, nil;
} typeIdBytes, err := readSizedBytes(f)
index.typeId = string(typeIdBytes)
func writeEvent(filename string, data []byte, metadata []byte) error {
eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) return &index, nil;
if err != nil { }
return err
} func writeEvent(filename string, data []byte, metadata []byte) error {
defer eventFile.Close() eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644)
if err != nil {
eventFile.Write(data) return err
eventFile.Write(CRLF) }
eventFile.Write(metadata) defer eventFile.Close()
return nil eventFile.Write(data)
} eventFile.Write(CRLF)
eventFile.Write(metadata)
func readEvent(filename string) (data []byte, metadata []byte, err error) {
content, err := ioutil.ReadFile(filename) return nil
if err != nil { }
return
} func readEvent(filename string) (data []byte, metadata []byte, err error) {
sep := bytes.Index(content, CRLF) content, err := ioutil.ReadFile(filename)
if sep == -1 { if err != nil {
data = content return
metadata = make([]byte, 0) }
return sep := bytes.Index(content, CRLF)
} if sep == -1 {
data = content[:sep] data = content
metadata = content[sep+2:] metadata = make([]byte, 0)
return return
} }
data = content[:sep]
func (me DailyDiskStorage) Write(event *StoredEvent) error { metadata = content[sep+2:]
return
eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) }
os.MkdirAll(path.Dir(eventFilename), 0777)
func (me DailyDiskStorage) Write(event *StoredEvent) error {
err := writeEvent(eventFilename, event.Data, event.Metadata)
if err != nil { eventFilename := me.getEventFilename(event.CreationTime, event.TypeId)
return err os.MkdirAll(path.Dir(eventFilename), 0777)
}
err := writeEvent(eventFilename, event.Data, event.Metadata)
index := &IndexEntry{event.StreamId, event.CreationTime, event.TypeId} if err != nil {
return err
err = appendIndex(me.globalIndexFilename, index) }
if err != nil {
return err index := &IndexEntry{event.StreamId, event.CreationTime, event.TypeId}
}
err = appendIndex(me.globalIndexFilename, index)
err = appendIndex(me.getStreamIndexFilename(event.StreamId), index) if err != nil {
if err != nil { return err
return err }
}
err = appendIndex(me.getStreamIndexFilename(event.StreamId), index)
return me.appendTypeIndex(index) if err != nil {
} return err
}
func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) return me.appendTypeIndex(index)
if err != nil { }
return EMPTY_STREAM, errors.New("NOT_FOUND: " + err.Error())
} func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
defer indexFile.Close() indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0)
if err != nil {
ver := EMPTY_STREAM return EMPTY_STREAM, errors.New("NOT_FOUND: " + err.Error())
for { }
_, err := readIndexNextEntry(indexFile) defer indexFile.Close()
if err != nil && err.Error() == "EOF" {
break ver := EMPTY_STREAM
} for {
if err != nil { _, err := readIndexNextEntry(indexFile)
return EMPTY_STREAM, err if err != nil && err.Error() == "EOF" {
} break
ver++ }
} if err != nil {
return EMPTY_STREAM, err
return ver, nil }
} ver++
}
func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
return ver, nil
indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) }
if err != nil {
return nil, errors.New("NOT_FOUND: " + err.Error()) func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
}
defer indexFile.Close() indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0)
if err != nil {
events := make([]*StoredEvent, 0) return nil, errors.New("NOT_FOUND: " + err.Error())
for { }
indexEntry, err := readIndexNextEntry(indexFile) defer indexFile.Close()
if err != nil && err.Error() == "EOF" {
break events := make([]*StoredEvent, 0)
} for {
if err != nil { indexEntry, err := readIndexNextEntry(indexFile)
return nil, err if err != nil && err.Error() == "EOF" {
} break
data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) }
if err != nil { if err != nil {
return nil, err return nil, err
} }
event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId))
events = append(events, event) if err != nil {
} return nil, err
}
return events, nil event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata}
} events = append(events, event)
}
func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) return events, nil
if err != nil { }
return nil, err
} func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) {
defer indexFile.Close() indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0)
if err != nil {
events := make([]*StoredEvent, 0) return nil, err
for { }
indexEntry, err := readIndexNextEntry(indexFile) defer indexFile.Close()
if err != nil && err.Error() == "EOF" {
break events := make([]*StoredEvent, 0)
} for {
if err != nil { indexEntry, err := readIndexNextEntry(indexFile)
return nil, err if err != nil && err.Error() == "EOF" {
} break
data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) }
if err != nil { if err != nil {
return nil, err return nil, err
} }
event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId))
events = append(events, event) if err != nil {
} return nil, err
}
return events, nil event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata}
} events = append(events, event)
}
func (me DailyDiskStorage) RebuildTypeIndexes() {
fmt.Print("Rebuilding type indexes... ") return events, nil
}
err := os.RemoveAll(me.typesIndexesPath)
if err != nil { func (me DailyDiskStorage) RebuildTypeIndexes() {
panic(err) fmt.Print("Rebuilding type indexes... ")
}
err = os.MkdirAll(me.typesIndexesPath, 0777) err := os.RemoveAll(me.typesIndexesPath)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = os.MkdirAll(me.typesIndexesPath, 0777)
globalIndexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0644) if err != nil {
if err != nil { panic(err)
panic(err) }
}
globalIndexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0644)
for { if err != nil {
indexEntry, err := readIndexNextEntry(globalIndexFile) panic(err)
if err != nil && err.Error() == "EOF" { }
break
} for {
if err != nil { indexEntry, err := readIndexNextEntry(globalIndexFile)
panic(err) if err != nil && err.Error() == "EOF" {
} break
me.appendTypeIndex(indexEntry) }
} if err != nil {
panic(err)
fmt.Println("Done.") }
me.appendTypeIndex(indexEntry)
}
fmt.Println("Done.")
}
func (me DailyDiskStorage) RebuildIndexes() {
fmt.Print("Rebuilding indexes...");
err := os.RemoveAll(me.indexesPath)
if err != nil {
panic(err)
}
err = os.MkdirAll(me.typesIndexesPath, 0777)
if err != nil {
panic(err)
}
fileList := make([]string, 0)
err = filepath.Walk(me.storagePath, func(path string, f os.FileInfo, err error) error {
if !f.IsDir() {
fileList = append(fileList, path)
}
return nil
})
for _, file := range fileList {
event := map[string]interface{} {}
if content, err := ioutil.ReadFile(file); err != nil {
panic(err)
} else if parts := bytes.Split(content, CRLF); len(parts) == 0 {
panic(errors.New(fmt.Sprintf("Empty event file %s", file)))
} else if err := json.Unmarshal(parts[0], &event); err != nil {
panic(err)
} else {
firstKey := string(bytes.Split(parts[0], []byte("\""))[1])
streamId := event[firstKey].(string)
streamUuid, err := uuid.FromString(streamId)
if err != nil {
panic(err)
}
if file[0:len(me.storagePath)] == me.storagePath {
file = file[len(me.storagePath)+1:]
}
file = strings.Replace(file, "/", "", -1)
parts := strings.Split(file, "_")
typeId := parts[1]
year, err := strconv.Atoi(file[0:4])
if err != nil {
panic(err)
}
month, err := strconv.Atoi(file[4:6])
if err != nil {
panic(err)
}
day, err := strconv.Atoi(file[6:8])
if err != nil {
panic(err)
}
hour, err := strconv.Atoi(file[8:10])
if err != nil {
panic(err)
}
min, err := strconv.Atoi(file[10:12])
if err != nil {
panic(err)
}
sec, err := strconv.Atoi(file[12:14])
if err != nil {
panic(err)
}
nsec, err := strconv.Atoi(file[14:23])
if err != nil {
panic(err)
}
loc, err := time.LoadLocation("Local")
if err != nil {
panic(err)
}
creationTime := time.Date(year, time.Month(month), day, hour, min, sec, nsec, loc)
fmt.Printf("%s=%s %d %d %d %d %d %d %d %s\n", firstKey, streamId, year, month, day, hour, min, sec, nsec, typeId)
index := &IndexEntry{streamUuid, creationTime, typeId}
err = appendIndex(me.globalIndexFilename, index)
if err != nil {
panic(err)
}
err = appendIndex(me.getStreamIndexFilename(streamUuid), index)
if err != nil {
panic(err)
}
err = me.appendTypeIndex(index)
if err != nil {
panic(err)
}
}
}
} }

View File

@ -1,234 +1,238 @@
package storage package storage
import ( import (
"encoding/binary" "encoding/binary"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"os" "os"
"time" "time"
"path" "path"
"fmt" "fmt"
"errors" "errors"
) )
func NewSimpleDiskStorage(storagePath string) Storage { func NewSimpleDiskStorage(storagePath string) Storage {
return &SimpleDiskStorage{storagePath, path.Join(storagePath, "eventindex")} return &SimpleDiskStorage{storagePath, path.Join(storagePath, "eventindex")}
} }
type SimpleDiskStorage struct { type SimpleDiskStorage struct {
storagePath string storagePath string
indexPath string indexPath string
} }
func (me SimpleDiskStorage) getFilename(stream, extension string) string { func (me SimpleDiskStorage) getFilename(stream, extension string) string {
return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension) return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension)
} }
func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string { func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string {
return me.getFilename(stream, ".history") return me.getFilename(stream, ".history")
} }
func writeSizeAndBytes(f *os.File, data []byte) (error) { func writeSizeAndBytes(f *os.File, data []byte) (error) {
sizeBytes := make([]byte, IntegerSizeInBytes) sizeBytes := make([]byte, IntegerSizeInBytes)
size := len(data) size := len(data)
binary.BigEndian.PutUint64(sizeBytes, uint64(size)) binary.BigEndian.PutUint64(sizeBytes, uint64(size))
written, err := f.Write(sizeBytes) written, err := f.Write(sizeBytes)
if err != nil { if err != nil {
return err return err
} }
if written != IntegerSizeInBytes { if written != IntegerSizeInBytes {
return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", IntegerSizeInBytes, written)) return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", IntegerSizeInBytes, written))
} }
written, err = f.Write(data) written, err = f.Write(data)
if err != nil { if err != nil {
return err return err
} }
if written != size { if written != size {
return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", size, written)) return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", size, written))
} }
return nil return nil
} }
func readSizedBytes(f *os.File) ([]byte, error) { func readSizedBytes(f *os.File) ([]byte, error) {
sizeBytes := make([]byte, IntegerSizeInBytes) sizeBytes := make([]byte, IntegerSizeInBytes)
read, err := f.Read(sizeBytes) read, err := f.Read(sizeBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if read != IntegerSizeInBytes { if read != IntegerSizeInBytes {
return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read)) return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read))
} }
size := binary.BigEndian.Uint64(sizeBytes) size := binary.BigEndian.Uint64(sizeBytes)
data := make([]byte, size) data := make([]byte, size)
read, err = f.Read(data) read, err = f.Read(data)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if uint64(read) != size { if uint64(read) != size {
return nil, errors.New(fmt.Sprintf("Integrity error. Expected to ready %d bytes, got %d bytes.", IntegerSizeInBytes, read)) return nil, errors.New(fmt.Sprintf("Integrity error. Expected to ready %d bytes, got %d bytes.", IntegerSizeInBytes, read))
} }
return data, nil return data, nil
} }
func (me SimpleDiskStorage) Write(event *StoredEvent) error { func (me SimpleDiskStorage) Write(event *StoredEvent) error {
filename := me.GetFilenameForEvents(event.StreamId.String()) filename := me.GetFilenameForEvents(event.StreamId.String())
os.MkdirAll(path.Dir(filename), os.ModeDir) os.MkdirAll(path.Dir(filename), os.ModeDir)
indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644)
if err != nil { if err != nil {
return err return err
} }
defer indexFile.Close() defer indexFile.Close()
eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644)
if err != nil { if err != nil {
return err return err
} }
defer eventsFile.Close() defer eventsFile.Close()
stat, err := eventsFile.Stat() stat, err := eventsFile.Stat()
if err != nil { if err != nil {
return err return err
} }
position := stat.Size() position := stat.Size()
creationTimeBytes, err := event.CreationTime.MarshalBinary() creationTimeBytes, err := event.CreationTime.MarshalBinary()
if err != nil { if err != nil {
return err return err
} }
writeSizeAndBytes(eventsFile, creationTimeBytes) writeSizeAndBytes(eventsFile, creationTimeBytes)
writeSizeAndBytes(eventsFile, []byte(event.TypeId)) writeSizeAndBytes(eventsFile, []byte(event.TypeId))
writeSizeAndBytes(eventsFile, event.Data) writeSizeAndBytes(eventsFile, event.Data)
indexFile.Write(event.StreamId.Bytes()) indexFile.Write(event.StreamId.Bytes())
positionBytes := make([]byte, IntegerSizeInBytes) positionBytes := make([]byte, IntegerSizeInBytes)
binary.BigEndian.PutUint64(positionBytes, uint64(position)) binary.BigEndian.PutUint64(positionBytes, uint64(position))
indexFile.Write(positionBytes) indexFile.Write(positionBytes)
return nil return nil
} }
func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
//TODO //TODO
return EMPTY_STREAM, nil return EMPTY_STREAM, nil
} }
func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
streamName := streamId.String() streamName := streamId.String()
offset := int64(0) //TODO snapshots offset := int64(0) //TODO snapshots
filename := me.GetFilenameForEvents(streamName) filename := me.GetFilenameForEvents(streamName)
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer eventsFile.Close() defer eventsFile.Close()
eventsFile.Seek(offset, 0) eventsFile.Seek(offset, 0)
results := make([]*StoredEvent, 0) results := make([]*StoredEvent, 0)
for { for {
creationTime, typeId, data, err := getStoredData(eventsFile) creationTime, typeId, data, err := getStoredData(eventsFile)
if err != nil && err.Error() == "EOF" { if err != nil && err.Error() == "EOF" {
break break
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
//TODO metadata //TODO metadata
event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} event := &StoredEvent{streamId, creationTime, typeId, data, "", nil}
results = append(results, event) results = append(results, event)
} }
return results, nil return results, nil
} }
func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) { func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0) indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer indexFile.Close() defer indexFile.Close()
results := make([]*StoredEvent, 0) results := make([]*StoredEvent, 0)
guidBytes := make([]byte, 16) guidBytes := make([]byte, 16)
offsetBytes := make([]byte, IntegerSizeInBytes) offsetBytes := make([]byte, IntegerSizeInBytes)
for { for {
read, err := indexFile.Read(guidBytes) read, err := indexFile.Read(guidBytes)
if err != nil { if err != nil {
break break
} }
if read != 16 { if read != 16 {
return nil, errors.New("index integrity error") return nil, errors.New("index integrity error")
} }
read, err = indexFile.Read(offsetBytes) read, err = indexFile.Read(offsetBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if read != IntegerSizeInBytes { if read != IntegerSizeInBytes {
return nil, errors.New("index integrity error") return nil, errors.New("index integrity error")
} }
aggregateId, err := uuid.FromBytes(guidBytes) aggregateId, err := uuid.FromBytes(guidBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
offset := binary.BigEndian.Uint64(offsetBytes) offset := binary.BigEndian.Uint64(offsetBytes)
storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset)) storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset))
if err != nil { if err != nil {
return nil, err return nil, err
} }
results = append(results, storedEvent) results = append(results, storedEvent)
} }
return results, nil return results, nil
} }
func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) {
filename := me.GetFilenameForEvents(streamId.String()) filename := me.GetFilenameForEvents(streamId.String())
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer eventsFile.Close() defer eventsFile.Close()
eventsFile.Seek(offset, 0) eventsFile.Seek(offset, 0)
creationTime, typeId, data, err := getStoredData(eventsFile) creationTime, typeId, data, err := getStoredData(eventsFile)
if err != nil { if err != nil {
return nil, err return nil, err
} }
//TODO metadata //TODO metadata
event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} event := &StoredEvent{streamId, creationTime, typeId, data, "", nil}
return event, nil return event, nil
} }
func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) { func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) {
creationTimeBytes, err := readSizedBytes(eventsFile) creationTimeBytes, err := readSizedBytes(eventsFile)
if err != nil { if err != nil {
return return
} }
err = creationTime.UnmarshalBinary(creationTimeBytes) err = creationTime.UnmarshalBinary(creationTimeBytes)
if err != nil { if err != nil {
return return
} }
typeIdBytes, err := readSizedBytes(eventsFile) typeIdBytes, err := readSizedBytes(eventsFile)
if err != nil { if err != nil {
return return
} }
typeId = string(typeIdBytes) typeId = string(typeIdBytes)
data, err = readSizedBytes(eventsFile) data, err = readSizedBytes(eventsFile)
return return
} }
func (me SimpleDiskStorage) RebuildTypeIndexes() { func (me SimpleDiskStorage) RebuildTypeIndexes() {
}
func (me SimpleDiskStorage) RebuildIndexes() {
} }

View File

@ -1,27 +1,28 @@
package storage package storage
import ( import (
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"time" "time"
) )
const IntegerSizeInBytes = 8 const IntegerSizeInBytes = 8
const StreamStartingCapacity = 512 const StreamStartingCapacity = 512
type StoredEvent struct { type StoredEvent struct {
StreamId uuid.UUID StreamId uuid.UUID
CreationTime time.Time CreationTime time.Time
TypeId string TypeId string
Data []byte Data []byte
MetadataTypeId string MetadataTypeId string
Metadata []byte Metadata []byte
} }
//TODO: performance - change reads array for some kind of iterator //TODO: performance - change reads array for some kind of iterator
type Storage interface { type Storage interface {
Write(event *StoredEvent) error Write(event *StoredEvent) error
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error) ReadAll() ([]*StoredEvent, error)
StreamVersion(streamId uuid.UUID) (uint32, error) StreamVersion(streamId uuid.UUID) (uint32, error)
RebuildTypeIndexes() RebuildTypeIndexes()
} RebuildIndexes()
}