From 0a6a5913402bacbf1f8376ccbf2bd17055ed0124 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Mon, 15 Feb 2016 10:09:10 -0800 Subject: [PATCH] Abstracted storage --- goes.go | 156 ++++------------------------------------ goes_test.go | 3 +- serialization.go | 2 + storage.go | 184 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 203 insertions(+), 142 deletions(-) create mode 100644 storage.go diff --git a/goes.go b/goes.go index e0e0a85..475f740 100644 --- a/goes.go +++ b/goes.go @@ -2,40 +2,24 @@ package goes import ( "github.com/satori/go.uuid" - "os" - "fmt" - "encoding/binary" - "path" - "errors" ) -var storagePath string var serializer Serializer - -const IntegerSizeInBytes = 8 -const StreamStartingCapacity = 512 +var storage Storage type Event struct { AggregateId uuid.UUID Payload interface{} } -func SetStoragePath(newStoragePath string) { - storagePath = newStoragePath +func SetStorage(newStorage Storage) { + storage = newStorage } func SetSerializer(newSerializer Serializer) { serializer = newSerializer } -func getFilename(stream, extension string) string { - return fmt.Sprintf("%v%v", path.Join(storagePath, stream[0:2], stream[2:]), extension) -} - -func getFilenameForEvents(stream string) string { - return getFilename(stream, ".history") -} - var mapLock chan int = make(chan int, 1) var streamsLock map[string]chan int = make(map[string]chan int) @@ -64,157 +48,47 @@ func AddEvent(event Event) error { lockStream(streamName) defer unlockStream(streamName) - filename := getFilenameForEvents(streamName) - os.MkdirAll(path.Dir(filename), os.ModeDir) - - eventIndexPath := path.Join(storagePath, "eventindex") - indexFile, err := os.OpenFile(eventIndexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) - if err != nil { - return err - } - defer indexFile.Close() - - eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) - if err != nil { - return err - } - defer eventsFile.Close() - - stat, err := eventsFile.Stat() - if err != nil { - return err - } - position := stat.Size() - serializedPayload, err := serializer.Serialize(event.Payload) if err != nil { return err } - lengthBytes := make([]byte, IntegerSizeInBytes) - binary.BigEndian.PutUint64(lengthBytes, uint64(len(serializedPayload))) - eventsFile.Write(lengthBytes) - eventsFile.Write(serializedPayload) - - indexFile.Write(event.AggregateId.Bytes()) - positionBytes := make([]byte, IntegerSizeInBytes) - binary.BigEndian.PutUint64(positionBytes, uint64(position)) - indexFile.Write(positionBytes) + storage.Write(event.AggregateId, serializedPayload) return nil } -func RetrieveFor(aggregateId uuid.UUID) ([]Event, error) { - streamName := aggregateId.String() - offset := getStartingIndexFor(streamName) - filename := getFilenameForEvents(streamName) - - eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) +func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) { + results, err := storage.ReadStream(aggregateId) if err != nil { return nil, err } - defer eventsFile.Close() - eventsFile.Seek(offset, 0) - - contentLengthBytes := make([]byte, IntegerSizeInBytes) - events := make([]Event, 0) - for { - read, err := eventsFile.Read(contentLengthBytes) - if err != nil { - break - } - if read < 8 { - return nil, errors.New("event index integrity error") - } - event, err := getStoredEvent(eventsFile, contentLengthBytes) + events := make([]*Event, 0) + for _, storedEvent := range results { + event, err := serializer.Deserialize(storedEvent.Data) if err != nil { return nil, err } - events = append(events, Event{aggregateId, event}) + events = append(events, &Event{storedEvent.StreamId, event}) } + return events, nil } -func getStartingIndexFor(streamName string) int64 { - //TODO: snapshots - return int64(0) -} - -func getStoredEvent(eventsFile *os.File, contentLengthBytes []byte) (interface{}, error) { - contentLength := binary.BigEndian.Uint64(contentLengthBytes) - content := make([]byte, contentLength) - read, err := eventsFile.Read(content) - if err != nil { - return nil, err - } - if uint64(read) < contentLength { - return nil, errors.New("incomplete event information retrieved") - } - return serializer.Deserialize(content) -} - -func retrieveEvent(aggregateId uuid.UUID, offset int64) (*Event, error) { - filename := getFilenameForEvents(aggregateId.String()) - - eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) - if err != nil { - return nil, err - } - defer eventsFile.Close() - - eventsFile.Seek(offset, 0) - contentLengthBytes := make([]byte, IntegerSizeInBytes) - read, err := eventsFile.Read(contentLengthBytes) - if err != nil { - return nil, err - } - if read < IntegerSizeInBytes { - return nil, errors.New("event integrity problem") - } - content, err := getStoredEvent(eventsFile, contentLengthBytes) - if err != nil { - return nil, err - } - return &Event{aggregateId, content}, nil -} - func RetrieveAll() ([]*Event, error) { - indexFile, err := os.OpenFile(path.Join(storagePath, "eventindex"), os.O_RDONLY, 0) + results, err := storage.ReadAll() if err != nil { return nil, err } - defer indexFile.Close() events := make([]*Event, 0) - guidBytes := make([]byte, 16) - offsetBytes := make([]byte, IntegerSizeInBytes) - for { - read, err := indexFile.Read(guidBytes) - if err != nil { - break - } - if read != 16 { - return nil, errors.New("index integrity error") - } - read, err = indexFile.Read(offsetBytes) + for _, storedEvent := range results { + event, err := serializer.Deserialize(storedEvent.Data) if err != nil { return nil, err } - if read != IntegerSizeInBytes { - return nil, errors.New("index integrity error") - } - aggregateId, err := uuid.FromBytes(guidBytes) - if err != nil { - return nil, err - } - offset := binary.BigEndian.Uint64(offsetBytes) - - event, err := retrieveEvent(aggregateId, int64(offset)) - if err != nil { - return nil, err - } - events = append(events, event) + events = append(events, &Event{storedEvent.StreamId, event}) } return events, nil diff --git a/goes_test.go b/goes_test.go index 9bdff87..34ee785 100644 --- a/goes_test.go +++ b/goes_test.go @@ -27,7 +27,8 @@ type AnotherEvent struct { func setUp() { tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) - SetStoragePath(tempDir) + storage := NewDiskStorage(tempDir) + SetStorage(storage) serializer := NewJsonSerializer((*MyEvent)(nil), (*AnotherEvent)(nil)) SetSerializer(serializer) } diff --git a/serialization.go b/serialization.go index 71b5d4e..72f6863 100644 --- a/serialization.go +++ b/serialization.go @@ -1,4 +1,5 @@ package goes + import ( "reflect" "encoding/json" @@ -11,6 +12,7 @@ type Serializer interface { Deserialize([]byte) (interface{}, error) } +//TODO: any serializer will require a type registry maybe this should be abstracted type JsonSerializer struct { types map[string]reflect.Type } diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..de0a60a --- /dev/null +++ b/storage.go @@ -0,0 +1,184 @@ +package goes + +import ( + "os" + "path" + "encoding/binary" + "github.com/satori/go.uuid" + "fmt" + "errors" +) + +const IntegerSizeInBytes = 8 +const StreamStartingCapacity = 512 + +type StoredEvent struct { + StreamId uuid.UUID + Data []byte +} + +//TODO: performance - change reads array for some kind of iterator +type Storage interface { + Write(streamId uuid.UUID, data []byte) error + ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) + ReadAll() ([]*StoredEvent, error) +} + +func NewDiskStorage(storagePath string) Storage { + return &DiskStorage{storagePath, path.Join(storagePath, "eventindex")} +} + +type DiskStorage struct { + storagePath string + indexPath string +} + +func (me DiskStorage) getFilename(stream, extension string) string { + return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension) +} + +func (me DiskStorage) getFilenameForEvents(stream string) string { + return me.getFilename(stream, ".history") +} + +func (me DiskStorage) Write(streamId uuid.UUID, data []byte) error { + filename := me.getFilenameForEvents(streamId.String()) + os.MkdirAll(path.Dir(filename), os.ModeDir) + + indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) + if err != nil { + return err + } + defer indexFile.Close() + + eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) + if err != nil { + return err + } + defer eventsFile.Close() + + stat, err := eventsFile.Stat() + if err != nil { + return err + } + position := stat.Size() + + lengthBytes := make([]byte, IntegerSizeInBytes) + binary.BigEndian.PutUint64(lengthBytes, uint64(len(data))) + eventsFile.Write(lengthBytes) + eventsFile.Write(data) + + indexFile.Write(streamId.Bytes()) + positionBytes := make([]byte, IntegerSizeInBytes) + binary.BigEndian.PutUint64(positionBytes, uint64(position)) + indexFile.Write(positionBytes) + + return nil +} + +func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { + streamName := streamId.String() + offset := int64(0) //TODO snapshots + filename := me.getFilenameForEvents(streamName) + + eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer eventsFile.Close() + + eventsFile.Seek(offset, 0) + + contentLengthBytes := make([]byte, IntegerSizeInBytes) + results := make([]*StoredEvent, 0) + for { + read, err := eventsFile.Read(contentLengthBytes) + if err != nil { + break + } + if read != IntegerSizeInBytes { + return nil, errors.New("event index integrity error") + } + data, err := getStoredData(eventsFile, contentLengthBytes) + if err != nil { + return nil, err + } + results = append(results, &StoredEvent{streamId, data}) + } + return results, nil +} + +func (me DiskStorage) ReadAll() ([]*StoredEvent, error) { + indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer indexFile.Close() + + results := make([]*StoredEvent, 0) + guidBytes := make([]byte, 16) + offsetBytes := make([]byte, IntegerSizeInBytes) + for { + read, err := indexFile.Read(guidBytes) + if err != nil { + break + } + if read != 16 { + return nil, errors.New("index integrity error") + } + read, err = indexFile.Read(offsetBytes) + if err != nil { + return nil, err + } + if read != IntegerSizeInBytes { + return nil, errors.New("index integrity error") + } + aggregateId, err := uuid.FromBytes(guidBytes) + if err != nil { + return nil, err + } + offset := binary.BigEndian.Uint64(offsetBytes) + + data, err := me.retrieveData(aggregateId, int64(offset)) + if err != nil { + return nil, err + } + results = append(results, &StoredEvent{aggregateId, data}) + } + + return results, nil +} + +func (me DiskStorage) retrieveData(aggregateId uuid.UUID, offset int64) ([]byte, error) { + filename := me.getFilenameForEvents(aggregateId.String()) + + eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer eventsFile.Close() + + eventsFile.Seek(offset, 0) + contentLengthBytes := make([]byte, IntegerSizeInBytes) + read, err := eventsFile.Read(contentLengthBytes) + if err != nil { + return nil, err + } + if read < IntegerSizeInBytes { + return nil, errors.New("event integrity problem") + } + return getStoredData(eventsFile, contentLengthBytes) +} + +func getStoredData(eventsFile *os.File, contentLengthBytes []byte) ([]byte, error) { + contentLength := binary.BigEndian.Uint64(contentLengthBytes) + data := make([]byte, contentLength) + read, err := eventsFile.Read(data) + if err != nil { + return nil, err + } + if uint64(read) < contentLength { + return nil, errors.New("incomplete event information retrieved") + } + return data, nil +} \ No newline at end of file