From 81fad38cb07ede4d1c7ed7f778066f7ba081e7ad Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Wed, 17 Feb 2016 18:15:40 -0800 Subject: [PATCH] Adding readable disk storage with minimalistic tests --- goes.go | 11 +- readablediskstorage.go | 189 +++++++++++++++++++++++++++++++++++ readablediskstorage_test.go | 123 +++++++++++++++++++++++ serialization.go | 40 ++------ simpleserver/simpleserver.go | 39 +++++++- storage.go | 124 ++++++++++++++++------- 6 files changed, 452 insertions(+), 74 deletions(-) create mode 100644 readablediskstorage.go create mode 100644 readablediskstorage_test.go diff --git a/goes.go b/goes.go index 475f740..bc54a31 100644 --- a/goes.go +++ b/goes.go @@ -2,6 +2,7 @@ package goes import ( "github.com/satori/go.uuid" + "time" ) var serializer Serializer @@ -48,14 +49,12 @@ func AddEvent(event Event) error { lockStream(streamName) defer unlockStream(streamName) - serializedPayload, err := serializer.Serialize(event.Payload) + serializedPayload, typeId, err := serializer.Serialize(event.Payload) if err != nil { return err } - storage.Write(event.AggregateId, serializedPayload) - - return nil + return storage.Write(&StoredEvent{event.AggregateId, time.Now(), typeId, serializedPayload}) } func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) { @@ -66,7 +65,7 @@ func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) { events := make([]*Event, 0) for _, storedEvent := range results { - event, err := serializer.Deserialize(storedEvent.Data) + event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) if err != nil { return nil, err } @@ -84,7 +83,7 @@ func RetrieveAll() ([]*Event, error) { events := make([]*Event, 0) for _, storedEvent := range results { - event, err := serializer.Deserialize(storedEvent.Data) + event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) if err != nil { return nil, err } diff --git a/readablediskstorage.go b/readablediskstorage.go new file mode 100644 index 0000000..102aac7 --- /dev/null +++ b/readablediskstorage.go @@ -0,0 +1,189 @@ +package goes + +import ( + "path" + "os" + "time" + "github.com/satori/go.uuid" + "fmt" + "errors" + "io/ioutil" +) + +type ReadableDiskStorage struct { + storagePath string + indexesPath string + globalIndexFilename string +} + +func NewReadableDiskStorage(storagePath string) Storage { + indexesPath := path.Join(storagePath, "indexes") + globalIndexPath := path.Join(indexesPath, "global") + os.MkdirAll(indexesPath, 0777) + return &ReadableDiskStorage{storagePath, indexesPath, globalIndexPath}; +} + +func (me ReadableDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { + return path.Join(me.indexesPath, streamId.String()) +} + +func (me ReadableDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { + yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month()) + day := fmt.Sprintf("%02d", creationTime.Day()) + eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId) + return path.Join(me.storagePath, yearMonth, day, eventFilename) +} + +type IndexEntry struct { + streamId uuid.UUID + creationTime time.Time + typeId string +} + +func appendIndex(filename string, entry *IndexEntry) error { + indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) + if err != nil { + return err + } + defer indexFile.Close() + + written, err := indexFile.Write(entry.streamId.Bytes()) + if err != nil { + return err + } + if written != 16 { + return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", 16, written)) + } + + creationTimeBytes, err := entry.creationTime.MarshalBinary() + if err != nil { + return err + } + writeSizeAndBytes(indexFile, creationTimeBytes) + writeSizeAndBytes(indexFile, []byte(entry.typeId)) + + return nil +} + +func readIndexNextEntry(f *os.File) (*IndexEntry, error) { + index := IndexEntry{} + + uuidBytes := make([]byte, 16) + read, err := f.Read(uuidBytes) + if err != nil { + return nil, err + } + if read != 16 { + return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %v bytes, got only %v bytes.", 16, read)) + } + index.streamId = uuid.FromBytesOrNil(uuidBytes) + + creationTimeBytes, err := readSizedBytes(f) + if err != nil { + return nil, err + } + if err = index.creationTime.UnmarshalBinary(creationTimeBytes); err != nil { + return nil, err + } + + typeIdBytes, err := readSizedBytes(f) + index.typeId = string(typeIdBytes) + + return &index, nil; +} + +func writeEvent(filename string, data []byte) error { + eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) + if err != nil { + return err + } + defer eventFile.Close() + + eventFile.Write(data) + + return nil +} + +func readEvent(filename string) ([]byte, error) { + return ioutil.ReadFile(filename) +} + +func (me ReadableDiskStorage) Write(event *StoredEvent) error { + + eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) + os.MkdirAll(path.Dir(eventFilename), 0777) + + err := writeEvent(eventFilename, event.Data) + if err != nil { + return err + } + + index := &IndexEntry{event.StreamId, event.CreationTime, event.TypeId} + + err = appendIndex(me.globalIndexFilename, index) + if err != nil { + return err + } + + err = appendIndex(me.getStreamIndexFilename(event.StreamId), index) + if err != nil { + return err + } + + return nil +} + +func (me ReadableDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { + + indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer indexFile.Close() + + events := make([]*StoredEvent, 0) + for { + indexEntry, err := readIndexNextEntry(indexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return nil, err + } + data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) + if err != nil { + return nil, err + } + event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data} + events = append(events, event) + } + + return events, nil +} + +func (me ReadableDiskStorage) ReadAll() ([]*StoredEvent, error) { + indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer indexFile.Close() + + events := make([]*StoredEvent, 0) + for { + indexEntry, err := readIndexNextEntry(indexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return nil, err + } + data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) + if err != nil { + return nil, err + } + event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data} + events = append(events, event) + } + + return events, nil +} \ No newline at end of file diff --git a/readablediskstorage_test.go b/readablediskstorage_test.go new file mode 100644 index 0000000..3adffef --- /dev/null +++ b/readablediskstorage_test.go @@ -0,0 +1,123 @@ +package goes + +import ( + "testing" + "os" + "path" + "github.com/satori/go.uuid" + "time" + "reflect" +) + +func TestAddEvent(t *testing.T) { + //Arrange + storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) + defer os.RemoveAll(storagePath) + storage := NewReadableDiskStorage(storagePath) + + aLocation, _ := time.LoadLocation("") + aTime := time.Date(2016,2,11,9,53,32,1234567, aLocation) + aggregateId := uuid.NewV4() + aType := "myType" + data := []byte("{}") + + //Act + err := storage.Write(&StoredEvent{aggregateId, aTime, aType, data}) + + //Assert + if err != nil { + t.Errorf("Write failed. Error: %v", err) + } + + readableDiskStorage := storage.(*ReadableDiskStorage) + + globalIndexFi, _ := os.Stat(readableDiskStorage.globalIndexFilename) + if globalIndexFi == nil { + t.Error("Write failed. Expected global index file, none exists.") + } + aggregateIndexFi, _ := os.Stat(readableDiskStorage.getStreamIndexFilename(aggregateId)) + if aggregateIndexFi == nil { + t.Errorf("Write failed. Expected index for aggregate %v, none exists.", aggregateId.String()) + } + eventFi, _ := os.Stat(readableDiskStorage.getEventFilename(aTime, aType)) + if eventFi == nil { + t.Errorf("Write failed. Expected file for event %v, none exists.", aggregateId.String()) + } + + //TODO: check indexes/event content +} + +func TestReadStream(t *testing.T) { + //Arrange + storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) + defer os.RemoveAll(storagePath) + storage := NewReadableDiskStorage(storagePath) + + streamId := uuid.NewV4() + ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent")} + storage.Write(ev1) + ev2 := &StoredEvent{streamId, time.Now(), "2ndType", []byte("2ndEvent")} + storage.Write(ev2) + + //Act + storedEvents, err := storage.ReadStream(streamId) + + //Assert + if err != nil { + t.Errorf("ReadStream failed. Error: %v", err) + return + } + if len(storedEvents) != 2 { + t.Errorf("ReadStream failed. Got %v stored events, expected %v", len(storedEvents), 2) + return + } + if !reflect.DeepEqual(storedEvents[0], ev1) { + t.Errorf("ReadStream failed. First event doesn't match. %+v != %+v", storedEvents[0], ev1) + return + } + if !reflect.DeepEqual(storedEvents[1], ev2) { + t.Errorf("ReadStream failed. Second event doesn't match. %+v != %+v", storedEvents[1], ev2) + return + } +} + +func TestReadAll(t *testing.T) { + //Arrange + storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) + defer os.RemoveAll(storagePath) + storage := NewReadableDiskStorage(storagePath) + + stream1Id := uuid.NewV4() + stream2Id := uuid.NewV4() + ev1 := &StoredEvent{stream1Id, time.Now(), "1stType", []byte("1stEvent")} + storage.Write(ev1) + ev2 := &StoredEvent{stream2Id, time.Now(), "2ndType", []byte("2ndEvent")} + storage.Write(ev2) + ev3 := &StoredEvent{stream1Id, time.Now(), "3rdType", []byte("3rdEvent")} + storage.Write(ev3) + + //Act + storedEvents, err := storage.ReadAll() + + //Assert + if err != nil { + t.Errorf("ReadAll failed. Error: %v", err) + return + } + if len(storedEvents) != 3 { + t.Errorf("ReadAll failed. Got %v stored events, expected %v", len(storedEvents), 3) + return + } + if !reflect.DeepEqual(storedEvents[0], ev1) { + t.Errorf("ReadAll failed. First event doesn't match. %+v != %+v", storedEvents[0], ev1) + return + } + if !reflect.DeepEqual(storedEvents[1], ev2) { + t.Errorf("ReadAll failed. Second event doesn't match. %+v != %+v", storedEvents[1], ev2) + return + } + if !reflect.DeepEqual(storedEvents[2], ev3) { + t.Errorf("ReadAll failed. Third event doesn't match. %+v != %+v", storedEvents[2], ev2) + return + } +} diff --git a/serialization.go b/serialization.go index 6efbb45..af018e1 100644 --- a/serialization.go +++ b/serialization.go @@ -4,12 +4,11 @@ import ( "reflect" "encoding/json" "errors" - "bytes" ) type Serializer interface { - Serialize(interface{}) ([]byte, error) - Deserialize([]byte) (interface{}, error) + Serialize(interface{}) ([]byte, string, error) + Deserialize([]byte, string) (interface{}, error) } //TODO: any serializer will require a type registry maybe this should be abstracted @@ -33,52 +32,29 @@ func (me *JsonSerializer) RegisterType(t interface{}) { me.types[type_.String()] = type_ } -func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, error) { +func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) { type_ := reflect.TypeOf(obj) if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) { - return nil, errors.New("Trying to serialize a Ptr type.") + return nil, "", errors.New("Trying to serialize a Ptr type.") } typeId := type_.String() data, err := json.Marshal(obj) if err != nil { - return nil, err + return nil, "", err } - return []byte(typeId + " " + string(data)), nil + return data, typeId, nil } -func (me *JsonSerializer) Deserialize(serialized []byte) (interface{}, error) { - separatorIndex := bytes.Index(serialized, []byte{' '}) - if separatorIndex < 0 { - return nil, errors.New("invalid serialized data") - } - typeId := string(serialized[0:separatorIndex]) +func (me *JsonSerializer) Deserialize(serialized []byte, typeId string) (interface{}, error) { type_ := me.types[typeId] if type_ == nil { return nil, errors.New("type not registered in serializer") } objPtr := reflect.New(type_).Interface() - err := json.Unmarshal(serialized[separatorIndex:], objPtr) + err := json.Unmarshal(serialized, objPtr) if err != nil { return nil, err } obj := reflect.Indirect(reflect.ValueOf(objPtr)).Interface() return obj, nil } - -type PassthruSerializer struct {} - -func NewPassthruSerializer() Serializer { - return &PassthruSerializer{} -} - -func (me PassthruSerializer) Serialize(obj interface{}) ([]byte, error) { - serialized, ok := obj.([]byte) - if !ok { - return nil, errors.New("Object is not a slice of bytes") - } - return serialized, nil -} - -func (me PassthruSerializer) Deserialize(serialized []byte) (interface{}, error) { - return serialized, nil -} diff --git a/simpleserver/simpleserver.go b/simpleserver/simpleserver.go index 1282be2..49dedda 100644 --- a/simpleserver/simpleserver.go +++ b/simpleserver/simpleserver.go @@ -7,8 +7,43 @@ import ( "bitbucket.org/nicdex/adaptech-goes" "os" "path" + "bytes" + "errors" ) +type Serializer struct { +} + +func NewSerializer() goes.Serializer { + return &Serializer{} +} + +func (me Serializer) Serialize(input interface{}) (output []byte, typeId string, err error) { + content, ok := input.([]byte) + if !ok { + err = errors.New("input should be []byte") + return + } + + sep := bytes.IndexByte(content, ' ') + if sep == -1 { + err = errors.New("missing split char.") + return + } + + output = content[sep+1:] + typeId = string(content[0:sep]) + return +} + +func (me Serializer) Deserialize(input []byte, typeId string) (interface{}, error) { + output := []byte(typeId) + output = append(output, ' ') + output = append(output, input...) + + return output, nil +} + func main() { fmt.Println("Simple ZeroMQ server for goes.") @@ -16,8 +51,8 @@ func main() { storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) storagePath = "c:\\dev\\go\\events" - goes.SetStorage(goes.NewDiskStorage(storagePath)) - goes.SetSerializer(goes.NewPassthruSerializer()) + goes.SetStorage(goes.NewReadableDiskStorage(storagePath)) + goes.SetSerializer(NewSerializer()) context, err := zmq4.NewContext() if err != nil { diff --git a/storage.go b/storage.go index de0a60a..336bec0 100644 --- a/storage.go +++ b/storage.go @@ -7,6 +7,7 @@ import ( "github.com/satori/go.uuid" "fmt" "errors" + "time" ) const IntegerSizeInBytes = 8 @@ -14,12 +15,14 @@ const StreamStartingCapacity = 512 type StoredEvent struct { StreamId uuid.UUID + CreationTime time.Time + TypeId string Data []byte } //TODO: performance - change reads array for some kind of iterator type Storage interface { - Write(streamId uuid.UUID, data []byte) error + Write(event *StoredEvent) error ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) ReadAll() ([]*StoredEvent, error) } @@ -41,8 +44,54 @@ func (me DiskStorage) getFilenameForEvents(stream string) string { return me.getFilename(stream, ".history") } -func (me DiskStorage) Write(streamId uuid.UUID, data []byte) error { - filename := me.getFilenameForEvents(streamId.String()) +func writeSizeAndBytes(f *os.File, data []byte) (error) { + sizeBytes := make([]byte, IntegerSizeInBytes) + size := len(data) + binary.BigEndian.PutUint64(sizeBytes, uint64(size)) + + written, err := f.Write(sizeBytes) + if err != nil { + return err + } + if written != IntegerSizeInBytes { + return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", IntegerSizeInBytes, written)) + } + + written, err = f.Write(data) + if err != nil { + return err + } + if written != size { + return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", size, written)) + } + + return nil +} + +func readSizedBytes(f *os.File) ([]byte, error) { + sizeBytes := make([]byte, IntegerSizeInBytes) + read, err := f.Read(sizeBytes) + if err != nil { + return nil, err + } + if read != IntegerSizeInBytes { + return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read)) + } + size := binary.BigEndian.Uint64(sizeBytes) + data := make([]byte, size) + read, err = f.Read(data) + if err != nil { + return nil, err + } + if uint64(read) != size { + return nil, errors.New(fmt.Sprintf("Integrity error. Expected to ready %d bytes, got %d bytes.", IntegerSizeInBytes, read)) + } + + return data, nil +} + +func (me DiskStorage) Write(event *StoredEvent) error { + filename := me.getFilenameForEvents(event.StreamId.String()) os.MkdirAll(path.Dir(filename), os.ModeDir) indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) @@ -63,12 +112,15 @@ func (me DiskStorage) Write(streamId uuid.UUID, data []byte) error { } position := stat.Size() - lengthBytes := make([]byte, IntegerSizeInBytes) - binary.BigEndian.PutUint64(lengthBytes, uint64(len(data))) - eventsFile.Write(lengthBytes) - eventsFile.Write(data) + creationTimeBytes, err := event.CreationTime.MarshalBinary() + if err != nil { + return err + } + writeSizeAndBytes(eventsFile, creationTimeBytes) + writeSizeAndBytes(eventsFile, []byte(event.TypeId)) + writeSizeAndBytes(eventsFile, event.Data) - indexFile.Write(streamId.Bytes()) + indexFile.Write(event.StreamId.Bytes()) positionBytes := make([]byte, IntegerSizeInBytes) binary.BigEndian.PutUint64(positionBytes, uint64(position)) indexFile.Write(positionBytes) @@ -89,21 +141,18 @@ func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { eventsFile.Seek(offset, 0) - contentLengthBytes := make([]byte, IntegerSizeInBytes) results := make([]*StoredEvent, 0) for { - read, err := eventsFile.Read(contentLengthBytes) - if err != nil { + creationTime, typeId, data, err := getStoredData(eventsFile) + if err != nil && err.Error() == "EOF" { break } - if read != IntegerSizeInBytes { - return nil, errors.New("event index integrity error") - } - data, err := getStoredData(eventsFile, contentLengthBytes) if err != nil { return nil, err } - results = append(results, &StoredEvent{streamId, data}) + + event := &StoredEvent{streamId, creationTime, typeId, data} + results = append(results, event) } return results, nil } @@ -139,18 +188,18 @@ func (me DiskStorage) ReadAll() ([]*StoredEvent, error) { } offset := binary.BigEndian.Uint64(offsetBytes) - data, err := me.retrieveData(aggregateId, int64(offset)) + storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset)) if err != nil { return nil, err } - results = append(results, &StoredEvent{aggregateId, data}) + results = append(results, storedEvent) } return results, nil } -func (me DiskStorage) retrieveData(aggregateId uuid.UUID, offset int64) ([]byte, error) { - filename := me.getFilenameForEvents(aggregateId.String()) +func (me DiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { + filename := me.getFilenameForEvents(streamId.String()) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) if err != nil { @@ -159,26 +208,33 @@ func (me DiskStorage) retrieveData(aggregateId uuid.UUID, offset int64) ([]byte, defer eventsFile.Close() eventsFile.Seek(offset, 0) - contentLengthBytes := make([]byte, IntegerSizeInBytes) - read, err := eventsFile.Read(contentLengthBytes) + + creationTime, typeId, data, err := getStoredData(eventsFile) if err != nil { return nil, err } - if read < IntegerSizeInBytes { - return nil, errors.New("event integrity problem") - } - return getStoredData(eventsFile, contentLengthBytes) + + event := &StoredEvent{streamId, creationTime, typeId, data} + return event, nil } -func getStoredData(eventsFile *os.File, contentLengthBytes []byte) ([]byte, error) { - contentLength := binary.BigEndian.Uint64(contentLengthBytes) - data := make([]byte, contentLength) - read, err := eventsFile.Read(data) +func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) { + creationTimeBytes, err := readSizedBytes(eventsFile) if err != nil { - return nil, err + return } - if uint64(read) < contentLength { - return nil, errors.New("incomplete event information retrieved") + err = creationTime.UnmarshalBinary(creationTimeBytes) + if err != nil { + return } - return data, nil + + typeIdBytes, err := readSizedBytes(eventsFile) + if err != nil { + return + } + typeId = string(typeIdBytes) + + data, err = readSizedBytes(eventsFile) + + return } \ No newline at end of file