Adding readable disk storage with minimalistic tests

This commit is contained in:
Nicolas Dextraze 2016-02-17 18:15:40 -08:00
parent 1b4dbcca60
commit 81fad38cb0
6 changed files with 452 additions and 74 deletions

11
goes.go
View File

@ -2,6 +2,7 @@ package goes
import ( import (
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"time"
) )
var serializer Serializer var serializer Serializer
@ -48,14 +49,12 @@ func AddEvent(event Event) error {
lockStream(streamName) lockStream(streamName)
defer unlockStream(streamName) defer unlockStream(streamName)
serializedPayload, err := serializer.Serialize(event.Payload) serializedPayload, typeId, err := serializer.Serialize(event.Payload)
if err != nil { if err != nil {
return err return err
} }
storage.Write(event.AggregateId, serializedPayload) return storage.Write(&StoredEvent{event.AggregateId, time.Now(), typeId, serializedPayload})
return nil
} }
func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) { func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) {
@ -66,7 +65,7 @@ func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) {
events := make([]*Event, 0) events := make([]*Event, 0)
for _, storedEvent := range results { for _, storedEvent := range results {
event, err := serializer.Deserialize(storedEvent.Data) event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -84,7 +83,7 @@ func RetrieveAll() ([]*Event, error) {
events := make([]*Event, 0) events := make([]*Event, 0)
for _, storedEvent := range results { for _, storedEvent := range results {
event, err := serializer.Deserialize(storedEvent.Data) event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

189
readablediskstorage.go Normal file
View File

@ -0,0 +1,189 @@
package goes
import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path"
	"time"

	"github.com/satori/go.uuid"
)
// ReadableDiskStorage persists events as individual files on disk, laid out
// by creation date (<storagePath>/<YYYYMM>/<DD>/<time>_<typeId>), with a
// per-stream index file and a global index file used for reads.
type ReadableDiskStorage struct {
// storagePath is the root directory for all event data.
storagePath string
// indexesPath is the directory holding one index file per stream.
indexesPath string
// globalIndexFilename is the path of the index covering every stream.
globalIndexFilename string
}
// NewReadableDiskStorage returns a Storage rooted at storagePath and ensures
// the indexes directory exists.
func NewReadableDiskStorage(storagePath string) Storage {
	indexesPath := path.Join(storagePath, "indexes")
	globalIndexPath := path.Join(indexesPath, "global")
	// Best-effort: the signature cannot return an error, and a MkdirAll
	// failure surfaces on the first Write/Read anyway.
	os.MkdirAll(indexesPath, 0777)
	return &ReadableDiskStorage{storagePath, indexesPath, globalIndexPath}
}
// getStreamIndexFilename returns the path of the index file that records
// every event written to the given stream.
func (me ReadableDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string {
	id := streamId.String()
	return path.Join(me.indexesPath, id)
}
// getEventFilename builds the on-disk path of one event:
// <storagePath>/<YYYYMM>/<DD>/<HHMMSS><nanoseconds>_<typeId>.
func (me ReadableDiskStorage) getEventFilename(creationTime time.Time, typeId string) string {
	monthDir := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month())
	dayDir := fmt.Sprintf("%02d", creationTime.Day())
	baseName := fmt.Sprintf("%02d%02d%02d%09d_%s",
		creationTime.Hour(), creationTime.Minute(), creationTime.Second(),
		creationTime.Nanosecond(), typeId)
	return path.Join(me.storagePath, monthDir, dayDir, baseName)
}
// IndexEntry is one fixed-order record in an index file: which stream an
// event belongs to, when it was created, and its serialized type id. The
// creation time and type id are exactly what getEventFilename needs to
// locate the event's data file again.
type IndexEntry struct {
// streamId identifies the owning stream (16 raw bytes on disk).
streamId uuid.UUID
// creationTime is stored via time.MarshalBinary, length-prefixed.
creationTime time.Time
// typeId is the serializer's type identifier, length-prefixed.
typeId string
}
// appendIndex appends one entry (stream id, creation time, type id) to the
// index file at filename, creating the file if needed.
func appendIndex(filename string, entry *IndexEntry) error {
	// 0644 so the index can be re-opened for reading later; the previous
	// permission of 0 created mode-000 files that ReadStream/ReadAll could
	// not open on POSIX systems.
	indexFile, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	defer indexFile.Close()
	written, err := indexFile.Write(entry.streamId.Bytes())
	if err != nil {
		return err
	}
	if written != 16 {
		return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", 16, written))
	}
	creationTimeBytes, err := entry.creationTime.MarshalBinary()
	if err != nil {
		return err
	}
	// These writes were previously fire-and-forget; a short or failed write
	// would have silently corrupted the index.
	if err := writeSizeAndBytes(indexFile, creationTimeBytes); err != nil {
		return err
	}
	if err := writeSizeAndBytes(indexFile, []byte(entry.typeId)); err != nil {
		return err
	}
	return nil
}
// readIndexNextEntry reads the next IndexEntry from f. It returns io.EOF
// (message "EOF") when the index has been fully consumed.
func readIndexNextEntry(f *os.File) (*IndexEntry, error) {
	var entry IndexEntry
	// io.ReadFull returns io.EOF only when zero bytes were read (clean end
	// of index) and ErrUnexpectedEOF on a truncated record, so callers can
	// distinguish end-of-index from corruption. A plain f.Read could also
	// legally return fewer than 16 bytes without any error.
	uuidBytes := make([]byte, 16)
	if _, err := io.ReadFull(f, uuidBytes); err != nil {
		return nil, err
	}
	entry.streamId = uuid.FromBytesOrNil(uuidBytes)
	creationTimeBytes, err := readSizedBytes(f)
	if err != nil {
		return nil, err
	}
	if err = entry.creationTime.UnmarshalBinary(creationTimeBytes); err != nil {
		return nil, err
	}
	typeIdBytes, err := readSizedBytes(f)
	// This error was previously ignored, so a truncated index yielded a
	// bogus empty typeId instead of an error.
	if err != nil {
		return nil, err
	}
	entry.typeId = string(typeIdBytes)
	return &entry, nil
}
func writeEvent(filename string, data []byte) error {
eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0)
if err != nil {
return err
}
defer eventFile.Close()
eventFile.Write(data)
return nil
}
func readEvent(filename string) ([]byte, error) {
return ioutil.ReadFile(filename)
}
// Write stores one event: it writes the payload to its date-based file,
// then records the event in the global index and the stream's own index.
// NOTE(review): the event file is written before the indexes, so a crash
// in between leaves an orphaned data file but never a dangling index entry.
func (me ReadableDiskStorage) Write(event *StoredEvent) error {
	eventFilename := me.getEventFilename(event.CreationTime, event.TypeId)
	// This error was previously ignored; a failed MkdirAll made the
	// following write fail with a less direct error.
	if err := os.MkdirAll(path.Dir(eventFilename), 0777); err != nil {
		return err
	}
	if err := writeEvent(eventFilename, event.Data); err != nil {
		return err
	}
	index := &IndexEntry{event.StreamId, event.CreationTime, event.TypeId}
	if err := appendIndex(me.globalIndexFilename, index); err != nil {
		return err
	}
	return appendIndex(me.getStreamIndexFilename(event.StreamId), index)
}
// ReadStream returns every stored event of the given stream, in the order
// they were written, by walking the stream's index file and loading each
// referenced event file.
func (me ReadableDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
	indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer indexFile.Close()
	events := make([]*StoredEvent, 0)
	for {
		indexEntry, err := readIndexNextEntry(indexFile)
		// Compare against the sentinel instead of matching err.Error() ==
		// "EOF", which breaks as soon as the error is wrapped or reworded.
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId))
		if err != nil {
			return nil, err
		}
		event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data}
		events = append(events, event)
	}
	return events, nil
}
// ReadAll returns every stored event across all streams, in global write
// order, by walking the global index file.
func (me ReadableDiskStorage) ReadAll() ([]*StoredEvent, error) {
	indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer indexFile.Close()
	events := make([]*StoredEvent, 0)
	for {
		indexEntry, err := readIndexNextEntry(indexFile)
		// Compare against the sentinel instead of matching err.Error() ==
		// "EOF", which breaks as soon as the error is wrapped or reworded.
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId))
		if err != nil {
			return nil, err
		}
		event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data}
		events = append(events, event)
	}
	return events, nil
}

123
readablediskstorage_test.go Normal file
View File

@ -0,0 +1,123 @@
package goes
import (
"testing"
"os"
"path"
"github.com/satori/go.uuid"
"time"
"reflect"
)
// TestAddEvent verifies that Write creates the global index, the per-stream
// index, and the event data file on disk.
func TestAddEvent(t *testing.T) {
	//Arrange
	storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
	defer os.RemoveAll(storagePath)
	storage := NewReadableDiskStorage(storagePath)
	aLocation, _ := time.LoadLocation("") // "" resolves to UTC; cannot fail
	aTime := time.Date(2016, 2, 11, 9, 53, 32, 1234567, aLocation)
	aggregateId := uuid.NewV4()
	aType := "myType"
	data := []byte("{}")
	//Act
	err := storage.Write(&StoredEvent{aggregateId, aTime, aType, data})
	//Assert
	if err != nil {
		// Fatalf instead of Errorf: the file-existence checks below are
		// meaningless once the write itself has failed.
		t.Fatalf("Write failed. Error: %v", err)
	}
	readableDiskStorage := storage.(*ReadableDiskStorage)
	globalIndexFi, _ := os.Stat(readableDiskStorage.globalIndexFilename)
	if globalIndexFi == nil {
		t.Error("Write failed. Expected global index file, none exists.")
	}
	aggregateIndexFi, _ := os.Stat(readableDiskStorage.getStreamIndexFilename(aggregateId))
	if aggregateIndexFi == nil {
		t.Errorf("Write failed. Expected index for aggregate %v, none exists.", aggregateId.String())
	}
	eventFi, _ := os.Stat(readableDiskStorage.getEventFilename(aTime, aType))
	if eventFi == nil {
		t.Errorf("Write failed. Expected file for event %v, none exists.", aggregateId.String())
	}
	//TODO: check indexes/event content
}
// TestReadStream verifies that two events written to one stream are read
// back in order and equal to what was written.
func TestReadStream(t *testing.T) {
	//Arrange
	storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
	defer os.RemoveAll(storagePath)
	storage := NewReadableDiskStorage(storagePath)
	streamId := uuid.NewV4()
	// Round(0) strips the monotonic clock reading (Go >= 1.9); the time read
	// back from disk went through MarshalBinary, which never carries it, so
	// DeepEqual would otherwise always report a mismatch.
	ev1 := &StoredEvent{streamId, time.Now().Round(0), "1stType", []byte("1stEvent")}
	// Write errors were previously ignored, letting the assertions below
	// fail with confusing messages on a broken arrange step.
	if err := storage.Write(ev1); err != nil {
		t.Fatalf("Write failed. Error: %v", err)
	}
	ev2 := &StoredEvent{streamId, time.Now().Round(0), "2ndType", []byte("2ndEvent")}
	if err := storage.Write(ev2); err != nil {
		t.Fatalf("Write failed. Error: %v", err)
	}
	//Act
	storedEvents, err := storage.ReadStream(streamId)
	//Assert
	if err != nil {
		t.Errorf("ReadStream failed. Error: %v", err)
		return
	}
	if len(storedEvents) != 2 {
		t.Errorf("ReadStream failed. Got %v stored events, expected %v", len(storedEvents), 2)
		return
	}
	if !reflect.DeepEqual(storedEvents[0], ev1) {
		t.Errorf("ReadStream failed. First event doesn't match. %+v != %+v", storedEvents[0], ev1)
		return
	}
	if !reflect.DeepEqual(storedEvents[1], ev2) {
		t.Errorf("ReadStream failed. Second event doesn't match. %+v != %+v", storedEvents[1], ev2)
		return
	}
}
// TestReadAll verifies that events from two different streams come back from
// ReadAll in global write order and equal to what was written.
func TestReadAll(t *testing.T) {
	//Arrange
	storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
	defer os.RemoveAll(storagePath)
	storage := NewReadableDiskStorage(storagePath)
	stream1Id := uuid.NewV4()
	stream2Id := uuid.NewV4()
	// Round(0) strips the monotonic clock reading (Go >= 1.9); the time read
	// back from disk went through MarshalBinary, which never carries it, so
	// DeepEqual would otherwise always report a mismatch.
	ev1 := &StoredEvent{stream1Id, time.Now().Round(0), "1stType", []byte("1stEvent")}
	// Write errors were previously ignored, letting the assertions below
	// fail with confusing messages on a broken arrange step.
	if err := storage.Write(ev1); err != nil {
		t.Fatalf("Write failed. Error: %v", err)
	}
	ev2 := &StoredEvent{stream2Id, time.Now().Round(0), "2ndType", []byte("2ndEvent")}
	if err := storage.Write(ev2); err != nil {
		t.Fatalf("Write failed. Error: %v", err)
	}
	ev3 := &StoredEvent{stream1Id, time.Now().Round(0), "3rdType", []byte("3rdEvent")}
	if err := storage.Write(ev3); err != nil {
		t.Fatalf("Write failed. Error: %v", err)
	}
	//Act
	storedEvents, err := storage.ReadAll()
	//Assert
	if err != nil {
		t.Errorf("ReadAll failed. Error: %v", err)
		return
	}
	if len(storedEvents) != 3 {
		t.Errorf("ReadAll failed. Got %v stored events, expected %v", len(storedEvents), 3)
		return
	}
	if !reflect.DeepEqual(storedEvents[0], ev1) {
		t.Errorf("ReadAll failed. First event doesn't match. %+v != %+v", storedEvents[0], ev1)
		return
	}
	if !reflect.DeepEqual(storedEvents[1], ev2) {
		t.Errorf("ReadAll failed. Second event doesn't match. %+v != %+v", storedEvents[1], ev2)
		return
	}
	// Previously printed ev2 here, making a third-event failure misreport
	// the expected value.
	if !reflect.DeepEqual(storedEvents[2], ev3) {
		t.Errorf("ReadAll failed. Third event doesn't match. %+v != %+v", storedEvents[2], ev3)
		return
	}
}

View File

@ -4,12 +4,11 @@ import (
"reflect" "reflect"
"encoding/json" "encoding/json"
"errors" "errors"
"bytes"
) )
type Serializer interface { type Serializer interface {
Serialize(interface{}) ([]byte, error) Serialize(interface{}) ([]byte, string, error)
Deserialize([]byte) (interface{}, error) Deserialize([]byte, string) (interface{}, error)
} }
//TODO: any serializer will require a type registry maybe this should be abstracted //TODO: any serializer will require a type registry maybe this should be abstracted
@ -33,52 +32,29 @@ func (me *JsonSerializer) RegisterType(t interface{}) {
me.types[type_.String()] = type_ me.types[type_.String()] = type_
} }
func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, error) { func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) {
type_ := reflect.TypeOf(obj) type_ := reflect.TypeOf(obj)
if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) { if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) {
return nil, errors.New("Trying to serialize a Ptr type.") return nil, "", errors.New("Trying to serialize a Ptr type.")
} }
typeId := type_.String() typeId := type_.String()
data, err := json.Marshal(obj) data, err := json.Marshal(obj)
if err != nil { if err != nil {
return nil, err return nil, "", err
} }
return []byte(typeId + " " + string(data)), nil return data, typeId, nil
} }
func (me *JsonSerializer) Deserialize(serialized []byte) (interface{}, error) { func (me *JsonSerializer) Deserialize(serialized []byte, typeId string) (interface{}, error) {
separatorIndex := bytes.Index(serialized, []byte{' '})
if separatorIndex < 0 {
return nil, errors.New("invalid serialized data")
}
typeId := string(serialized[0:separatorIndex])
type_ := me.types[typeId] type_ := me.types[typeId]
if type_ == nil { if type_ == nil {
return nil, errors.New("type not registered in serializer") return nil, errors.New("type not registered in serializer")
} }
objPtr := reflect.New(type_).Interface() objPtr := reflect.New(type_).Interface()
err := json.Unmarshal(serialized[separatorIndex:], objPtr) err := json.Unmarshal(serialized, objPtr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
obj := reflect.Indirect(reflect.ValueOf(objPtr)).Interface() obj := reflect.Indirect(reflect.ValueOf(objPtr)).Interface()
return obj, nil return obj, nil
} }
type PassthruSerializer struct {}
func NewPassthruSerializer() Serializer {
return &PassthruSerializer{}
}
func (me PassthruSerializer) Serialize(obj interface{}) ([]byte, error) {
serialized, ok := obj.([]byte)
if !ok {
return nil, errors.New("Object is not a slice of bytes")
}
return serialized, nil
}
func (me PassthruSerializer) Deserialize(serialized []byte) (interface{}, error) {
return serialized, nil
}

View File

@ -7,8 +7,43 @@ import (
"bitbucket.org/nicdex/adaptech-goes" "bitbucket.org/nicdex/adaptech-goes"
"os" "os"
"path" "path"
"bytes"
"errors"
) )
type Serializer struct {
}
func NewSerializer() goes.Serializer {
return &Serializer{}
}
func (me Serializer) Serialize(input interface{}) (output []byte, typeId string, err error) {
content, ok := input.([]byte)
if !ok {
err = errors.New("input should be []byte")
return
}
sep := bytes.IndexByte(content, ' ')
if sep == -1 {
err = errors.New("missing split char.")
return
}
output = content[sep+1:]
typeId = string(content[0:sep])
return
}
func (me Serializer) Deserialize(input []byte, typeId string) (interface{}, error) {
output := []byte(typeId)
output = append(output, ' ')
output = append(output, input...)
return output, nil
}
func main() { func main() {
fmt.Println("Simple ZeroMQ server for goes.") fmt.Println("Simple ZeroMQ server for goes.")
@ -16,8 +51,8 @@ func main() {
storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
storagePath = "c:\\dev\\go\\events" storagePath = "c:\\dev\\go\\events"
goes.SetStorage(goes.NewDiskStorage(storagePath)) goes.SetStorage(goes.NewReadableDiskStorage(storagePath))
goes.SetSerializer(goes.NewPassthruSerializer()) goes.SetSerializer(NewSerializer())
context, err := zmq4.NewContext() context, err := zmq4.NewContext()
if err != nil { if err != nil {

View File

@ -7,6 +7,7 @@ import (
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"fmt" "fmt"
"errors" "errors"
"time"
) )
const IntegerSizeInBytes = 8 const IntegerSizeInBytes = 8
@ -14,12 +15,14 @@ const StreamStartingCapacity = 512
type StoredEvent struct { type StoredEvent struct {
StreamId uuid.UUID StreamId uuid.UUID
CreationTime time.Time
TypeId string
Data []byte Data []byte
} }
//TODO: performance - change reads array for some kind of iterator //TODO: performance - change reads array for some kind of iterator
type Storage interface { type Storage interface {
Write(streamId uuid.UUID, data []byte) error Write(event *StoredEvent) error
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error) ReadAll() ([]*StoredEvent, error)
} }
@ -41,8 +44,54 @@ func (me DiskStorage) getFilenameForEvents(stream string) string {
return me.getFilename(stream, ".history") return me.getFilename(stream, ".history")
} }
func (me DiskStorage) Write(streamId uuid.UUID, data []byte) error { func writeSizeAndBytes(f *os.File, data []byte) (error) {
filename := me.getFilenameForEvents(streamId.String()) sizeBytes := make([]byte, IntegerSizeInBytes)
size := len(data)
binary.BigEndian.PutUint64(sizeBytes, uint64(size))
written, err := f.Write(sizeBytes)
if err != nil {
return err
}
if written != IntegerSizeInBytes {
return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", IntegerSizeInBytes, written))
}
written, err = f.Write(data)
if err != nil {
return err
}
if written != size {
return errors.New(fmt.Sprintf("Write error. Expected to write %v bytes, wrote only %v.", size, written))
}
return nil
}
// readSizedBytes reads one length-prefixed record from f: an 8-byte
// big-endian size followed by that many bytes of data.
func readSizedBytes(f *os.File) ([]byte, error) {
	sizeBytes := make([]byte, IntegerSizeInBytes)
	read, err := f.Read(sizeBytes)
	if err != nil {
		return nil, err
	}
	if read != IntegerSizeInBytes {
		return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read))
	}
	size := binary.BigEndian.Uint64(sizeBytes)
	data := make([]byte, size)
	read, err = f.Read(data)
	if err != nil {
		return nil, err
	}
	// Previously this message said "ready" and reported IntegerSizeInBytes
	// instead of the actual expected payload size.
	if uint64(read) != size {
		return nil, errors.New(fmt.Sprintf("Integrity error. Expected to read %d bytes, got %d bytes.", size, read))
	}
	return data, nil
}
func (me DiskStorage) Write(event *StoredEvent) error {
filename := me.getFilenameForEvents(event.StreamId.String())
os.MkdirAll(path.Dir(filename), os.ModeDir) os.MkdirAll(path.Dir(filename), os.ModeDir)
indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0)
@ -63,12 +112,15 @@ func (me DiskStorage) Write(streamId uuid.UUID, data []byte) error {
} }
position := stat.Size() position := stat.Size()
lengthBytes := make([]byte, IntegerSizeInBytes) creationTimeBytes, err := event.CreationTime.MarshalBinary()
binary.BigEndian.PutUint64(lengthBytes, uint64(len(data))) if err != nil {
eventsFile.Write(lengthBytes) return err
eventsFile.Write(data) }
writeSizeAndBytes(eventsFile, creationTimeBytes)
writeSizeAndBytes(eventsFile, []byte(event.TypeId))
writeSizeAndBytes(eventsFile, event.Data)
indexFile.Write(streamId.Bytes()) indexFile.Write(event.StreamId.Bytes())
positionBytes := make([]byte, IntegerSizeInBytes) positionBytes := make([]byte, IntegerSizeInBytes)
binary.BigEndian.PutUint64(positionBytes, uint64(position)) binary.BigEndian.PutUint64(positionBytes, uint64(position))
indexFile.Write(positionBytes) indexFile.Write(positionBytes)
@ -89,21 +141,18 @@ func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
eventsFile.Seek(offset, 0) eventsFile.Seek(offset, 0)
contentLengthBytes := make([]byte, IntegerSizeInBytes)
results := make([]*StoredEvent, 0) results := make([]*StoredEvent, 0)
for { for {
read, err := eventsFile.Read(contentLengthBytes) creationTime, typeId, data, err := getStoredData(eventsFile)
if err != nil { if err != nil && err.Error() == "EOF" {
break break
} }
if read != IntegerSizeInBytes {
return nil, errors.New("event index integrity error")
}
data, err := getStoredData(eventsFile, contentLengthBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
results = append(results, &StoredEvent{streamId, data})
event := &StoredEvent{streamId, creationTime, typeId, data}
results = append(results, event)
} }
return results, nil return results, nil
} }
@ -139,18 +188,18 @@ func (me DiskStorage) ReadAll() ([]*StoredEvent, error) {
} }
offset := binary.BigEndian.Uint64(offsetBytes) offset := binary.BigEndian.Uint64(offsetBytes)
data, err := me.retrieveData(aggregateId, int64(offset)) storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset))
if err != nil { if err != nil {
return nil, err return nil, err
} }
results = append(results, &StoredEvent{aggregateId, data}) results = append(results, storedEvent)
} }
return results, nil return results, nil
} }
func (me DiskStorage) retrieveData(aggregateId uuid.UUID, offset int64) ([]byte, error) { func (me DiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) {
filename := me.getFilenameForEvents(aggregateId.String()) filename := me.getFilenameForEvents(streamId.String())
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
@ -159,26 +208,33 @@ func (me DiskStorage) retrieveData(aggregateId uuid.UUID, offset int64) ([]byte,
defer eventsFile.Close() defer eventsFile.Close()
eventsFile.Seek(offset, 0) eventsFile.Seek(offset, 0)
contentLengthBytes := make([]byte, IntegerSizeInBytes)
read, err := eventsFile.Read(contentLengthBytes) creationTime, typeId, data, err := getStoredData(eventsFile)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if read < IntegerSizeInBytes {
return nil, errors.New("event integrity problem") event := &StoredEvent{streamId, creationTime, typeId, data}
} return event, nil
return getStoredData(eventsFile, contentLengthBytes)
} }
func getStoredData(eventsFile *os.File, contentLengthBytes []byte) ([]byte, error) { func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) {
contentLength := binary.BigEndian.Uint64(contentLengthBytes) creationTimeBytes, err := readSizedBytes(eventsFile)
data := make([]byte, contentLength)
read, err := eventsFile.Read(data)
if err != nil { if err != nil {
return nil, err return
} }
if uint64(read) < contentLength { err = creationTime.UnmarshalBinary(creationTimeBytes)
return nil, errors.New("incomplete event information retrieved") if err != nil {
return
} }
return data, nil
typeIdBytes, err := readSizedBytes(eventsFile)
if err != nil {
return
}
typeId = string(typeIdBytes)
data, err = readSizedBytes(eventsFile)
return
} }