diff --git a/goes.go b/goes.go index 84ae5e3..4527b66 100644 --- a/goes.go +++ b/goes.go @@ -14,6 +14,7 @@ import ( var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") var buildTypeIndexes = flag.Bool("buildTypeIndexes", false, "Build type indexes") +var buildIndexes = flag.Bool("buildIndexes", false, "Build indexes") func PathIsAbsolute(s string) bool { if len(s) > 1 && s[1] == ':' { @@ -36,6 +37,10 @@ func main() { } diskStorage := storage.NewDailyDiskStorage(storagePath) + if *buildIndexes { + diskStorage.RebuildIndexes() + return + } if *buildTypeIndexes { diskStorage.RebuildTypeIndexes() return diff --git a/storage/dailydisk.go b/storage/dailydisk.go index c499da0..3790bb6 100644 --- a/storage/dailydisk.go +++ b/storage/dailydisk.go @@ -1,280 +1,381 @@ -package storage - -import ( - "path" - "os" - "time" - "github.com/satori/go.uuid" - "fmt" - "errors" - "io/ioutil" - "bytes" -) - -const EMPTY_STREAM = uint32(0) -var CRLF = []byte("\r\n") - -type DailyDiskStorage struct { - storagePath string - indexesPath string - typesIndexesPath string - globalIndexFilename string -} - -func NewDailyDiskStorage(storagePath string) Storage { - fmt.Println("Using DailyDiskStorage path:", storagePath) - indexesPath := path.Join(storagePath, "indexes") - globalIndexPath := path.Join(indexesPath, "global") - typesIndexesPath := path.Join(indexesPath, "types") - if err := os.MkdirAll(typesIndexesPath, 0777); err != nil { - panic(err) - } - return &DailyDiskStorage{storagePath, indexesPath, typesIndexesPath, globalIndexPath}; -} - -func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { - return path.Join(me.indexesPath, streamId.String()) -} - -func (me DailyDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { - yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month()) - day := fmt.Sprintf("%02d", creationTime.Day()) - eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId) - return path.Join(me.storagePath, yearMonth, day, eventFilename) -} - -type IndexEntry struct { - streamId uuid.UUID - creationTime time.Time - typeId string -} - -func appendIndex(filename string, entry *IndexEntry) error { - indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) - if err != nil { - return err - } - defer indexFile.Close() - - written, err := indexFile.Write(entry.streamId.Bytes()) - if err != nil { - return err - } - if written != 16 { - return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", 16, written)) - } - - creationTimeBytes, err := entry.creationTime.MarshalBinary() - if err != nil { - return err - } - writeSizeAndBytes(indexFile, creationTimeBytes) - writeSizeAndBytes(indexFile, []byte(entry.typeId)) - - return nil -} - -func (me DailyDiskStorage) appendTypeIndex(entry *IndexEntry) error { - filename := path.Join(me.typesIndexesPath, entry.typeId) - indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644 ) - if err != nil { - return err - } - defer indexFile.Close() - - value := me.getEventFilename(entry.creationTime, entry.typeId) - start := len(me.storagePath) + 1 - _, err = indexFile.WriteString(value[start:] + "\r\n") - - return err -} - -func readIndexNextEntry(f *os.File) (*IndexEntry, error) { - index := IndexEntry{} - - uuidBytes := make([]byte, 16) - read, err := f.Read(uuidBytes) - if err != nil { - return nil, err - } - if read != 16 { - return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %v bytes, got only %v bytes.", 16, read)) - } - index.streamId = uuid.FromBytesOrNil(uuidBytes) - - creationTimeBytes, err := readSizedBytes(f) - if err != nil { - return nil, err - } - if err = index.creationTime.UnmarshalBinary(creationTimeBytes); err != nil { - return nil, err - } - - typeIdBytes, err := readSizedBytes(f) - index.typeId = string(typeIdBytes) - - return &index, nil; -} - -func writeEvent(filename string, data []byte, metadata []byte) error { - eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) - if err != nil { - return err - } - defer eventFile.Close() - - eventFile.Write(data) - eventFile.Write(CRLF) - eventFile.Write(metadata) - - return nil -} - -func readEvent(filename string) (data []byte, metadata []byte, err error) { - content, err := ioutil.ReadFile(filename) - if err != nil { - return - } - sep := bytes.Index(content, CRLF) - if sep == -1 { - data = content - metadata = make([]byte, 0) - return - } - data = content[:sep] - metadata = content[sep+2:] - return -} - -func (me DailyDiskStorage) Write(event *StoredEvent) error { - - eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) - os.MkdirAll(path.Dir(eventFilename), 0777) - - err := writeEvent(eventFilename, event.Data, event.Metadata) - if err != nil { - return err - } - - index := &IndexEntry{event.StreamId, event.CreationTime, event.TypeId} - - err = appendIndex(me.globalIndexFilename, index) - if err != nil { - return err - } - - err = appendIndex(me.getStreamIndexFilename(event.StreamId), index) - if err != nil { - return err - } - - return me.appendTypeIndex(index) -} - -func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { - indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) - if err != nil { - return EMPTY_STREAM, errors.New("NOT_FOUND: " + err.Error()) - } - defer indexFile.Close() - - ver := EMPTY_STREAM - for { - _, err := readIndexNextEntry(indexFile) - if err != nil && err.Error() == "EOF" { - break - } - if err != nil { - return EMPTY_STREAM, err - } - ver++ - } - - return ver, nil -} - -func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { - - indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) - if err != nil { - return nil, errors.New("NOT_FOUND: " + err.Error()) - } - defer indexFile.Close() - - events := make([]*StoredEvent, 0) - for { - indexEntry, err := readIndexNextEntry(indexFile) - if err != nil && err.Error() == "EOF" { - break - } - if err != nil { - return nil, err - } - data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) - if err != nil { - return nil, err - } - event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} - events = append(events, event) - } - - return events, nil -} - -func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) { - indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) - if err != nil { - return nil, err - } - defer indexFile.Close() - - events := make([]*StoredEvent, 0) - for { - indexEntry, err := readIndexNextEntry(indexFile) - if err != nil && err.Error() == "EOF" { - break - } - if err != nil { - return nil, err - } - data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) - if err != nil { - return nil, err - } - event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} - events = append(events, event) - } - - return events, nil -} - -func (me DailyDiskStorage) RebuildTypeIndexes() { - fmt.Print("Rebuilding type indexes... ") - - err := os.RemoveAll(me.typesIndexesPath) - if err != nil { - panic(err) - } - err = os.MkdirAll(me.typesIndexesPath, 0777) - if err != nil { - panic(err) - } - - globalIndexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0644) - if err != nil { - panic(err) - } - - for { - indexEntry, err := readIndexNextEntry(globalIndexFile) - if err != nil && err.Error() == "EOF" { - break - } - if err != nil { - panic(err) - } - me.appendTypeIndex(indexEntry) - } - - fmt.Println("Done.") +package storage + +import ( + "path" + "os" + "time" + "github.com/satori/go.uuid" + "fmt" + "errors" + "io/ioutil" + "bytes" + "path/filepath" + "encoding/json" + "strings" + "strconv" +) + +const EMPTY_STREAM = uint32(0) +var CRLF = []byte("\r\n") + +type DailyDiskStorage struct { + storagePath string + indexesPath string + typesIndexesPath string + globalIndexFilename string +} + +func NewDailyDiskStorage(storagePath string) Storage { + fmt.Println("Using DailyDiskStorage path:", storagePath) + indexesPath := path.Join(storagePath, "indexes") + globalIndexPath := path.Join(indexesPath, "global") + typesIndexesPath := path.Join(indexesPath, "types") + if err := os.MkdirAll(typesIndexesPath, 0777); err != nil { + panic(err) + } + return &DailyDiskStorage{storagePath, indexesPath, typesIndexesPath, globalIndexPath}; +} + +func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { + return path.Join(me.indexesPath, streamId.String()) +} + +func (me DailyDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { + yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month()) + day := fmt.Sprintf("%02d", creationTime.Day()) + eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId) + return path.Join(me.storagePath, yearMonth, day, eventFilename) +} + +type IndexEntry struct { + streamId uuid.UUID + creationTime time.Time + typeId string +} + +func appendIndex(filename string, entry *IndexEntry) error { + indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) + if err != nil { + return err + } + defer indexFile.Close() + + written, err := indexFile.Write(entry.streamId.Bytes()) + if err != nil { + return err + } + if written != 16 { + return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", 16, written)) + } + + creationTimeBytes, err := entry.creationTime.MarshalBinary() + if err != nil { + return err + } + writeSizeAndBytes(indexFile, creationTimeBytes) + writeSizeAndBytes(indexFile, []byte(entry.typeId)) + + return nil +} + +func (me DailyDiskStorage) appendTypeIndex(entry *IndexEntry) error { + filename := path.Join(me.typesIndexesPath, entry.typeId) + indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644 ) + if err != nil { + return err + } + defer indexFile.Close() + + value := me.getEventFilename(entry.creationTime, entry.typeId) + start := len(me.storagePath) + 1 + _, err = indexFile.WriteString(value[start:] + "\r\n") + + return err +} + +func readIndexNextEntry(f *os.File) (*IndexEntry, error) { + index := IndexEntry{} + + uuidBytes := make([]byte, 16) + read, err := f.Read(uuidBytes) + if err != nil { + return nil, err + } + if read != 16 { + return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %v bytes, got only %v bytes.", 16, read)) + } + index.streamId = uuid.FromBytesOrNil(uuidBytes) + + creationTimeBytes, err := readSizedBytes(f) + if err != nil { + return nil, err + } + if err = index.creationTime.UnmarshalBinary(creationTimeBytes); err != nil { + return nil, err + } + + typeIdBytes, err := readSizedBytes(f) + index.typeId = string(typeIdBytes) + + return &index, nil; +} + +func writeEvent(filename string, data []byte, metadata []byte) error { + eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) + if err != nil { + return err + } + defer eventFile.Close() + + eventFile.Write(data) + eventFile.Write(CRLF) + eventFile.Write(metadata) + + return nil +} + +func readEvent(filename string) (data []byte, metadata []byte, err error) { + content, err := ioutil.ReadFile(filename) + if err != nil { + return + } + sep := bytes.Index(content, CRLF) + if sep == -1 { + data = content + metadata = make([]byte, 0) + return + } + data = content[:sep] + metadata = content[sep+2:] + return +} + +func (me DailyDiskStorage) Write(event *StoredEvent) error { + + eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) + os.MkdirAll(path.Dir(eventFilename), 0777) + + err := writeEvent(eventFilename, event.Data, event.Metadata) + if err != nil { + return err + } + + index := &IndexEntry{event.StreamId, event.CreationTime, event.TypeId} + + err = appendIndex(me.globalIndexFilename, index) + if err != nil { + return err + } + + err = appendIndex(me.getStreamIndexFilename(event.StreamId), index) + if err != nil { + return err + } + + return me.appendTypeIndex(index) +} + +func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { + indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) + if err != nil { + return EMPTY_STREAM, errors.New("NOT_FOUND: " + err.Error()) + } + defer indexFile.Close() + + ver := EMPTY_STREAM + for { + _, err := readIndexNextEntry(indexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return EMPTY_STREAM, err + } + ver++ + } + + return ver, nil +} + +func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { + + indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) + if err != nil { + return nil, errors.New("NOT_FOUND: " + err.Error()) + } + defer indexFile.Close() + + events := make([]*StoredEvent, 0) + for { + indexEntry, err := readIndexNextEntry(indexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return nil, err + } + data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) + if err != nil { + return nil, err + } + event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} + events = append(events, event) + } + + return events, nil +} + +func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) { + indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer indexFile.Close() + + events := make([]*StoredEvent, 0) + for { + indexEntry, err := readIndexNextEntry(indexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return nil, err + } + data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) + if err != nil { + return nil, err + } + event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} + events = append(events, event) + } + + return events, nil +} + +func (me DailyDiskStorage) RebuildTypeIndexes() { + fmt.Print("Rebuilding type indexes... ") + + err := os.RemoveAll(me.typesIndexesPath) + if err != nil { + panic(err) + } + err = os.MkdirAll(me.typesIndexesPath, 0777) + if err != nil { + panic(err) + } + + globalIndexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0644) + if err != nil { + panic(err) + } + + for { + indexEntry, err := readIndexNextEntry(globalIndexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + panic(err) + } + me.appendTypeIndex(indexEntry) + } + + fmt.Println("Done.") +} + +func (me DailyDiskStorage) RebuildIndexes() { + fmt.Print("Rebuilding indexes..."); + + err := os.RemoveAll(me.indexesPath) + if err != nil { + panic(err) + } + err = os.MkdirAll(me.typesIndexesPath, 0777) + if err != nil { + panic(err) + } + + fileList := make([]string, 0) + err = filepath.Walk(me.storagePath, func(path string, f os.FileInfo, err error) error { + if !f.IsDir() { + fileList = append(fileList, path) + } + return nil + }) + + for _, file := range fileList { + event := map[string]interface{} {} + if content, err := ioutil.ReadFile(file); err != nil { + panic(err) + } else if parts := bytes.Split(content, CRLF); len(parts) == 0 { + panic(errors.New(fmt.Sprintf("Empty event file %s", file))) + } else if err := json.Unmarshal(parts[0], &event); err != nil { + panic(err) + } else { + firstKey := string(bytes.Split(parts[0], []byte("\""))[1]) + streamId := event[firstKey].(string) + streamUuid, err := uuid.FromString(streamId) + if err != nil { + panic(err) + } + + if file[0:len(me.storagePath)] == me.storagePath { + file = file[len(me.storagePath)+1:] + } + file = strings.Replace(file, "/", "", -1) + parts := strings.Split(file, "_") + typeId := parts[1] + year, err := strconv.Atoi(file[0:4]) + if err != nil { + panic(err) + } + month, err := strconv.Atoi(file[4:6]) + if err != nil { + panic(err) + } + day, err := strconv.Atoi(file[6:8]) + if err != nil { + panic(err) + } + hour, err := strconv.Atoi(file[8:10]) + if err != nil { + panic(err) + } + min, err := strconv.Atoi(file[10:12]) + if err != nil { + panic(err) + } + sec, err := strconv.Atoi(file[12:14]) + if err != nil { + panic(err) + } + nsec, err := strconv.Atoi(file[14:23]) + if err != nil { + panic(err) + } + loc, err := time.LoadLocation("Local") + if err != nil { + panic(err) + } + creationTime := time.Date(year, time.Month(month), day, hour, min, sec, nsec, loc) + fmt.Printf("%s=%s %d %d %d %d %d %d %d %s\n", firstKey, streamId, year, month, day, hour, min, sec, nsec, typeId) + + index := &IndexEntry{streamUuid, creationTime, typeId} + + err = appendIndex(me.globalIndexFilename, index) + if err != nil { + panic(err) + } + + err = appendIndex(me.getStreamIndexFilename(streamUuid), index) + if err != nil { + panic(err) + } + + err = me.appendTypeIndex(index) + if err != nil { + panic(err) + } + } + } } \ No newline at end of file diff --git a/storage/simpledisk.go b/storage/simpledisk.go index 114e3cc..002527c 100644 --- a/storage/simpledisk.go +++ b/storage/simpledisk.go @@ -1,234 +1,238 @@ -package storage - -import ( - "encoding/binary" - "github.com/satori/go.uuid" - "os" - "time" - "path" - "fmt" - "errors" -) - -func NewSimpleDiskStorage(storagePath string) Storage { - return &SimpleDiskStorage{storagePath, path.Join(storagePath, "eventindex")} -} - -type SimpleDiskStorage struct { - storagePath string - indexPath string -} - -func (me SimpleDiskStorage) getFilename(stream, extension string) string { - return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension) -} - -func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string { - return me.getFilename(stream, ".history") -} - -func writeSizeAndBytes(f *os.File, data []byte) (error) { - sizeBytes := make([]byte, IntegerSizeInBytes) - size := len(data) - binary.BigEndian.PutUint64(sizeBytes, uint64(size)) - - written, err := f.Write(sizeBytes) - if err != nil { - return err - } - if written != IntegerSizeInBytes { - return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", IntegerSizeInBytes, written)) - } - - written, err = f.Write(data) - if err != nil { - return err - } - if written != size { - return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", size, written)) - } - - return nil -} - -func readSizedBytes(f *os.File) ([]byte, error) { - sizeBytes := make([]byte, IntegerSizeInBytes) - read, err := f.Read(sizeBytes) - if err != nil { - return nil, err - } - if read != IntegerSizeInBytes { - return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read)) - } - size := binary.BigEndian.Uint64(sizeBytes) - data := make([]byte, size) - read, err = f.Read(data) - if err != nil { - return nil, err - } - if uint64(read) != size { - return nil, errors.New(fmt.Sprintf("Integrity error. Expected to ready %d bytes, got %d bytes.", IntegerSizeInBytes, read)) - } - - return data, nil -} - -func (me SimpleDiskStorage) Write(event *StoredEvent) error { - filename := me.GetFilenameForEvents(event.StreamId.String()) - os.MkdirAll(path.Dir(filename), os.ModeDir) - - indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) - if err != nil { - return err - } - defer indexFile.Close() - - eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) - if err != nil { - return err - } - defer eventsFile.Close() - - stat, err := eventsFile.Stat() - if err != nil { - return err - } - position := stat.Size() - - creationTimeBytes, err := event.CreationTime.MarshalBinary() - if err != nil { - return err - } - writeSizeAndBytes(eventsFile, creationTimeBytes) - writeSizeAndBytes(eventsFile, []byte(event.TypeId)) - writeSizeAndBytes(eventsFile, event.Data) - - indexFile.Write(event.StreamId.Bytes()) - positionBytes := make([]byte, IntegerSizeInBytes) - binary.BigEndian.PutUint64(positionBytes, uint64(position)) - indexFile.Write(positionBytes) - - return nil -} - -func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { - //TODO - return EMPTY_STREAM, nil -} - -func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { - streamName := streamId.String() - offset := int64(0) //TODO snapshots - filename := me.GetFilenameForEvents(streamName) - - eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) - if err != nil { - return nil, err - } - defer eventsFile.Close() - - eventsFile.Seek(offset, 0) - - results := make([]*StoredEvent, 0) - for { - creationTime, typeId, data, err := getStoredData(eventsFile) - if err != nil && err.Error() == "EOF" { - break - } - if err != nil { - return nil, err - } - - //TODO metadata - event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} - results = append(results, event) - } - return results, nil -} - -func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) { - indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0) - if err != nil { - return nil, err - } - defer indexFile.Close() - - results := make([]*StoredEvent, 0) - guidBytes := make([]byte, 16) - offsetBytes := make([]byte, IntegerSizeInBytes) - for { - read, err := indexFile.Read(guidBytes) - if err != nil { - break - } - if read != 16 { - return nil, errors.New("index integrity error") - } - read, err = indexFile.Read(offsetBytes) - if err != nil { - return nil, err - } - if read != IntegerSizeInBytes { - return nil, errors.New("index integrity error") - } - aggregateId, err := uuid.FromBytes(guidBytes) - if err != nil { - return nil, err - } - offset := binary.BigEndian.Uint64(offsetBytes) - - storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset)) - if err != nil { - return nil, err - } - results = append(results, storedEvent) - } - - return results, nil -} - -func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { - filename := me.GetFilenameForEvents(streamId.String()) - - eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) - if err != nil { - return nil, err - } - defer eventsFile.Close() - - eventsFile.Seek(offset, 0) - - creationTime, typeId, data, err := getStoredData(eventsFile) - if err != nil { - return nil, err - } - - //TODO metadata - event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} - return event, nil -} - -func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) { - creationTimeBytes, err := readSizedBytes(eventsFile) - if err != nil { - return - } - err = creationTime.UnmarshalBinary(creationTimeBytes) - if err != nil { - return - } - - typeIdBytes, err := readSizedBytes(eventsFile) - if err != nil { - return - } - typeId = string(typeIdBytes) - - data, err = readSizedBytes(eventsFile) - - return -} - -func (me SimpleDiskStorage) RebuildTypeIndexes() { - +package storage + +import ( + "encoding/binary" + "github.com/satori/go.uuid" + "os" + "time" + "path" + "fmt" + "errors" +) + +func NewSimpleDiskStorage(storagePath string) Storage { + return &SimpleDiskStorage{storagePath, path.Join(storagePath, "eventindex")} +} + +type SimpleDiskStorage struct { + storagePath string + indexPath string +} + +func (me SimpleDiskStorage) getFilename(stream, extension string) string { + return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension) +} + +func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string { + return me.getFilename(stream, ".history") +} + +func writeSizeAndBytes(f *os.File, data []byte) (error) { + sizeBytes := make([]byte, IntegerSizeInBytes) + size := len(data) + binary.BigEndian.PutUint64(sizeBytes, uint64(size)) + + written, err := f.Write(sizeBytes) + if err != nil { + return err + } + if written != IntegerSizeInBytes { + return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", IntegerSizeInBytes, written)) + } + + written, err = f.Write(data) + if err != nil { + return err + } + if written != size { + return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", size, written)) + } + + return nil +} + +func readSizedBytes(f *os.File) ([]byte, error) { + sizeBytes := make([]byte, IntegerSizeInBytes) + read, err := f.Read(sizeBytes) + if err != nil { + return nil, err + } + if read != IntegerSizeInBytes { + return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read)) + } + size := binary.BigEndian.Uint64(sizeBytes) + data := make([]byte, size) + read, err = f.Read(data) + if err != nil { + return nil, err + } + if uint64(read) != size { + return nil, errors.New(fmt.Sprintf("Integrity error. Expected to ready %d bytes, got %d bytes.", IntegerSizeInBytes, read)) + } + + return data, nil +} + +func (me SimpleDiskStorage) Write(event *StoredEvent) error { + filename := me.GetFilenameForEvents(event.StreamId.String()) + os.MkdirAll(path.Dir(filename), os.ModeDir) + + indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) + if err != nil { + return err + } + defer indexFile.Close() + + eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) + if err != nil { + return err + } + defer eventsFile.Close() + + stat, err := eventsFile.Stat() + if err != nil { + return err + } + position := stat.Size() + + creationTimeBytes, err := event.CreationTime.MarshalBinary() + if err != nil { + return err + } + writeSizeAndBytes(eventsFile, creationTimeBytes) + writeSizeAndBytes(eventsFile, []byte(event.TypeId)) + writeSizeAndBytes(eventsFile, event.Data) + + indexFile.Write(event.StreamId.Bytes()) + positionBytes := make([]byte, IntegerSizeInBytes) + binary.BigEndian.PutUint64(positionBytes, uint64(position)) + indexFile.Write(positionBytes) + + return nil +} + +func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { + //TODO + return EMPTY_STREAM, nil +} + +func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { + streamName := streamId.String() + offset := int64(0) //TODO snapshots + filename := me.GetFilenameForEvents(streamName) + + eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer eventsFile.Close() + + eventsFile.Seek(offset, 0) + + results := make([]*StoredEvent, 0) + for { + creationTime, typeId, data, err := getStoredData(eventsFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return nil, err + } + + //TODO metadata + event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} + results = append(results, event) + } + return results, nil +} + +func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) { + indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer indexFile.Close() + + results := make([]*StoredEvent, 0) + guidBytes := make([]byte, 16) + offsetBytes := make([]byte, IntegerSizeInBytes) + for { + read, err := indexFile.Read(guidBytes) + if err != nil { + break + } + if read != 16 { + return nil, errors.New("index integrity error") + } + read, err = indexFile.Read(offsetBytes) + if err != nil { + return nil, err + } + if read != IntegerSizeInBytes { + return nil, errors.New("index integrity error") + } + aggregateId, err := uuid.FromBytes(guidBytes) + if err != nil { + return nil, err + } + offset := binary.BigEndian.Uint64(offsetBytes) + + storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset)) + if err != nil { + return nil, err + } + results = append(results, storedEvent) + } + + return results, nil +} + +func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { + filename := me.GetFilenameForEvents(streamId.String()) + + eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer eventsFile.Close() + + eventsFile.Seek(offset, 0) + + creationTime, typeId, data, err := getStoredData(eventsFile) + if err != nil { + return nil, err + } + + //TODO metadata + event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} + return event, nil +} + +func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) { + creationTimeBytes, err := readSizedBytes(eventsFile) + if err != nil { + return + } + err = creationTime.UnmarshalBinary(creationTimeBytes) + if err != nil { + return + } + + typeIdBytes, err := readSizedBytes(eventsFile) + if err != nil { + return + } + typeId = string(typeIdBytes) + + data, err = readSizedBytes(eventsFile) + + return +} + +func (me SimpleDiskStorage) RebuildTypeIndexes() { + +} + +func (me SimpleDiskStorage) RebuildIndexes() { + } \ No newline at end of file diff --git a/storage/storage.go b/storage/storage.go index 98be1eb..1eb65ba 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,27 +1,28 @@ -package storage - -import ( - "github.com/satori/go.uuid" - "time" -) - -const IntegerSizeInBytes = 8 -const StreamStartingCapacity = 512 - -type StoredEvent struct { - StreamId uuid.UUID - CreationTime time.Time - TypeId string - Data []byte - MetadataTypeId string - Metadata []byte -} - -//TODO: performance - change reads array for some kind of iterator -type Storage interface { - Write(event *StoredEvent) error - ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) - ReadAll() ([]*StoredEvent, error) - StreamVersion(streamId uuid.UUID) (uint32, error) - RebuildTypeIndexes() -} +package storage + +import ( + "github.com/satori/go.uuid" + "time" +) + +const IntegerSizeInBytes = 8 +const StreamStartingCapacity = 512 + +type StoredEvent struct { + StreamId uuid.UUID + CreationTime time.Time + TypeId string + Data []byte + MetadataTypeId string + Metadata []byte +} + +//TODO: performance - change reads array for some kind of iterator +type Storage interface { + Write(event *StoredEvent) error + ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) + ReadAll() ([]*StoredEvent, error) + StreamVersion(streamId uuid.UUID) (uint32, error) + RebuildTypeIndexes() + RebuildIndexes() +}