Implement v2 Protocol. Adding expectedVersion for optimistic concurrency and metadata.

This commit is contained in:
Nicolas Dextraze 2016-08-26 22:10:15 -07:00
parent d819fb7de6
commit 5549c9a135
13 changed files with 648 additions and 422 deletions

10
LICENSE Normal file
View File

@ -0,0 +1,10 @@
Copyright (c) 2016 GoES Contributors
Contributors can be found at https://github.com/adymitruk/goes/graphs/contributors
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,8 +1,12 @@
# Goes
GoLang implementation of a simple EventStore GoLang implementation of a simple EventStore
# Getting started Released under the MIT license. See [LICENSE](https://github.com/adymitruk/goes/blob/master/LICENSE) file.
## Pre-requisites ## Getting started
### Pre-requisites
- Install [GoLang](https://golang.org/doc/install) version 1.6+ - Install [GoLang](https://golang.org/doc/install) version 1.6+
- Install [libsodium](https://download.libsodium.org/libsodium/releases/)\* version 1.0.10+ (Linux only^) - Install [libsodium](https://download.libsodium.org/libsodium/releases/)\* version 1.0.10+ (Linux only^)
@ -14,9 +18,9 @@ GoLang implementation of a simple EventStore
You can look at [scripts/bootstrap.sh](https://github.com/adymitruk/goes/blob/master/scripts/bootstrap.sh) to get an idea on how to install all the pre-requisites. You can look at [scripts/bootstrap.sh](https://github.com/adymitruk/goes/blob/master/scripts/bootstrap.sh) to get an idea on how to install all the pre-requisites.
## Build ### Build
### Fetching GO packages #### Fetching GO packages
In your GOPATH folder, execute the following commands: In your GOPATH folder, execute the following commands:
@ -24,13 +28,13 @@ In your GOPATH folder, execute the following commands:
`go get github.com/pebbe/zmq4` `go get github.com/pebbe/zmq4`
`go get github.com/satori/go.uuid` `go get github.com/satori/go.uuid`
### Compiling the binary #### Compiling the binary
In the project root folder, execute the following command: In the project root folder, execute the following command:
`go build` `go build`
## Running the server ### Running the server
In the project root folder, execute the following command: In the project root folder, execute the following command:

View File

@ -1,18 +1,28 @@
package actions package actions
import ( import (
"time"
"github.com/satori/go.uuid"
storage "../storage"
serializer "../serializer"
data "../data" data "../data"
serializer "../serializer"
storage "../storage"
"errors"
"fmt"
"github.com/satori/go.uuid"
"time"
) )
const NO_EXPECTEDVERSION = uint32(0xFFFFFFFF)
var mapLock chan int = make(chan int, 1) var mapLock chan int = make(chan int, 1)
var streamsLock map[string]chan int = make(map[string]chan int) var streamsLock map[string]chan int = make(map[string]chan int)
type Handler interface {
AddEvent(data.Event, uint32) error
RetrieveFor(uuid.UUID) ([]*data.Event, error)
RetrieveAll() ([]*data.Event, error)
}
type ActionsHandler struct { type ActionsHandler struct {
storage storage.Storage storage storage.Storage
serializer serializer.Serializer serializer serializer.Serializer
} }
@ -22,7 +32,7 @@ func NewActionsHandler(storage storage.Storage, serializer serializer.Serializer
func lockStream(streamName string) { func lockStream(streamName string) {
mapLock <- 1 mapLock <- 1
defer func(){ defer func() {
<-mapLock <-mapLock
}() }()
@ -39,7 +49,7 @@ func unlockStream(streamName string) {
<-streamsLock[streamName] <-streamsLock[streamName]
} }
func (me ActionsHandler) AddEvent(event data.Event) error { func (me ActionsHandler) AddEvent(event data.Event, expectedVersion uint32) error {
streamName := event.AggregateId.String() streamName := event.AggregateId.String()
lockStream(streamName) lockStream(streamName)
@ -50,11 +60,34 @@ func (me ActionsHandler) AddEvent(event data.Event) error {
return err return err
} }
return me.storage.Write(&storage.StoredEvent{StreamId: event.AggregateId, CreationTime: time.Now(), TypeId: typeId, Data: serializedPayload}) serializedMetadata, _, err := me.serializer.Serialize(event.Metadata)
if err != nil {
return err
}
if expectedVersion != NO_EXPECTEDVERSION {
ver, err := me.storage.StreamVersion(event.AggregateId)
if err != nil && err.Error()[0:9] != "NOT_FOUND" {
return err
}
if ver != expectedVersion {
return errors.New(fmt.Sprint("WrongExpectedVersion: expected ", expectedVersion, " got ", ver))
}
}
return me.storage.Write(&storage.StoredEvent{
StreamId: event.AggregateId,
CreationTime: time.Now(),
TypeId: typeId,
Data: serializedPayload,
Metadata: serializedMetadata})
} }
func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, error) { func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, error) {
results, err := me.storage.ReadStream(aggregateId) results, err := me.storage.ReadStream(aggregateId)
if err != nil && err.Error()[0:9] == "NOT_FOUND" {
return make([]*data.Event, 0), nil
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -65,7 +98,15 @@ func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event}) metadata, err := me.serializer.Deserialize(storedEvent.Metadata, storedEvent.MetadataTypeId)
if err != nil {
return nil, err
}
events = append(events, &data.Event{
AggregateId: storedEvent.StreamId,
CreationTime: storedEvent.CreationTime,
Payload: event,
Metadata: metadata})
} }
return events, nil return events, nil
@ -83,9 +124,16 @@ func (me ActionsHandler) RetrieveAll() ([]*data.Event, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event}) metadata, err := me.serializer.Deserialize(storedEvent.Metadata, storedEvent.MetadataTypeId)
if err != nil {
return nil, err
}
events = append(events, &data.Event{
AggregateId: storedEvent.StreamId,
CreationTime: storedEvent.CreationTime,
Payload: event,
Metadata: metadata})
} }
return events, nil return events, nil
} }

View File

@ -3,11 +3,14 @@ package data
import ( import (
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"reflect" "reflect"
"time"
) )
type Event struct { type Event struct {
AggregateId uuid.UUID AggregateId uuid.UUID
Payload interface{} CreationTime time.Time
Payload interface{}
Metadata interface{}
} }
func (me *Event) Equals(other *Event) bool { func (me *Event) Equals(other *Event) bool {

97
goes.go
View File

@ -1,14 +1,12 @@
package main package main
import ( import (
"fmt"
actions "./actions" actions "./actions"
storage "./storage"
serializer "./serializer" serializer "./serializer"
data "./data" server "./server"
"github.com/satori/go.uuid" storage "./storage"
"github.com/pebbe/zmq4"
"flag" "flag"
"fmt"
"os" "os"
"path" "path"
) )
@ -24,7 +22,9 @@ func PathIsAbsolute(s string) bool {
} }
func main() { func main() {
fmt.Println("Simple ZeroMQ server for goes.") fmt.Println("GoES - Go Event Store")
fmt.Println("Released under the MIT license. See LICENSE file.")
fmt.Println()
flag.Parse() flag.Parse()
@ -38,86 +38,7 @@ func main() {
fmt.Println("Storage path:", storagePath) fmt.Println("Storage path:", storagePath)
var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer()) var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer())
server.Bind(*addr)
context, err := zmq4.NewContext() server.Listen(handler)
if err != nil { server.Destroy()
panic(err)
}
defer context.Term()
replySocket, err := context.NewSocket(zmq4.REP)
if err != nil {
panic(err)
}
defer replySocket.Close()
err = replySocket.Bind(*addr)
if err != nil {
panic(err)
}
for {
message, err := replySocket.RecvMessageBytes(0)
if err != nil {
fmt.Println("Error receiving command from client", err)
continue
}
command := string(message[0])
switch command {
case "AddEvent":
aggregateId, err := uuid.FromBytes(message[1])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
payload := message[2]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload})
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
replySocket.Send("Ok", 0)
case "ReadStream":
aggregateId, err := uuid.FromBytes(message[1])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
events, err := handler.RetrieveFor(aggregateId)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
sendEvents(replySocket, events)
case "ReadAll":
fmt.Println("->", command)
events, err := handler.RetrieveAll()
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
sendEvents(replySocket, events)
case "Shutdown":
fmt.Println("->", command)
return
}
}
}
func sendEvents(socket *zmq4.Socket, events []*data.Event) {
len := len(events)
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len-1; i++ {
socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE)
}
socket.SendBytes(events[i].Payload.([]byte), 0)
fmt.Println("<-", len, "events")
} }

View File

@ -1,17 +1,16 @@
package main package main
import import (
( actions "./actions"
"testing" data "./data"
serializer "./serializer"
storage "./storage"
"bytes"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"io/ioutil"
"os" "os"
"path" "path"
"io/ioutil" "testing"
"bytes"
storage "./storage"
serializer "./serializer"
data "./data"
actions "./actions"
) )
var tempDir string var tempDir string
@ -31,7 +30,7 @@ type AnotherEvent struct {
} }
func setUp() { func setUp() {
tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) tempDir = path.Join(os.TempDir(), uuid.NewV4().String())
_storage = storage.NewSimpleDiskStorage(tempDir) _storage = storage.NewSimpleDiskStorage(tempDir)
_serializer = serializer.NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil)) _serializer = serializer.NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil))
handler = actions.NewActionsHandler(_storage, _serializer) handler = actions.NewActionsHandler(_storage, _serializer)
@ -45,7 +44,7 @@ func tearDown() {
} }
func wrapEvent(aggregateId uuid.UUID, event interface{}) data.Event { func wrapEvent(aggregateId uuid.UUID, event interface{}) data.Event {
return data.Event{AggregateId: aggregateId, Payload: event} return data.Event{AggregateId: aggregateId, Payload: event, Metadata: nil}
} }
func TestSerializeEventToJson(t *testing.T) { func TestSerializeEventToJson(t *testing.T) {
@ -53,13 +52,13 @@ func TestSerializeEventToJson(t *testing.T) {
defer tearDown() defer tearDown()
ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"})
err := handler.AddEvent(ev) err := handler.AddEvent(ev, actions.NO_EXPECTEDVERSION)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()); filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String())
if fi, _ := os.Stat(filename); fi == nil { if fi, _ := os.Stat(filename); fi == nil {
t.Errorf("AddEvent failed to create file %q", filename) t.Errorf("AddEvent failed to create file %q", filename)
return return
@ -77,13 +76,13 @@ func TestSerializeEventsForSameAggregateInSameFile(t *testing.T) {
aggregateId := uuid.NewV4() aggregateId := uuid.NewV4()
ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"})
err := handler.AddEvent(ev1) err := handler.AddEvent(ev1, actions.NO_EXPECTEDVERSION)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45})
err = handler.AddEvent(ev2) err = handler.AddEvent(ev2, actions.NO_EXPECTEDVERSION)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
@ -102,13 +101,13 @@ func TestTypeInformationIsProvided(t *testing.T) {
defer tearDown() defer tearDown()
ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"})
err := handler.AddEvent(ev) err := handler.AddEvent(ev, actions.NO_EXPECTEDVERSION)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()); filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String())
if fi, _ := os.Stat(filename); fi == nil { if fi, _ := os.Stat(filename); fi == nil {
t.Errorf("AddEvent failed to create file %q", filename) t.Errorf("AddEvent failed to create file %q", filename)
return return
@ -126,13 +125,13 @@ func TestEventsCanBeRetrieved(t *testing.T) {
aggregateId := uuid.NewV4() aggregateId := uuid.NewV4()
ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"})
err := handler.AddEvent(ev1) err := handler.AddEvent(ev1, actions.NO_EXPECTEDVERSION)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45})
err = handler.AddEvent(ev2) err = handler.AddEvent(ev2, actions.NO_EXPECTEDVERSION)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
@ -141,7 +140,7 @@ func TestEventsCanBeRetrieved(t *testing.T) {
events, err := handler.RetrieveFor(aggregateId) events, err := handler.RetrieveFor(aggregateId)
switch { switch {
case err != nil: case err != nil:
t.Errorf("RetrieveFor(%q) failed with %q", aggregateId.String(), err) t.Errorf("RetrieveFor(%q) failed with %q. %q", aggregateId.String(), err, tempDir)
case len(events) != 2: case len(events) != 2:
t.Errorf("RetrieveFor(%q) returned %v events, expected %v", aggregateId.String(), len(events), 2) t.Errorf("RetrieveFor(%q) returned %v events, expected %v", aggregateId.String(), len(events), 2)
case !ev1.Equals(events[0]): case !ev1.Equals(events[0]):
@ -160,14 +159,14 @@ func TestEventsCanBeReplayedInOrder(t *testing.T) {
testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"}) testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"})
testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"}) testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"})
testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"}) testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"})
handler.AddEvent(testEvent1) handler.AddEvent(testEvent1, actions.NO_EXPECTEDVERSION)
handler.AddEvent(testEvent2) handler.AddEvent(testEvent2, actions.NO_EXPECTEDVERSION)
handler.AddEvent(testEvent3) handler.AddEvent(testEvent3, actions.NO_EXPECTEDVERSION)
events, err := handler.RetrieveAll() events, err := handler.RetrieveAll()
switch { switch {
case err != nil: case err != nil:
t.Errorf("RetrieveAll failed with %q", err) t.Errorf("RetrieveAll failed with %q %q", err, tempDir)
case len(events) != 3: case len(events) != 3:
t.Errorf("RetrieveAll returned %v events, expected %v", len(events), 3) t.Errorf("RetrieveAll returned %v events, expected %v", len(events), 3)
case !testEvent1.Equals(events[0]) || !testEvent2.Equals(events[1]) || !testEvent3.Equals(events[2]): case !testEvent1.Equals(events[0]) || !testEvent2.Equals(events[1]) || !testEvent3.Equals(events[2]):
@ -179,4 +178,4 @@ func TestEventsCanBeReplayedInOrder(t *testing.T) {
Missing tests from https://gist.github.com/adymitruk/b4627b74617a37b6d949 Missing tests from https://gist.github.com/adymitruk/b4627b74617a37b6d949
- GUID reversal for distribution - GUID reversal for distribution
- Created date stored with event - Created date stored with event
*/ */

View File

@ -4,6 +4,7 @@ import (
"reflect" "reflect"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
) )
type JsonSerializer struct { type JsonSerializer struct {
@ -27,6 +28,9 @@ func (me *JsonSerializer) RegisterType(t interface{}) {
} }
func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) { func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) {
if obj == nil {
return []byte(""), "", nil
}
type_ := reflect.TypeOf(obj) type_ := reflect.TypeOf(obj)
if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) { if (type_.Kind() == reflect.Interface || type_.Kind() == reflect.Ptr) {
return nil, "", errors.New("Trying to serialize a Ptr type.") return nil, "", errors.New("Trying to serialize a Ptr type.")
@ -40,9 +44,12 @@ func (me *JsonSerializer) Serialize(obj interface{}) ([]byte, string, error) {
} }
func (me *JsonSerializer) Deserialize(serialized []byte, typeId string) (interface{}, error) { func (me *JsonSerializer) Deserialize(serialized []byte, typeId string) (interface{}, error) {
if (typeId == "") {
return nil, nil
}
type_ := me.types[typeId] type_ := me.types[typeId]
if type_ == nil { if type_ == nil {
return nil, errors.New("type not registered in serializer") return nil, errors.New(fmt.Sprintf("type %q not registered in serializer", typeId))
} }
objPtr := reflect.New(type_).Interface() objPtr := reflect.New(type_).Interface()
err := json.Unmarshal(serialized, objPtr) err := json.Unmarshal(serialized, objPtr)

View File

@ -12,6 +12,10 @@ func NewPassthruSerializer() *PassthruSerializer {
} }
func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId string, err error) { func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId string, err error) {
if input == nil {
return nil, "", nil
}
content, ok := input.([]byte) content, ok := input.([]byte)
if !ok { if !ok {
err = errors.New("input should be []byte") err = errors.New("input should be []byte")
@ -30,6 +34,10 @@ func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId
} }
func (me PassthruSerializer) Deserialize(input []byte, typeId string) (interface{}, error) { func (me PassthruSerializer) Deserialize(input []byte, typeId string) (interface{}, error) {
if (typeId == "") {
return nil, nil
}
output := []byte(typeId) output := []byte(typeId)
output = append(output, ' ') output = append(output, ' ')
output = append(output, input...) output = append(output, input...)

175
server/zeromq.go Normal file
View File

@ -0,0 +1,175 @@
package server
import (
"fmt"
data "../data"
actions "../actions"
"github.com/satori/go.uuid"
"github.com/pebbe/zmq4"
"encoding/binary"
)
var context *zmq4.Context
var replySocket *zmq4.Socket
func Bind(addr string) {
var err error;
context, err = zmq4.NewContext()
if err != nil {
panic(err)
}
replySocket, err = context.NewSocket(zmq4.REP)
if err != nil {
panic(err)
}
err = replySocket.Bind(addr)
if err != nil {
panic(err)
}
}
func Destroy() {
replySocket.Close()
context.Term()
}
const NO_FLAGS = zmq4.Flag(0)
const UUID_SIZE = 16
const COMMAND_FRAME = 0
const ARGS_FRAME = 1
const PAYLOAD_FRAME = 2
const METADATA_FRAME = 3
func Listen(handler actions.Handler) {
for {
message, err := replySocket.RecvMessageBytes(NO_FLAGS)
if err != nil {
fmt.Println("Error receiving command from client", err)
continue
}
command := string(message[COMMAND_FRAME])
switch command {
case "AddEvent":
// v1 - "AddEvent" [AggregateId] {payload}
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
payload := message[PAYLOAD_FRAME]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: nil}, actions.NO_EXPECTEDVERSION)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
replySocket.Send("Ok", NO_FLAGS)
case "AddEvent_v2":
// v2 - "AddEvent" 16:AggregateId,4:expectedVersion {payload} {metadata}
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME][0:UUID_SIZE])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
expectedVersion := binary.LittleEndian.Uint32(message[ARGS_FRAME][UUID_SIZE:])
fmt.Println("->", command, aggregateId.String(), expectedVersion)
payload := message[PAYLOAD_FRAME]
metadata := message[METADATA_FRAME]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: metadata}, expectedVersion)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
replySocket.Send("Ok", NO_FLAGS)
case "ReadStream", "ReadStream_v2":
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
events, err := handler.RetrieveFor(aggregateId)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
if command == "ReadStream_v2" {
sendEvents_v2(replySocket, events)
break;
}
sendEvents_v1(replySocket, events)
case "ReadAll", "ReadAll_v2":
fmt.Println("->", command)
events, err := handler.RetrieveAll()
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
if command == "ReadAll_v2" {
sendEvents_v2(replySocket, events)
break;
}
sendEvents_v1(replySocket, events)
case "Shutdown":
fmt.Println("->", command)
return
}
}
}
func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) {
lastFlag := zmq4.SNDMORE
if (isLast) {
lastFlag = NO_FLAGS
}
socket.SendBytes(event.Payload.([]byte), lastFlag)
}
func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) {
len := len(events)
if (len == 0) {
socket.Send("0", NO_FLAGS)
return
}
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len; i++ {
sendEvent_v1(socket, events[i], i == len - 1)
}
fmt.Println("<-", len, "events")
}
func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) {
socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE)
lastFlag := zmq4.SNDMORE
if (isLast) {
lastFlag = NO_FLAGS
}
socket.SendBytes(event.Metadata.([]byte), lastFlag)
}
func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) {
len := len(events)
if (len == 0) {
socket.Send("0", NO_FLAGS)
return
}
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len; i++ {
sendEvent_v2(socket, events[i], i == len - 1)
}
fmt.Println("<-", len, "events")
}

View File

@ -8,8 +8,12 @@ import (
"fmt" "fmt"
"errors" "errors"
"io/ioutil" "io/ioutil"
"bytes"
) )
const EMPTY_STREAM = uint32(0)
var CRLF = []byte {'\r', '\n'}
type DailyDiskStorage struct { type DailyDiskStorage struct {
storagePath string storagePath string
indexesPath string indexesPath string
@ -94,7 +98,7 @@ func readIndexNextEntry(f *os.File) (*IndexEntry, error) {
return &index, nil; return &index, nil;
} }
func writeEvent(filename string, data []byte) error { func writeEvent(filename string, data []byte, metadata []byte) error {
eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) eventFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644)
if err != nil { if err != nil {
return err return err
@ -102,12 +106,26 @@ func writeEvent(filename string, data []byte) error {
defer eventFile.Close() defer eventFile.Close()
eventFile.Write(data) eventFile.Write(data)
eventFile.Write(CRLF)
eventFile.Write(metadata)
return nil return nil
} }
func readEvent(filename string) ([]byte, error) { func readEvent(filename string) (data []byte, metadata []byte, err error) {
return ioutil.ReadFile(filename) content, err := ioutil.ReadFile(filename)
if err != nil {
return
}
sep := bytes.Index(content, CRLF)
if sep == -1 {
data = content
metadata = make([]byte, 0)
return
}
data = content[:sep]
metadata = content[sep+2:]
return
} }
func (me DailyDiskStorage) Write(event *StoredEvent) error { func (me DailyDiskStorage) Write(event *StoredEvent) error {
@ -115,7 +133,7 @@ func (me DailyDiskStorage) Write(event *StoredEvent) error {
eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) eventFilename := me.getEventFilename(event.CreationTime, event.TypeId)
os.MkdirAll(path.Dir(eventFilename), 0777) os.MkdirAll(path.Dir(eventFilename), 0777)
err := writeEvent(eventFilename, event.Data) err := writeEvent(eventFilename, event.Data, event.Metadata)
if err != nil { if err != nil {
return err return err
} }
@ -135,11 +153,33 @@ func (me DailyDiskStorage) Write(event *StoredEvent) error {
return nil return nil
} }
func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0)
if err != nil {
return EMPTY_STREAM, errors.New("NOT_FOUND: " + err.Error())
}
defer indexFile.Close()
ver := EMPTY_STREAM
for {
_, err := readIndexNextEntry(indexFile)
if err != nil && err.Error() == "EOF" {
break
}
if err != nil {
return EMPTY_STREAM, err
}
ver++
}
return ver, nil
}
func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, errors.New("NOT_FOUND: " + err.Error())
} }
defer indexFile.Close() defer indexFile.Close()
@ -152,11 +192,11 @@ func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data} event := &StoredEvent{streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata}
events = append(events, event) events = append(events, event)
} }
@ -179,11 +219,11 @@ func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId)) data, metadata, err := readEvent(me.getEventFilename(indexEntry.creationTime, indexEntry.typeId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data} event := &StoredEvent{indexEntry.streamId, indexEntry.creationTime, indexEntry.typeId, data, "Metadata", metadata}
events = append(events, event) events = append(events, event)
} }

View File

@ -20,9 +20,10 @@ func TestAddEvent(t *testing.T) {
aggregateId := uuid.NewV4() aggregateId := uuid.NewV4()
aType := "myType" aType := "myType"
data := []byte("{}") data := []byte("{}")
metadata := []byte("{}")
//Act //Act
err := storage.Write(&StoredEvent{aggregateId, aTime, aType, data}) err := storage.Write(&StoredEvent{aggregateId, aTime, aType, data, "Metadata", metadata})
//Assert //Assert
if err != nil { if err != nil {
@ -54,9 +55,9 @@ func TestReadStream(t *testing.T) {
storage := NewDailyDiskStorage(storagePath) storage := NewDailyDiskStorage(storagePath)
streamId := uuid.NewV4() streamId := uuid.NewV4()
ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent")} ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent"), "Metadata", []byte("{}")}
storage.Write(ev1) storage.Write(ev1)
ev2 := &StoredEvent{streamId, time.Now(), "2ndType", []byte("2ndEvent")} ev2 := &StoredEvent{streamId, time.Now(), "2ndType", []byte("2ndEvent"), "Metadata", []byte("{}")}
storage.Write(ev2) storage.Write(ev2)
//Act //Act
@ -89,11 +90,11 @@ func TestReadAll(t *testing.T) {
stream1Id := uuid.NewV4() stream1Id := uuid.NewV4()
stream2Id := uuid.NewV4() stream2Id := uuid.NewV4()
ev1 := &StoredEvent{stream1Id, time.Now(), "1stType", []byte("1stEvent")} ev1 := &StoredEvent{stream1Id, time.Now(), "1stType", []byte("1stEvent"), "Metadata", []byte("{}")}
storage.Write(ev1) storage.Write(ev1)
ev2 := &StoredEvent{stream2Id, time.Now(), "2ndType", []byte("2ndEvent")} ev2 := &StoredEvent{stream2Id, time.Now(), "2ndType", []byte("2ndEvent"), "Metadata", []byte("{}")}
storage.Write(ev2) storage.Write(ev2)
ev3 := &StoredEvent{stream1Id, time.Now(), "3rdType", []byte("3rdEvent")} ev3 := &StoredEvent{stream1Id, time.Now(), "3rdType", []byte("3rdEvent"), "Metadata", []byte("{}")}
storage.Write(ev3) storage.Write(ev3)
//Act //Act

View File

@ -111,6 +111,11 @@ func (me SimpleDiskStorage) Write(event *StoredEvent) error {
return nil return nil
} }
func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
//TODO
return EMPTY_STREAM, nil
}
func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
streamName := streamId.String() streamName := streamId.String()
offset := int64(0) //TODO snapshots offset := int64(0) //TODO snapshots
@ -134,7 +139,8 @@ func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, erro
return nil, err return nil, err
} }
event := &StoredEvent{streamId, creationTime, typeId, data} //TODO metadata
event := &StoredEvent{streamId, creationTime, typeId, data, "", nil}
results = append(results, event) results = append(results, event)
} }
return results, nil return results, nil
@ -197,7 +203,8 @@ func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64
return nil, err return nil, err
} }
event := &StoredEvent{streamId, creationTime, typeId, data} //TODO metadata
event := &StoredEvent{streamId, creationTime, typeId, data, "", nil}
return event, nil return event, nil
} }

View File

@ -13,6 +13,8 @@ type StoredEvent struct {
CreationTime time.Time CreationTime time.Time
TypeId string TypeId string
Data []byte Data []byte
MetadataTypeId string
Metadata []byte
} }
//TODO: performance - change reads array for some kind of iterator //TODO: performance - change reads array for some kind of iterator
@ -20,4 +22,5 @@ type Storage interface {
Write(event *StoredEvent) error Write(event *StoredEvent) error
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error) ReadAll() ([]*StoredEvent, error)
StreamVersion(streamId uuid.UUID) (uint32, error)
} }