From 8f9b33e9ce7d77c3196ab9354848d4becbcab2e3 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Fri, 12 Feb 2016 17:54:50 -0800 Subject: [PATCH] First commit --- goes.go | 221 +++++++++++++++++++++++++++++++++++++++++++++++ goes.iml | 10 +++ goes_test.go | 127 +++++++++++++++++++++++++++ serialization.go | 64 ++++++++++++++ 4 files changed, 422 insertions(+) create mode 100644 goes.go create mode 100644 goes.iml create mode 100644 goes_test.go create mode 100644 serialization.go diff --git a/goes.go b/goes.go new file mode 100644 index 0000000..e0e0a85 --- /dev/null +++ b/goes.go @@ -0,0 +1,221 @@ +package goes + +import ( + "github.com/satori/go.uuid" + "os" + "fmt" + "encoding/binary" + "path" + "errors" +) + +var storagePath string +var serializer Serializer + +const IntegerSizeInBytes = 8 +const StreamStartingCapacity = 512 + +type Event struct { + AggregateId uuid.UUID + Payload interface{} +} + +func SetStoragePath(newStoragePath string) { + storagePath = newStoragePath +} + +func SetSerializer(newSerializer Serializer) { + serializer = newSerializer +} + +func getFilename(stream, extension string) string { + return fmt.Sprintf("%v%v", path.Join(storagePath, stream[0:2], stream[2:]), extension) +} + +func getFilenameForEvents(stream string) string { + return getFilename(stream, ".history") +} + +var mapLock chan int = make(chan int, 1) +var streamsLock map[string]chan int = make(map[string]chan int) + +func lockStream(streamName string) { + mapLock <- 1 + defer func(){ + <-mapLock + }() + + streamLock := streamsLock[streamName] + if streamLock == nil { + streamLock = make(chan int, 1) + streamsLock[streamName] = streamLock + } + + streamLock <- 1 +} + +func unlockStream(streamName string) { + <-streamsLock[streamName] +} + +func AddEvent(event Event) error { + streamName := event.AggregateId.String() + + lockStream(streamName) + defer unlockStream(streamName) + + filename := getFilenameForEvents(streamName) + os.MkdirAll(path.Dir(filename), os.ModeDir) + + eventIndexPath := path.Join(storagePath, "eventindex") + indexFile, err := os.OpenFile(eventIndexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) + if err != nil { + return err + } + defer indexFile.Close() + + eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0) + if err != nil { + return err + } + defer eventsFile.Close() + + stat, err := eventsFile.Stat() + if err != nil { + return err + } + position := stat.Size() + + serializedPayload, err := serializer.Serialize(event.Payload) + if err != nil { + return err + } + + lengthBytes := make([]byte, IntegerSizeInBytes) + binary.BigEndian.PutUint64(lengthBytes, uint64(len(serializedPayload))) + eventsFile.Write(lengthBytes) + eventsFile.Write(serializedPayload) + + indexFile.Write(event.AggregateId.Bytes()) + positionBytes := make([]byte, IntegerSizeInBytes) + binary.BigEndian.PutUint64(positionBytes, uint64(position)) + indexFile.Write(positionBytes) + + return nil +} + +func RetrieveFor(aggregateId uuid.UUID) ([]Event, error) { + streamName := aggregateId.String() + offset := getStartingIndexFor(streamName) + filename := getFilenameForEvents(streamName) + + eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer eventsFile.Close() + + eventsFile.Seek(offset, 0) + + contentLengthBytes := make([]byte, IntegerSizeInBytes) + events := make([]Event, 0) + for { + read, err := eventsFile.Read(contentLengthBytes) + if err != nil { + break + } + if read < 8 { + return nil, errors.New("event index integrity error") + } + event, err := getStoredEvent(eventsFile, contentLengthBytes) + if err != nil { + return nil, err + } + events = append(events, Event{aggregateId, event}) + } + return events, nil +} + +func getStartingIndexFor(streamName string) int64 { + //TODO: snapshots + return int64(0) +} + +func getStoredEvent(eventsFile *os.File, contentLengthBytes []byte) (interface{}, error) { + contentLength := binary.BigEndian.Uint64(contentLengthBytes) + content := make([]byte, contentLength) + read, err := eventsFile.Read(content) + if err != nil { + return nil, err + } + if uint64(read) < contentLength { + return nil, errors.New("incomplete event information retrieved") + } + return serializer.Deserialize(content) +} + +func retrieveEvent(aggregateId uuid.UUID, offset int64) (*Event, error) { + filename := getFilenameForEvents(aggregateId.String()) + + eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer eventsFile.Close() + + eventsFile.Seek(offset, 0) + contentLengthBytes := make([]byte, IntegerSizeInBytes) + read, err := eventsFile.Read(contentLengthBytes) + if err != nil { + return nil, err + } + if read < IntegerSizeInBytes { + return nil, errors.New("event integrity problem") + } + content, err := getStoredEvent(eventsFile, contentLengthBytes) + if err != nil { + return nil, err + } + return &Event{aggregateId, content}, nil +} + +func RetrieveAll() ([]*Event, error) { + indexFile, err := os.OpenFile(path.Join(storagePath, "eventindex"), os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer indexFile.Close() + + events := make([]*Event, 0) + guidBytes := make([]byte, 16) + offsetBytes := make([]byte, IntegerSizeInBytes) + for { + read, err := indexFile.Read(guidBytes) + if err != nil { + break + } + if read != 16 { + return nil, errors.New("index integrity error") + } + read, err = indexFile.Read(offsetBytes) + if err != nil { + return nil, err + } + if read != IntegerSizeInBytes { + return nil, errors.New("index integrity error") + } + aggregateId, err := uuid.FromBytes(guidBytes) + if err != nil { + return nil, err + } + offset := binary.BigEndian.Uint64(offsetBytes) + + event, err := retrieveEvent(aggregateId, int64(offset)) + if err != nil { + return nil, err + } + events = append(events, event) + } + + return events, nil +} \ No newline at end of file diff --git a/goes.iml b/goes.iml new file mode 100644 index 0000000..b453abb --- /dev/null +++ b/goes.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/goes_test.go b/goes_test.go new file mode 100644 index 0000000..9bdff87 --- /dev/null +++ b/goes_test.go @@ -0,0 +1,127 @@ +package goes + +import ( + "testing" + "github.com/satori/go.uuid" + "crypto/rand" + "os" + _ "path" + "reflect" + "fmt" +"math/big" + "path" +) + +var tempDir string + +type MyEvent struct { + A int64 + B string +} + +type AnotherEvent struct { + W int64 + T string + F float64 +} + +func setUp() { + tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) + SetStoragePath(tempDir) + serializer := NewJsonSerializer((*MyEvent)(nil), (*AnotherEvent)(nil)) + SetSerializer(serializer) +} + +func tearDown() { + err := os.RemoveAll(tempDir) + if err != nil { + panic(err) + } +} + +func createRandomEvent() *Event { + id := uuid.NewV4() + return createRandomEventFor(id) +} + +func createRandomEventFor(id uuid.UUID) *Event { + a, _ := rand.Int(rand.Reader, big.NewInt(100000)) + b, _ := rand.Int(rand.Reader, big.NewInt(1000000)) + payload := MyEvent{a.Int64(), fmt.Sprintf("abc-%v", b.Int64())} + return &Event{id, payload} +} + +func TestAddEvent(t *testing.T) { + setUp() + defer tearDown() + + ev := createRandomEvent() + err := AddEvent(*ev) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + } +} + +func TestAddEventsToSameAggregate(t *testing.T) { + setUp() + defer tearDown() + + id := uuid.NewV4() + ev1 := createRandomEventFor(id) + err := AddEvent(*ev1) + if err != nil { + t.Errorf("AddEvent() failed with %q", err) + return + } + ev2 := createRandomEventFor(id) + err = AddEvent(*ev2) + if err != nil { + t.Errorf("AddEvent() failed with %q", err) + return + } +} + +func (me *Event) Equals(other *Event) bool { + return me.AggregateId == other.AggregateId && reflect.DeepEqual(me.Payload, other.Payload) +} + +func TestRetrieveFor(t *testing.T) { + setUp() + defer tearDown() + + id := uuid.NewV4() + ev1 := createRandomEventFor(id) + ev2 := createRandomEventFor(id) + AddEvent(*ev1) + AddEvent(*ev2) + AddEvent(*createRandomEvent()) + + events, err := RetrieveFor(id) + switch { + case err != nil: + t.Errorf("RetrieveFor(%q) failed with %q", id.String(), err) + case len(events) != 2: + t.Errorf("RetrieveFor(%q) returned %v events, expected %v", id.String(), len(events), 2) + case !events[0].Equals(ev1): + t.Errorf("RetrieveFor(%q) first event doesn't match %+v != %+v", id.String(), events[0], ev1) + case !events[1].Equals(ev2): + t.Errorf("RetrieveFor(%q) second event doesn't match %+v != %+v", id.String(), events[1], ev2) + } +} + +func TestRetrieveAll(t *testing.T) { + setUp() + defer tearDown() + + AddEvent(*createRandomEvent()) + AddEvent(*createRandomEvent()) + AddEvent(*createRandomEvent()) + + events, err := RetrieveAll() + switch { + case err != nil: + t.Errorf("RetrieveAll() failed with %q", err) + case len(events) != 3: + t.Errorf("RetrieveAll() returned %v events, expected %v", len(events), 3) + } +} \ No newline at end of file diff --git a/serialization.go b/serialization.go new file mode 100644 index 0000000..71b5d4e --- /dev/null +++ b/serialization.go @@ -0,0 +1,64 @@ +package goes +import ( + "reflect" + "encoding/json" + "errors" + "bytes" +) + +type Serializer interface { + Serialize(interface{}) ([]byte, error) + Deserialize([]byte) (interface{}, error) +} + +type JsonSerializer struct { + types map[string]reflect.Type +} + +func NewJsonSerializer(types ...interface{}) *JsonSerializer { + s := &JsonSerializer{make(map[string]reflect.Type)} + for _, t := range types { + s.RegisterType(t) + } + return s +} + +func (me *JsonSerializer) RegisterType(t interface{}) { + type_ := reflect.TypeOf(t) + if type_.Kind() == reflect.Ptr || type_.Kind() == reflect.Interface { + type_ = type_.Elem() + } + me.types[type_.String()] = type_ +} + +func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, error) { + type_ := reflect.TypeOf(obj) + if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) { + return nil, errors.New("Trying to serialize a Ptr type.") + } + typeId := type_.String() + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + return []byte(typeId + " " + string(data)), nil +} + +func (me *JsonSerializer) Deserialize(serialized []byte) (interface{}, error) { + separatorIndex := bytes.Index(serialized, []byte{' '}) + if separatorIndex < 0 { + return nil, errors.New("invalid serialized data") + } + typeId := string(serialized[0:separatorIndex]) + type_ := me.types[typeId] + if type_ == nil { + return nil, errors.New("type not registered in serializer") + } + objPtr := reflect.New(type_).Interface() + err := json.Unmarshal(serialized[separatorIndex:], objPtr) + if err != nil { + return nil, err + } + obj := reflect.Indirect(reflect.ValueOf(objPtr)).Interface() + return obj, nil +}