From 5549c9a1350b0cfb308fd32dcb9bdf9c97a9792c Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Fri, 26 Aug 2016 22:10:15 -0700 Subject: [PATCH] Implement v2 Protocol. Adding expectedVersion for optimistic concurrency and metadata. --- LICENSE | 10 ++ README.md | 16 +- actions/actions.go | 230 ++++++++++++++---------- data/event.go | 7 +- goes.go | 167 +++++------------- goes_test.go | 363 +++++++++++++++++++------------------- serializer/json.go | 9 +- serializer/passthru.go | 8 + server/zeromq.go | 175 ++++++++++++++++++ storage/dailydisk.go | 58 +++++- storage/dailydisk_test.go | 13 +- storage/simpledisk.go | 11 +- storage/storage.go | 3 + 13 files changed, 648 insertions(+), 422 deletions(-) create mode 100644 LICENSE create mode 100644 server/zeromq.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..861af5a --- /dev/null +++ b/LICENSE @@ -0,0 +1,10 @@ +Copyright (c) 2016 GoES Contributors + +Contributors can be found at https://github.com/adymitruk/goes/graphs/contributors + + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index e5b3d3c..a0d7371 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,12 @@ +# Goes + GoLang implementation of a simple EventStore -# Getting started +Released under the MIT license. See [LICENSE](https://github.com/adymitruk/goes/blob/master/LICENSE) file. -## Pre-requisites +## Getting started + +### Pre-requisites - Install [GoLang](https://golang.org/doc/install) version 1.6+ - Install [libsodium](https://download.libsodium.org/libsodium/releases/)\* version 1.0.10+ (Linux only^) @@ -14,9 +18,9 @@ GoLang implementation of a simple EventStore You can look at [scripts/bootstrap.sh](https://github.com/adymitruk/goes/blob/master/scripts/bootstrap.sh) to get an idea on how to install all the pre-requisites. -## Build +### Build -### Fetching GO packages +#### Fetching GO packages In your GOPATH folder, execute the following commands: @@ -24,13 +28,13 @@ In your GOPATH folder, execute the following commands: `go get github.com/pebbe/zmq4` `go get github.com/satori/go.uuid` -### Compiling the binary +#### Compiling the binary In the project root folder, execute the following command: `go build` -## Running the server +### Running the server In the project root folder, execute the following command: diff --git a/actions/actions.go b/actions/actions.go index 4ccc46c..b798ced 100644 --- a/actions/actions.go +++ b/actions/actions.go @@ -1,91 +1,139 @@ -package actions - -import ( - "time" - "github.com/satori/go.uuid" - storage "../storage" - serializer "../serializer" - data "../data" -) - -var mapLock chan int = make(chan int, 1) -var streamsLock map[string]chan int = make(map[string]chan int) - -type ActionsHandler struct { - storage storage.Storage - serializer serializer.Serializer -} - -func NewActionsHandler(storage storage.Storage, serializer serializer.Serializer) *ActionsHandler { - return &ActionsHandler{storage, serializer} -} - -func lockStream(streamName string) { - mapLock <- 1 - defer func(){ - <-mapLock - }() - - streamLock := streamsLock[streamName] - if streamLock == nil { - streamLock = make(chan int, 1) - streamsLock[streamName] = streamLock - } - - streamLock <- 1 -} - -func unlockStream(streamName string) { - <-streamsLock[streamName] -} - -func (me ActionsHandler) AddEvent(event data.Event) error { - streamName := event.AggregateId.String() - - lockStream(streamName) - defer unlockStream(streamName) - - serializedPayload, typeId, err := me.serializer.Serialize(event.Payload) - if err != nil { - return err - } - - return me.storage.Write(&storage.StoredEvent{StreamId: event.AggregateId, CreationTime: time.Now(), TypeId: typeId, Data: serializedPayload}) -} - -func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, error) { - results, err := me.storage.ReadStream(aggregateId) - if err != nil { - return nil, err - } - - events := make([]*data.Event, 0) - for _, storedEvent := range results { - event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) - if err != nil { - return nil, err - } - events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event}) - } - - return events, nil -} - -func (me ActionsHandler) RetrieveAll() ([]*data.Event, error) { - results, err := me.storage.ReadAll() - if err != nil { - return nil, err - } - - events := make([]*data.Event, 0) - for _, storedEvent := range results { - event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) - if err != nil { - return nil, err - } - events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event}) - } - - return events, nil -} - +package actions + +import ( + data "../data" + serializer "../serializer" + storage "../storage" + "errors" + "fmt" + "github.com/satori/go.uuid" + "time" +) + +const NO_EXPECTEDVERSION = uint32(0xFFFFFFFF) + +var mapLock chan int = make(chan int, 1) +var streamsLock map[string]chan int = make(map[string]chan int) + +type Handler interface { + AddEvent(data.Event, uint32) error + RetrieveFor(uuid.UUID) ([]*data.Event, error) + RetrieveAll() ([]*data.Event, error) +} + +type ActionsHandler struct { + storage storage.Storage + serializer serializer.Serializer +} + +func NewActionsHandler(storage storage.Storage, serializer serializer.Serializer) *ActionsHandler { + return &ActionsHandler{storage, serializer} +} + +func lockStream(streamName string) { + mapLock <- 1 + defer func() { + <-mapLock + }() + + streamLock := streamsLock[streamName] + if streamLock == nil { + streamLock = make(chan int, 1) + streamsLock[streamName] = streamLock + } + + streamLock <- 1 +} + +func unlockStream(streamName string) { + <-streamsLock[streamName] +} + +func (me ActionsHandler) AddEvent(event data.Event, expectedVersion uint32) error { + streamName := event.AggregateId.String() + + lockStream(streamName) + defer unlockStream(streamName) + + serializedPayload, typeId, err := me.serializer.Serialize(event.Payload) + if err != nil { + return err + } + + serializedMetadata, _, err := me.serializer.Serialize(event.Metadata) + if err != nil { + return err + } + + if expectedVersion != NO_EXPECTEDVERSION { + ver, err := me.storage.StreamVersion(event.AggregateId) + if err != nil && err.Error()[0:9] != "NOT_FOUND" { + return err + } + if ver != expectedVersion { + return errors.New(fmt.Sprint("WrongExpectedVersion: expected ", expectedVersion, " got ", ver)) + } + } + + return me.storage.Write(&storage.StoredEvent{ + StreamId: event.AggregateId, + CreationTime: time.Now(), + TypeId: typeId, + Data: serializedPayload, + Metadata: serializedMetadata}) +} + +func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, error) { + results, err := me.storage.ReadStream(aggregateId) + if err != nil && err.Error()[0:9] == "NOT_FOUND" { + return make([]*data.Event, 0), nil + } + if err != nil { + return nil, err + } + + events := make([]*data.Event, 0) + for _, storedEvent := range results { + event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) + if err != nil { + return nil, err + } + metadata, err := me.serializer.Deserialize(storedEvent.Metadata, storedEvent.MetadataTypeId) + if err != nil { + return nil, err + } + events = append(events, &data.Event{ + AggregateId: storedEvent.StreamId, + CreationTime: storedEvent.CreationTime, + Payload: event, + Metadata: metadata}) + } + + return events, nil +} + +func (me ActionsHandler) RetrieveAll() ([]*data.Event, error) { + results, err := me.storage.ReadAll() + if err != nil { + return nil, err + } + + events := make([]*data.Event, 0) + for _, storedEvent := range results { + event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) + if err != nil { + return nil, err + } + metadata, err := me.serializer.Deserialize(storedEvent.Metadata, storedEvent.MetadataTypeId) + if err != nil { + return nil, err + } + events = append(events, &data.Event{ + AggregateId: storedEvent.StreamId, + CreationTime: storedEvent.CreationTime, + Payload: event, + Metadata: metadata}) + } + + return events, nil +} diff --git a/data/event.go b/data/event.go index d81283b..5e325fe 100644 --- a/data/event.go +++ b/data/event.go @@ -3,11 +3,14 @@ package data import ( "github.com/satori/go.uuid" "reflect" + "time" ) type Event struct { - AggregateId uuid.UUID - Payload interface{} + AggregateId uuid.UUID + CreationTime time.Time + Payload interface{} + Metadata interface{} } func (me *Event) Equals(other *Event) bool { diff --git a/goes.go b/goes.go index 24d39fd..3d0b02c 100644 --- a/goes.go +++ b/goes.go @@ -1,123 +1,44 @@ -package main - -import ( - "fmt" - actions "./actions" - storage "./storage" - serializer "./serializer" - data "./data" - "github.com/satori/go.uuid" - "github.com/pebbe/zmq4" - "flag" - "os" - "path" -) - -var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") -var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") - -func PathIsAbsolute(s string) bool { - if len(s) > 1 && s[1] == ':' { - return true - } - return path.IsAbs(s) -} - -func main() { - fmt.Println("Simple ZeroMQ server for goes.") - - flag.Parse() - - storagePath := *db - if !PathIsAbsolute(storagePath) { - wd, _ := os.Getwd() - storagePath = path.Join(wd, storagePath) - } - - fmt.Println("Listening on:", *addr) - fmt.Println("Storage path:", storagePath) - - var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer()) - - context, err := zmq4.NewContext() - if err != nil { - panic(err) - } - defer context.Term() - - replySocket, err := context.NewSocket(zmq4.REP) - if err != nil { - panic(err) - } - defer replySocket.Close() - - err = replySocket.Bind(*addr) - if err != nil { - panic(err) - } - - for { - message, err := replySocket.RecvMessageBytes(0) - if err != nil { - fmt.Println("Error receiving command from client", err) - continue - } - - command := string(message[0]) - switch command { - case "AddEvent": - aggregateId, err := uuid.FromBytes(message[1]) - if err != nil { - fmt.Println("Wrong format for AggregateId", err) - break - } - fmt.Println("->", command, aggregateId.String()) - payload := message[2] - err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload}) - if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), 0) - fmt.Println(err) - break - } - replySocket.Send("Ok", 0) - case "ReadStream": - aggregateId, err := uuid.FromBytes(message[1]) - if err != nil { - fmt.Println("Wrong format for AggregateId", err) - break - } - fmt.Println("->", command, aggregateId.String()) - events, err := handler.RetrieveFor(aggregateId) - if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), 0) - fmt.Println(err) - break - } - sendEvents(replySocket, events) - case "ReadAll": - fmt.Println("->", command) - events, err := handler.RetrieveAll() - if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), 0) - fmt.Println(err) - break - } - sendEvents(replySocket, events) - case "Shutdown": - fmt.Println("->", command) - return - } - } -} - -func sendEvents(socket *zmq4.Socket, events []*data.Event) { - len := len(events) - socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE) - - i := 0 - for ; i < len-1; i++ { - socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE) - } - socket.SendBytes(events[i].Payload.([]byte), 0) - fmt.Println("<-", len, "events") -} +package main + +import ( + actions "./actions" + serializer "./serializer" + server "./server" + storage "./storage" + "flag" + "fmt" + "os" + "path" +) + +var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") +var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") + +func PathIsAbsolute(s string) bool { + if len(s) > 1 && s[1] == ':' { + return true + } + return path.IsAbs(s) +} + +func main() { + fmt.Println("GoES - Go Event Store") + fmt.Println("Released under the MIT license. See LICENSE file.") + fmt.Println() + + flag.Parse() + + storagePath := *db + if !PathIsAbsolute(storagePath) { + wd, _ := os.Getwd() + storagePath = path.Join(wd, storagePath) + } + + fmt.Println("Listening on:", *addr) + fmt.Println("Storage path:", storagePath) + + var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer()) + server.Bind(*addr) + server.Listen(handler) + server.Destroy() +} diff --git a/goes_test.go b/goes_test.go index 50f1a73..53a204d 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1,182 +1,181 @@ -package main - -import -( - "testing" - "github.com/satori/go.uuid" - "os" - "path" - "io/ioutil" - "bytes" - storage "./storage" - serializer "./serializer" - data "./data" - actions "./actions" -) - -var tempDir string -var handler *actions.ActionsHandler -var _storage storage.Storage -var _serializer serializer.Serializer - -type AnEvent struct { - A int64 - B string -} - -type AnotherEvent struct { - W int64 - T string - F float64 -} - -func setUp() { - tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) - _storage = storage.NewSimpleDiskStorage(tempDir) - _serializer = serializer.NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil)) - handler = actions.NewActionsHandler(_storage, _serializer) -} - -func tearDown() { - err := os.RemoveAll(tempDir) - if err != nil { - panic(err) - } -} - -func wrapEvent(aggregateId uuid.UUID, event interface{}) data.Event { - return data.Event{AggregateId: aggregateId, Payload: event} -} - -func TestSerializeEventToJson(t *testing.T) { - setUp() - defer tearDown() - - ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) - err := handler.AddEvent(ev) - if err != nil { - t.Errorf("AddEvent failed with %q", err) - return - } - - filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()); - if fi, _ := os.Stat(filename); fi == nil { - t.Errorf("AddEvent failed to create file %q", filename) - return - } - content, _ := ioutil.ReadFile(filename) - if !bytes.Contains(content, []byte("{\"A\":1024,\"B\":\"Tests\"}")) { - t.Errorf("AddEvent failed. File doesn't contain event json.") - return - } -} - -func TestSerializeEventsForSameAggregateInSameFile(t *testing.T) { - setUp() - defer tearDown() - - aggregateId := uuid.NewV4() - ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) - err := handler.AddEvent(ev1) - if err != nil { - t.Errorf("AddEvent failed with %q", err) - return - } - ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) - err = handler.AddEvent(ev2) - if err != nil { - t.Errorf("AddEvent failed with %q", err) - return - } - - filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(aggregateId.String()) - content, _ := ioutil.ReadFile(filename) - if !bytes.Contains(content, []byte("Hello")) || !bytes.Contains(content, []byte("Bob")) { - t.Error("AddEvent failed. Both events are not serialized in same file.") - return - } -} - -func TestTypeInformationIsProvided(t *testing.T) { - setUp() - defer tearDown() - - ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) - err := handler.AddEvent(ev) - if err != nil { - t.Errorf("AddEvent failed with %q", err) - return - } - - filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()); - if fi, _ := os.Stat(filename); fi == nil { - t.Errorf("AddEvent failed to create file %q", filename) - return - } - content, _ := ioutil.ReadFile(filename) - if !bytes.Contains(content, []byte("AnEvent")) { - t.Errorf("AddEvent failed. File doesn't contain event type.") - return - } -} - -func TestEventsCanBeRetrieved(t *testing.T) { - setUp() - defer tearDown() - - aggregateId := uuid.NewV4() - ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) - err := handler.AddEvent(ev1) - if err != nil { - t.Errorf("AddEvent failed with %q", err) - return - } - ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) - err = handler.AddEvent(ev2) - if err != nil { - t.Errorf("AddEvent failed with %q", err) - return - } - - events, err := handler.RetrieveFor(aggregateId) - switch { - case err != nil: - t.Errorf("RetrieveFor(%q) failed with %q", aggregateId.String(), err) - case len(events) != 2: - t.Errorf("RetrieveFor(%q) returned %v events, expected %v", aggregateId.String(), len(events), 2) - case !ev1.Equals(events[0]): - t.Errorf("RetrieveFor(%q) first event doesn't match %+v != %+v", aggregateId.String(), events[0], ev1) - case !ev2.Equals(events[1]): - t.Errorf("RetrieveFor(%q) second event doesn't match %+v != %+v", aggregateId.String(), events[1], ev2) - } -} - -func TestEventsCanBeReplayedInOrder(t *testing.T) { - setUp() - defer tearDown() - - aggregateId1 := uuid.NewV4() - aggregateId2 := uuid.NewV4() - testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"}) - testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"}) - testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"}) - handler.AddEvent(testEvent1) - handler.AddEvent(testEvent2) - handler.AddEvent(testEvent3) - - events, err := handler.RetrieveAll() - switch { - case err != nil: - t.Errorf("RetrieveAll failed with %q", err) - case len(events) != 3: - t.Errorf("RetrieveAll returned %v events, expected %v", len(events), 3) - case !testEvent1.Equals(events[0]) || !testEvent2.Equals(events[1]) || !testEvent3.Equals(events[2]): - t.Error("RetrieveAll returned events in wrong order.") - } -} - -/* - Missing tests from https://gist.github.com/adymitruk/b4627b74617a37b6d949 - - GUID reversal for distribution - - Created date stored with event - */ \ No newline at end of file +package main + +import ( + actions "./actions" + data "./data" + serializer "./serializer" + storage "./storage" + "bytes" + "github.com/satori/go.uuid" + "io/ioutil" + "os" + "path" + "testing" +) + +var tempDir string +var handler *actions.ActionsHandler +var _storage storage.Storage +var _serializer serializer.Serializer + +type AnEvent struct { + A int64 + B string +} + +type AnotherEvent struct { + W int64 + T string + F float64 +} + +func setUp() { + tempDir = path.Join(os.TempDir(), uuid.NewV4().String()) + _storage = storage.NewSimpleDiskStorage(tempDir) + _serializer = serializer.NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil)) + handler = actions.NewActionsHandler(_storage, _serializer) +} + +func tearDown() { + err := os.RemoveAll(tempDir) + if err != nil { + panic(err) + } +} + +func wrapEvent(aggregateId uuid.UUID, event interface{}) data.Event { + return data.Event{AggregateId: aggregateId, Payload: event, Metadata: nil} +} + +func TestSerializeEventToJson(t *testing.T) { + setUp() + defer tearDown() + + ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) + err := handler.AddEvent(ev, actions.NO_EXPECTEDVERSION) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + return + } + + filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()) + if fi, _ := os.Stat(filename); fi == nil { + t.Errorf("AddEvent failed to create file %q", filename) + return + } + content, _ := ioutil.ReadFile(filename) + if !bytes.Contains(content, []byte("{\"A\":1024,\"B\":\"Tests\"}")) { + t.Errorf("AddEvent failed. File doesn't contain event json.") + return + } +} + +func TestSerializeEventsForSameAggregateInSameFile(t *testing.T) { + setUp() + defer tearDown() + + aggregateId := uuid.NewV4() + ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) + err := handler.AddEvent(ev1, actions.NO_EXPECTEDVERSION) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + return + } + ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) + err = handler.AddEvent(ev2, actions.NO_EXPECTEDVERSION) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + return + } + + filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(aggregateId.String()) + content, _ := ioutil.ReadFile(filename) + if !bytes.Contains(content, []byte("Hello")) || !bytes.Contains(content, []byte("Bob")) { + t.Error("AddEvent failed. Both events are not serialized in same file.") + return + } +} + +func TestTypeInformationIsProvided(t *testing.T) { + setUp() + defer tearDown() + + ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) + err := handler.AddEvent(ev, actions.NO_EXPECTEDVERSION) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + return + } + + filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()) + if fi, _ := os.Stat(filename); fi == nil { + t.Errorf("AddEvent failed to create file %q", filename) + return + } + content, _ := ioutil.ReadFile(filename) + if !bytes.Contains(content, []byte("AnEvent")) { + t.Errorf("AddEvent failed. File doesn't contain event type.") + return + } +} + +func TestEventsCanBeRetrieved(t *testing.T) { + setUp() + defer tearDown() + + aggregateId := uuid.NewV4() + ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) + err := handler.AddEvent(ev1, actions.NO_EXPECTEDVERSION) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + return + } + ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) + err = handler.AddEvent(ev2, actions.NO_EXPECTEDVERSION) + if err != nil { + t.Errorf("AddEvent failed with %q", err) + return + } + + events, err := handler.RetrieveFor(aggregateId) + switch { + case err != nil: + t.Errorf("RetrieveFor(%q) failed with %q. %q", aggregateId.String(), err, tempDir) + case len(events) != 2: + t.Errorf("RetrieveFor(%q) returned %v events, expected %v", aggregateId.String(), len(events), 2) + case !ev1.Equals(events[0]): + t.Errorf("RetrieveFor(%q) first event doesn't match %+v != %+v", aggregateId.String(), events[0], ev1) + case !ev2.Equals(events[1]): + t.Errorf("RetrieveFor(%q) second event doesn't match %+v != %+v", aggregateId.String(), events[1], ev2) + } +} + +func TestEventsCanBeReplayedInOrder(t *testing.T) { + setUp() + defer tearDown() + + aggregateId1 := uuid.NewV4() + aggregateId2 := uuid.NewV4() + testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"}) + testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"}) + testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"}) + handler.AddEvent(testEvent1, actions.NO_EXPECTEDVERSION) + handler.AddEvent(testEvent2, actions.NO_EXPECTEDVERSION) + handler.AddEvent(testEvent3, actions.NO_EXPECTEDVERSION) + + events, err := handler.RetrieveAll() + switch { + case err != nil: + t.Errorf("RetrieveAll failed with %q %q", err, tempDir) + case len(events) != 3: + t.Errorf("RetrieveAll returned %v events, expected %v", len(events), 3) + case !testEvent1.Equals(events[0]) || !testEvent2.Equals(events[1]) || !testEvent3.Equals(events[2]): + t.Error("RetrieveAll returned events in wrong order.") + } +} + +/* + Missing tests from https://gist.github.com/adymitruk/b4627b74617a37b6d949 + - GUID reversal for distribution + - Created date stored with event +*/ diff --git a/serializer/json.go b/serializer/json.go index d915680..937e52f 100644 --- a/serializer/json.go +++ b/serializer/json.go @@ -4,6 +4,7 @@ import ( "reflect" "encoding/json" "errors" + "fmt" ) type JsonSerializer struct { @@ -27,6 +28,9 @@ func (me *JsonSerializer) RegisterType(t interface{}) { } func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) { + if obj == nil { + return []byte(""), "", nil + } type_ := reflect.TypeOf(obj) if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) { return nil, "", errors.New("Trying to serialize a Ptr type.") @@ -40,9 +44,12 @@ func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) { } func (me *JsonSerializer) Deserialize(serialized []byte, typeId string) (interface{}, error) { + if (typeId == "") { + return nil, nil + } type_ := me.types[typeId] if type_ == nil { - return nil, errors.New("type not registered in serializer") + return nil, errors.New(fmt.Sprintf("type %q not registered in serializer", typeId)) } objPtr := reflect.New(type_).Interface() err := json.Unmarshal(serialized, objPtr) diff --git a/serializer/passthru.go b/serializer/passthru.go index 06b5337..42cd700 100644 --- a/serializer/passthru.go +++ b/serializer/passthru.go @@ -12,6 +12,10 @@ func NewPassthruSerializer() *PassthruSerializer { } func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId string, err error) { + if input == nil { + return nil, "", nil + } + content, ok := input.([]byte) if !ok { err = errors.New("input should be []byte") @@ -30,6 +34,10 @@ func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId } func (me PassthruSerializer) Deserialize(input []byte, typeId string) (interface{}, error) { + if (typeId == "") { + return nil, nil + } + output := []byte(typeId) output = append(output, ' ') output = append(output, input...) diff --git a/server/zeromq.go b/server/zeromq.go new file mode 100644 index 0000000..dae5783 --- /dev/null +++ b/server/zeromq.go @@ -0,0 +1,175 @@ +package server + +import ( + "fmt" + data "../data" + actions "../actions" + "github.com/satori/go.uuid" + "github.com/pebbe/zmq4" + "encoding/binary" +) + +var context *zmq4.Context +var replySocket *zmq4.Socket + +func Bind(addr string) { + var err error; + + context, err = zmq4.NewContext() + if err != nil { + panic(err) + } + + replySocket, err = context.NewSocket(zmq4.REP) + if err != nil { + panic(err) + } + + err = replySocket.Bind(addr) + if err != nil { + panic(err) + } +} + +func Destroy() { + replySocket.Close() + context.Term() +} + +const NO_FLAGS = zmq4.Flag(0) +const UUID_SIZE = 16 +const COMMAND_FRAME = 0 +const ARGS_FRAME = 1 +const PAYLOAD_FRAME = 2 +const METADATA_FRAME = 3 + +func Listen(handler actions.Handler) { + for { + message, err := replySocket.RecvMessageBytes(NO_FLAGS) + if err != nil { + fmt.Println("Error receiving command from client", err) + continue + } + + command := string(message[COMMAND_FRAME]) + switch command { + case "AddEvent": + // v1 - "AddEvent" [AggregateId] {payload} + aggregateId, err := uuid.FromBytes(message[ARGS_FRAME]) + if err != nil { + fmt.Println("Wrong format for AggregateId", err) + break + } + fmt.Println("->", command, aggregateId.String()) + payload := message[PAYLOAD_FRAME] + err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: nil}, actions.NO_EXPECTEDVERSION) + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + fmt.Println(err) + break + } + replySocket.Send("Ok", NO_FLAGS) + case "AddEvent_v2": + // v2 - "AddEvent" 16:AggregateId,4:expectedVersion {payload} {metadata} + aggregateId, err := uuid.FromBytes(message[ARGS_FRAME][0:UUID_SIZE]) + if err != nil { + fmt.Println("Wrong format for AggregateId", err) + break + } + expectedVersion := binary.LittleEndian.Uint32(message[ARGS_FRAME][UUID_SIZE:]) + fmt.Println("->", command, aggregateId.String(), expectedVersion) + payload := message[PAYLOAD_FRAME] + metadata := message[METADATA_FRAME] + err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: metadata}, expectedVersion) + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + fmt.Println(err) + break + } + replySocket.Send("Ok", NO_FLAGS) + case "ReadStream", "ReadStream_v2": + aggregateId, err := uuid.FromBytes(message[ARGS_FRAME]) + if err != nil { + fmt.Println("Wrong format for AggregateId", err) + break + } + fmt.Println("->", command, aggregateId.String()) + events, err := handler.RetrieveFor(aggregateId) + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + fmt.Println(err) + break + } + if command == "ReadStream_v2" { + sendEvents_v2(replySocket, events) + break; + } + sendEvents_v1(replySocket, events) + case "ReadAll", "ReadAll_v2": + fmt.Println("->", command) + events, err := handler.RetrieveAll() + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + fmt.Println(err) + break + } + if command == "ReadAll_v2" { + sendEvents_v2(replySocket, events) + break; + } + sendEvents_v1(replySocket, events) + case "Shutdown": + fmt.Println("->", command) + return + } + } +} + +func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) { + lastFlag := zmq4.SNDMORE + if (isLast) { + lastFlag = NO_FLAGS + } + socket.SendBytes(event.Payload.([]byte), lastFlag) +} + +func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) { + len := len(events) + if (len == 0) { + socket.Send("0", NO_FLAGS) + return + } + + socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE) + + i := 0 + for ; i < len; i++ { + sendEvent_v1(socket, events[i], i == len - 1) + } + fmt.Println("<-", len, "events") +} + +func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) { + socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE) + lastFlag := zmq4.SNDMORE + if (isLast) { + lastFlag = NO_FLAGS + } + socket.SendBytes(event.Metadata.([]byte), lastFlag) +} + +func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) { + len := len(events) + if (len == 0) { + socket.Send("0", NO_FLAGS) + return + } + + socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE) + + i := 0 + for ; i < len; i++ { + sendEvent_v2(socket, events[i], i == len - 1) + } + fmt.Println("<-", len, "events") +} \ No newline at end of file diff --git a/storage/dailydisk.go b/storage/dailydisk.go index 1e429e3..11da7e2 100644 --- a/storage/dailydisk.go +++ b/storage/dailydisk.go @@ -8,8 +8,12 @@ import ( "fmt" "errors" "io/ioutil" + "bytes" ) +const EMPTY_STREAM = uint32(0) +var CRLF = []byte {'\r', '\n'} + type DailyDiskStorage struct { storagePath string indexesPath string @@ -94,7 +98,7 @@ func readIndexNextEntry(f *os.File) (*IndexEntry, error) { return &index, nil; } -func writeEvent(filename string, data []byte) error { +func writeEvent(filename string, data []byte, metadata []byte) error { eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) if err != nil { return err @@ -102,12 +106,26 @@ func writeEvent(filename string, data []byte) error { defer eventFile.Close() eventFile.Write(data) + eventFile.Write(CRLF) + eventFile.Write(metadata) return nil } -func readEvent(filename string) ([]byte, error) { - return ioutil.ReadFile(filename) +func readEvent(filename string) (data []byte, metadata []byte, err error) { + content, err := ioutil.ReadFile(filename) + if err != nil { + return + } + sep := bytes.Index(content, CRLF) + if sep == -1 { + data = content + metadata = make([]byte, 0) + return + } + data = content[:sep] + metadata = content[sep+2:] + return } func (me DailyDiskStorage) Write(event *StoredEvent) error { @@ -115,7 +133,7 @@ func (me DailyDiskStorage) Write(event *StoredEvent) error { eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) os.MkdirAll(path.Dir(eventFilename), 0777) - err := writeEvent(eventFilename, event.Data) + err := writeEvent(eventFilename, event.Data, event.Metadata) if err != nil { return err } @@ -135,11 +153,33 @@ func (me DailyDiskStorage) Write(event *StoredEvent) error { return nil } +func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { + indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) + if err != nil { + return EMPTY_STREAM, errors.New("NOT_FOUND: " + err.Error()) + } + defer indexFile.Close() + + ver := EMPTY_STREAM + for { + _, err := readIndexNextEntry(indexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + return EMPTY_STREAM, err + } + ver++ + } + + return ver, nil +} + func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) if err != nil { - return nil, err + return nil, errors.New("NOT_FOUND: " + err.Error()) } defer indexFile.Close() @@ -152,11 +192,11 @@ func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error if err != nil { return nil, err } - data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) + data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) if err != nil { return nil, err } - event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data} + event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} events = append(events, event) } @@ -179,11 +219,11 @@ func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) { if err != nil { return nil, err } - data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) + data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) if err != nil { return nil, err } - event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data} + event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata} events = append(events, event) } diff --git a/storage/dailydisk_test.go b/storage/dailydisk_test.go index 63f224e..bc7f4eb 100644 --- a/storage/dailydisk_test.go +++ b/storage/dailydisk_test.go @@ -20,9 +20,10 @@ func TestAddEvent(t *testing.T) { aggregateId := uuid.NewV4() aType := "myType" data := []byte("{}") + metadata := []byte("{}") //Act - err := storage.Write(&StoredEvent{aggregateId, aTime, aType, data}) + err := storage.Write(&StoredEvent{aggregateId, aTime, aType, data, "Metadata", metadata}) //Assert if err != nil { @@ -54,9 +55,9 @@ func TestReadStream(t *testing.T) { storage := NewDailyDiskStorage(storagePath) streamId := uuid.NewV4() - ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent")} + ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent"), "Metadata", []byte("{}")} storage.Write(ev1) - ev2 := &StoredEvent{streamId, time.Now(), "2ndType", []byte("2ndEvent")} + ev2 := &StoredEvent{streamId, time.Now(), "2ndType", []byte("2ndEvent"), "Metadata", []byte("{}")} storage.Write(ev2) //Act @@ -89,11 +90,11 @@ func TestReadAll(t *testing.T) { stream1Id := uuid.NewV4() stream2Id := uuid.NewV4() - ev1 := &StoredEvent{stream1Id, time.Now(), "1stType", []byte("1stEvent")} + ev1 := &StoredEvent{stream1Id, time.Now(), "1stType", []byte("1stEvent"), "Metadata", []byte("{}")} storage.Write(ev1) - ev2 := &StoredEvent{stream2Id, time.Now(), "2ndType", []byte("2ndEvent")} + ev2 := &StoredEvent{stream2Id, time.Now(), "2ndType", []byte("2ndEvent"), "Metadata", []byte("{}")} storage.Write(ev2) - ev3 := &StoredEvent{stream1Id, time.Now(), "3rdType", []byte("3rdEvent")} + ev3 := &StoredEvent{stream1Id, time.Now(), "3rdType", []byte("3rdEvent"), "Metadata", []byte("{}")} storage.Write(ev3) //Act diff --git a/storage/simpledisk.go b/storage/simpledisk.go index 5e29e8e..fac5297 100644 --- a/storage/simpledisk.go +++ b/storage/simpledisk.go @@ -111,6 +111,11 @@ func (me SimpleDiskStorage) Write(event *StoredEvent) error { return nil } +func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { + //TODO + return EMPTY_STREAM, nil +} + func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { streamName := streamId.String() offset := int64(0) //TODO snapshots @@ -134,7 +139,8 @@ func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, erro return nil, err } - event := &StoredEvent{streamId, creationTime, typeId, data} + //TODO metadata + event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} results = append(results, event) } return results, nil @@ -197,7 +203,8 @@ func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64 return nil, err } - event := &StoredEvent{streamId, creationTime, typeId, data} + //TODO metadata + event := &StoredEvent{streamId, creationTime, typeId, data, "", nil} return event, nil } diff --git a/storage/storage.go b/storage/storage.go index ee9bc52..bdf0850 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -13,6 +13,8 @@ type StoredEvent struct { CreationTime time.Time TypeId string Data []byte + MetadataTypeId string + Metadata []byte } //TODO: performance - change reads array for some kind of iterator @@ -20,4 +22,5 @@ type Storage interface { Write(event *StoredEvent) error ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) ReadAll() ([]*StoredEvent, error) + StreamVersion(streamId uuid.UUID) (uint32, error) }