Refactoring: re-organize project intro multiple packages (folders)

This commit is contained in:
Nicolas Dextraze 2016-08-26 13:32:04 -07:00
parent b3280c7759
commit d819fb7de6
14 changed files with 335 additions and 294 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
.idea/ .idea/
goes
goes.exe

View File

@ -26,16 +26,14 @@ In your GOPATH folder, execute the following commands:
### Compiling the binary ### Compiling the binary
In your GOPATH folder, execute the following command: In the project root folder, execute the following command:
`go build -o bin/goes src/github.com/adymitruk/goes/simpleserver/simpleserver.go` `go build`
\* Use `-o bin/goes.exe` on Windows
## Running the server ## Running the server
In your GOPATH folder, execute the following command: In the project root folder, execute the following command:
`./bin/goes --db=./events --addr=tcp://127.0.0.1:12345` `./goes --db=./events --addr=tcp://127.0.0.1:12345`
Both flags are optional and their default values are the same as the example. Both flags are optional and their default values are the same as the example.

91
actions/actions.go Normal file
View File

@ -0,0 +1,91 @@
package actions
import (
"time"
"github.com/satori/go.uuid"
storage "../storage"
serializer "../serializer"
data "../data"
)
var mapLock chan int = make(chan int, 1)
var streamsLock map[string]chan int = make(map[string]chan int)
type ActionsHandler struct {
storage storage.Storage
serializer serializer.Serializer
}
func NewActionsHandler(storage storage.Storage, serializer serializer.Serializer) *ActionsHandler {
return &ActionsHandler{storage, serializer}
}
func lockStream(streamName string) {
mapLock <- 1
defer func(){
<-mapLock
}()
streamLock := streamsLock[streamName]
if streamLock == nil {
streamLock = make(chan int, 1)
streamsLock[streamName] = streamLock
}
streamLock <- 1
}
func unlockStream(streamName string) {
<-streamsLock[streamName]
}
func (me ActionsHandler) AddEvent(event data.Event) error {
streamName := event.AggregateId.String()
lockStream(streamName)
defer unlockStream(streamName)
serializedPayload, typeId, err := me.serializer.Serialize(event.Payload)
if err != nil {
return err
}
return me.storage.Write(&storage.StoredEvent{StreamId: event.AggregateId, CreationTime: time.Now(), TypeId: typeId, Data: serializedPayload})
}
func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, error) {
results, err := me.storage.ReadStream(aggregateId)
if err != nil {
return nil, err
}
events := make([]*data.Event, 0)
for _, storedEvent := range results {
event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
if err != nil {
return nil, err
}
events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event})
}
return events, nil
}
func (me ActionsHandler) RetrieveAll() ([]*data.Event, error) {
results, err := me.storage.ReadAll()
if err != nil {
return nil, err
}
events := make([]*data.Event, 0)
for _, storedEvent := range results {
event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
if err != nil {
return nil, err
}
events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event})
}
return events, nil
}

15
data/event.go Normal file
View File

@ -0,0 +1,15 @@
package data
import (
"github.com/satori/go.uuid"
"reflect"
)
type Event struct {
AggregateId uuid.UUID
Payload interface{}
}
func (me *Event) Equals(other *Event) bool {
return me.AggregateId == other.AggregateId && reflect.DeepEqual(me.Payload, other.Payload)
}

161
goes.go
View File

@ -1,94 +1,123 @@
package goes package main
import ( import (
"fmt"
actions "./actions"
storage "./storage"
serializer "./serializer"
data "./data"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"time" "github.com/pebbe/zmq4"
"flag"
"os"
"path"
) )
var serializer Serializer var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to")
var storage Storage var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage")
type Event struct { func PathIsAbsolute(s string) bool {
AggregateId uuid.UUID if len(s) > 1 && s[1] == ':' {
Payload interface{} return true
}
return path.IsAbs(s)
} }
func SetStorage(newStorage Storage) { func main() {
storage = newStorage fmt.Println("Simple ZeroMQ server for goes.")
}
func SetSerializer(newSerializer Serializer) { flag.Parse()
serializer = newSerializer
}
var mapLock chan int = make(chan int, 1) storagePath := *db
var streamsLock map[string]chan int = make(map[string]chan int) if !PathIsAbsolute(storagePath) {
wd, _ := os.Getwd()
func lockStream(streamName string) { storagePath = path.Join(wd, storagePath)
mapLock <- 1
defer func(){
<-mapLock
}()
streamLock := streamsLock[streamName]
if streamLock == nil {
streamLock = make(chan int, 1)
streamsLock[streamName] = streamLock
} }
streamLock <- 1 fmt.Println("Listening on:", *addr)
} fmt.Println("Storage path:", storagePath)
func unlockStream(streamName string) { var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer())
<-streamsLock[streamName]
}
func AddEvent(event Event) error { context, err := zmq4.NewContext()
streamName := event.AggregateId.String()
lockStream(streamName)
defer unlockStream(streamName)
serializedPayload, typeId, err := serializer.Serialize(event.Payload)
if err != nil { if err != nil {
return err panic(err)
} }
defer context.Term()
return storage.Write(&StoredEvent{event.AggregateId, time.Now(), typeId, serializedPayload}) replySocket, err := context.NewSocket(zmq4.REP)
}
func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) {
results, err := storage.ReadStream(aggregateId)
if err != nil { if err != nil {
return nil, err panic(err)
}
defer replySocket.Close()
err = replySocket.Bind(*addr)
if err != nil {
panic(err)
} }
events := make([]*Event, 0) for {
for _, storedEvent := range results { message, err := replySocket.RecvMessageBytes(0)
event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
if err != nil { if err != nil {
return nil, err fmt.Println("Error receiving command from client", err)
continue
} }
events = append(events, &Event{storedEvent.StreamId, event})
}
return events, nil command := string(message[0])
switch command {
case "AddEvent":
aggregateId, err := uuid.FromBytes(message[1])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
payload := message[2]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload})
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
replySocket.Send("Ok", 0)
case "ReadStream":
aggregateId, err := uuid.FromBytes(message[1])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
events, err := handler.RetrieveFor(aggregateId)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
sendEvents(replySocket, events)
case "ReadAll":
fmt.Println("->", command)
events, err := handler.RetrieveAll()
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
sendEvents(replySocket, events)
case "Shutdown":
fmt.Println("->", command)
return
}
}
} }
func RetrieveAll() ([]*Event, error) { func sendEvents(socket *zmq4.Socket, events []*data.Event) {
results, err := storage.ReadAll() len := len(events)
if err != nil { socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
return nil, err
}
events := make([]*Event, 0) i := 0
for _, storedEvent := range results { for ; i < len-1; i++ {
event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE)
if err != nil {
return nil, err
}
events = append(events, &Event{storedEvent.StreamId, event})
} }
socket.SendBytes(events[i].Payload.([]byte), 0)
return events, nil fmt.Println("<-", len, "events")
} }

View File

@ -1,4 +1,4 @@
package goes package main
import import
( (
@ -6,12 +6,18 @@ import
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"os" "os"
"path" "path"
"reflect"
"io/ioutil" "io/ioutil"
"bytes" "bytes"
storage "./storage"
serializer "./serializer"
data "./data"
actions "./actions"
) )
var tempDir string var tempDir string
var handler *actions.ActionsHandler
var _storage storage.Storage
var _serializer serializer.Serializer
type AnEvent struct { type AnEvent struct {
A int64 A int64
@ -26,10 +32,9 @@ type AnotherEvent struct {
func setUp() { func setUp() {
tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) tempDir := path.Join(os.TempDir(), uuid.NewV4().String())
storage := NewDiskStorage(tempDir) _storage = storage.NewSimpleDiskStorage(tempDir)
SetStorage(storage) _serializer = serializer.NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil))
serializer := NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil)) handler = actions.NewActionsHandler(_storage, _serializer)
SetSerializer(serializer)
} }
func tearDown() { func tearDown() {
@ -39,8 +44,8 @@ func tearDown() {
} }
} }
func wrapEvent(aggregateId uuid.UUID, event interface{}) Event { func wrapEvent(aggregateId uuid.UUID, event interface{}) data.Event {
return Event{aggregateId, event} return data.Event{AggregateId: aggregateId, Payload: event}
} }
func TestSerializeEventToJson(t *testing.T) { func TestSerializeEventToJson(t *testing.T) {
@ -48,13 +53,13 @@ func TestSerializeEventToJson(t *testing.T) {
defer tearDown() defer tearDown()
ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"})
err := AddEvent(ev) err := handler.AddEvent(ev)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
filename := (storage.(*DiskStorage)).getFilenameForEvents(ev.AggregateId.String()); filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String());
if fi, _ := os.Stat(filename); fi == nil { if fi, _ := os.Stat(filename); fi == nil {
t.Errorf("AddEvent failed to create file %q", filename) t.Errorf("AddEvent failed to create file %q", filename)
return return
@ -72,19 +77,19 @@ func TestSerializeEventsForSameAggregateInSameFile(t *testing.T) {
aggregateId := uuid.NewV4() aggregateId := uuid.NewV4()
ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"})
err := AddEvent(ev1) err := handler.AddEvent(ev1)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45})
err = AddEvent(ev2) err = handler.AddEvent(ev2)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
filename := (storage.(*DiskStorage)).getFilenameForEvents(aggregateId.String()) filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(aggregateId.String())
content, _ := ioutil.ReadFile(filename) content, _ := ioutil.ReadFile(filename)
if !bytes.Contains(content, []byte("Hello")) || !bytes.Contains(content, []byte("Bob")) { if !bytes.Contains(content, []byte("Hello")) || !bytes.Contains(content, []byte("Bob")) {
t.Error("AddEvent failed. Both events are not serialized in same file.") t.Error("AddEvent failed. Both events are not serialized in same file.")
@ -97,13 +102,13 @@ func TestTypeInformationIsProvided(t *testing.T) {
defer tearDown() defer tearDown()
ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"})
err := AddEvent(ev) err := handler.AddEvent(ev)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
filename := (storage.(*DiskStorage)).getFilenameForEvents(ev.AggregateId.String()); filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String());
if fi, _ := os.Stat(filename); fi == nil { if fi, _ := os.Stat(filename); fi == nil {
t.Errorf("AddEvent failed to create file %q", filename) t.Errorf("AddEvent failed to create file %q", filename)
return return
@ -115,29 +120,25 @@ func TestTypeInformationIsProvided(t *testing.T) {
} }
} }
func (me *Event) Equals(other *Event) bool {
return me.AggregateId == other.AggregateId && reflect.DeepEqual(me.Payload, other.Payload)
}
func TestEventsCanBeRetrieved(t *testing.T) { func TestEventsCanBeRetrieved(t *testing.T) {
setUp() setUp()
defer tearDown() defer tearDown()
aggregateId := uuid.NewV4() aggregateId := uuid.NewV4()
ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"})
err := AddEvent(ev1) err := handler.AddEvent(ev1)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45})
err = AddEvent(ev2) err = handler.AddEvent(ev2)
if err != nil { if err != nil {
t.Errorf("AddEvent failed with %q", err) t.Errorf("AddEvent failed with %q", err)
return return
} }
events, err := RetrieveFor(aggregateId) events, err := handler.RetrieveFor(aggregateId)
switch { switch {
case err != nil: case err != nil:
t.Errorf("RetrieveFor(%q) failed with %q", aggregateId.String(), err) t.Errorf("RetrieveFor(%q) failed with %q", aggregateId.String(), err)
@ -159,11 +160,11 @@ func TestEventsCanBeReplayedInOrder(t *testing.T) {
testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"}) testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"})
testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"}) testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"})
testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"}) testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"})
AddEvent(testEvent1) handler.AddEvent(testEvent1)
AddEvent(testEvent2) handler.AddEvent(testEvent2)
AddEvent(testEvent3) handler.AddEvent(testEvent3)
events, err := RetrieveAll() events, err := handler.RetrieveAll()
switch { switch {
case err != nil: case err != nil:
t.Errorf("RetrieveAll failed with %q", err) t.Errorf("RetrieveAll failed with %q", err)

View File

@ -1,4 +1,4 @@
package goes package serializer
import ( import (
"reflect" "reflect"
@ -6,12 +6,6 @@ import (
"errors" "errors"
) )
type Serializer interface {
Serialize(interface{}) ([]byte, string, error)
Deserialize([]byte, string) (interface{}, error)
}
//TODO: any serializer will require a type registry maybe this should be abstracted
type JsonSerializer struct { type JsonSerializer struct {
types map[string]reflect.Type types map[string]reflect.Type
} }

38
serializer/passthru.go Normal file
View File

@ -0,0 +1,38 @@
package serializer
import (
"bytes"
"errors"
)
type PassthruSerializer struct {}
func NewPassthruSerializer() *PassthruSerializer {
return &PassthruSerializer{}
}
func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId string, err error) {
content, ok := input.([]byte)
if !ok {
err = errors.New("input should be []byte")
return
}
sep := bytes.IndexByte(content, ' ')
if sep == -1 {
err = errors.New("missing split char.")
return
}
output = content[sep+1:]
typeId = string(content[0:sep])
return
}
func (me PassthruSerializer) Deserialize(input []byte, typeId string) (interface{}, error) {
output := []byte(typeId)
output = append(output, ' ')
output = append(output, input...)
return output, nil
}

6
serializer/serializer.go Normal file
View File

@ -0,0 +1,6 @@
package serializer
type Serializer interface {
Serialize(interface{}) ([]byte, string, error)
Deserialize([]byte, string) (interface{}, error)
}

View File

@ -12,143 +12,3 @@ import (
"path" "path"
) )
var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to")
var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage")
type Serializer struct {}
func NewSerializer() goes.Serializer {
return &Serializer{}
}
func (me Serializer) Serialize(input interface{}) (output []byte, typeId string, err error) {
content, ok := input.([]byte)
if !ok {
err = errors.New("input should be []byte")
return
}
sep := bytes.IndexByte(content, ' ')
if sep == -1 {
err = errors.New("missing split char.")
return
}
output = content[sep+1:]
typeId = string(content[0:sep])
return
}
func (me Serializer) Deserialize(input []byte, typeId string) (interface{}, error) {
output := []byte(typeId)
output = append(output, ' ')
output = append(output, input...)
return output, nil
}
func PathIsAbsolute(s string) bool {
if len(s) > 1 && s[1] == ':' {
return true
}
return path.IsAbs(s)
}
func main() {
fmt.Println("Simple ZeroMQ server for goes.")
flag.Parse()
storagePath := *db
if !PathIsAbsolute(storagePath) {
wd, _ := os.Getwd()
storagePath = path.Join(wd, storagePath)
}
fmt.Println("Listening on:", *addr)
fmt.Println("Storage path:", storagePath)
goes.SetStorage(goes.NewReadableDiskStorage(storagePath))
goes.SetSerializer(NewSerializer())
context, err := zmq4.NewContext()
if err != nil {
panic(err)
}
defer context.Term()
replySocket, err := context.NewSocket(zmq4.REP)
if err != nil {
panic(err)
}
defer replySocket.Close()
err = replySocket.Bind(*addr)
if err != nil {
panic(err)
}
for {
message, err := replySocket.RecvMessageBytes(0)
if err != nil {
fmt.Println("Error receiving command from client", err)
continue
}
command := string(message[0])
switch command {
case "AddEvent":
aggregateId, err := uuid.FromBytes(message[1])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
data := message[2]
err = goes.AddEvent(goes.Event{aggregateId, data})
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
replySocket.Send("Ok", 0)
case "ReadStream":
aggregateId, err := uuid.FromBytes(message[1])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
events, err := goes.RetrieveFor(aggregateId)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
sendEvents(replySocket, events)
case "ReadAll":
fmt.Println("->", command)
events, err := goes.RetrieveAll()
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
fmt.Println(err)
break
}
sendEvents(replySocket, events)
case "Shutdown":
fmt.Println("->", command)
return
}
}
}
func sendEvents(socket *zmq4.Socket, events []*goes.Event) {
len := len(events)
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len-1; i++ {
socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE)
}
socket.SendBytes(events[i].Payload.([]byte), 0)
fmt.Println("<-", len, "events")
}

View File

@ -1,4 +1,4 @@
package goes package storage
import ( import (
"path" "path"
@ -10,26 +10,26 @@ import (
"io/ioutil" "io/ioutil"
) )
type ReadableDiskStorage struct { type DailyDiskStorage struct {
storagePath string storagePath string
indexesPath string indexesPath string
globalIndexFilename string globalIndexFilename string
} }
func NewReadableDiskStorage(storagePath string) Storage { func NewDailyDiskStorage(storagePath string) Storage {
indexesPath := path.Join(storagePath, "indexes") indexesPath := path.Join(storagePath, "indexes")
globalIndexPath := path.Join(indexesPath, "global") globalIndexPath := path.Join(indexesPath, "global")
if err := os.MkdirAll(indexesPath, 0777); err != nil { if err := os.MkdirAll(indexesPath, 0777); err != nil {
panic(err) panic(err)
} }
return &ReadableDiskStorage{storagePath, indexesPath, globalIndexPath}; return &DailyDiskStorage{storagePath, indexesPath, globalIndexPath};
} }
func (me ReadableDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string {
return path.Join(me.indexesPath, streamId.String()) return path.Join(me.indexesPath, streamId.String())
} }
func (me ReadableDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { func (me DailyDiskStorage) getEventFilename(creationTime time.Time, typeId string) string {
yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month()) yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month())
day := fmt.Sprintf("%02d", creationTime.Day()) day := fmt.Sprintf("%02d", creationTime.Day())
eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId) eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId)
@ -110,7 +110,7 @@ func readEvent(filename string) ([]byte, error) {
return ioutil.ReadFile(filename) return ioutil.ReadFile(filename)
} }
func (me ReadableDiskStorage) Write(event *StoredEvent) error { func (me DailyDiskStorage) Write(event *StoredEvent) error {
eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) eventFilename := me.getEventFilename(event.CreationTime, event.TypeId)
os.MkdirAll(path.Dir(eventFilename), 0777) os.MkdirAll(path.Dir(eventFilename), 0777)
@ -135,7 +135,7 @@ func (me ReadableDiskStorage) Write(event *StoredEvent) error {
return nil return nil
} }
func (me ReadableDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0)
if err != nil { if err != nil {
@ -163,7 +163,7 @@ func (me ReadableDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, er
return events, nil return events, nil
} }
func (me ReadableDiskStorage) ReadAll() ([]*StoredEvent, error) { func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -1,4 +1,4 @@
package goes package storage
import ( import (
"testing" "testing"
@ -13,7 +13,7 @@ func TestAddEvent(t *testing.T) {
//Arrange //Arrange
storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
defer os.RemoveAll(storagePath) defer os.RemoveAll(storagePath)
storage := NewReadableDiskStorage(storagePath) storage := NewDailyDiskStorage(storagePath)
aLocation, _ := time.LoadLocation("") aLocation, _ := time.LoadLocation("")
aTime := time.Date(2016,2,11,9,53,32,1234567, aLocation) aTime := time.Date(2016,2,11,9,53,32,1234567, aLocation)
@ -29,7 +29,7 @@ func TestAddEvent(t *testing.T) {
t.Errorf("Write failed. Error: %v", err) t.Errorf("Write failed. Error: %v", err)
} }
readableDiskStorage := storage.(*ReadableDiskStorage) readableDiskStorage := storage.(*DailyDiskStorage)
globalIndexFi, _ := os.Stat(readableDiskStorage.globalIndexFilename) globalIndexFi, _ := os.Stat(readableDiskStorage.globalIndexFilename)
if globalIndexFi == nil { if globalIndexFi == nil {
@ -51,7 +51,7 @@ func TestReadStream(t *testing.T) {
//Arrange //Arrange
storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
defer os.RemoveAll(storagePath) defer os.RemoveAll(storagePath)
storage := NewReadableDiskStorage(storagePath) storage := NewDailyDiskStorage(storagePath)
streamId := uuid.NewV4() streamId := uuid.NewV4()
ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent")} ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent")}
@ -85,7 +85,7 @@ func TestReadAll(t *testing.T) {
//Arrange //Arrange
storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) storagePath := path.Join(os.TempDir(), uuid.NewV4().String())
defer os.RemoveAll(storagePath) defer os.RemoveAll(storagePath)
storage := NewReadableDiskStorage(storagePath) storage := NewDailyDiskStorage(storagePath)
stream1Id := uuid.NewV4() stream1Id := uuid.NewV4()
stream2Id := uuid.NewV4() stream2Id := uuid.NewV4()

View File

@ -1,46 +1,29 @@
package goes package storage
import ( import (
"os"
"path"
"encoding/binary" "encoding/binary"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"os"
"time"
"path"
"fmt" "fmt"
"errors" "errors"
"time"
) )
const IntegerSizeInBytes = 8 func NewSimpleDiskStorage(storagePath string) Storage {
const StreamStartingCapacity = 512 return &SimpleDiskStorage{storagePath, path.Join(storagePath, "eventindex")}
type StoredEvent struct {
StreamId uuid.UUID
CreationTime time.Time
TypeId string
Data []byte
} }
//TODO: performance - change reads array for some kind of iterator type SimpleDiskStorage struct {
type Storage interface {
Write(event *StoredEvent) error
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error)
}
func NewDiskStorage(storagePath string) Storage {
return &DiskStorage{storagePath, path.Join(storagePath, "eventindex")}
}
type DiskStorage struct {
storagePath string storagePath string
indexPath string indexPath string
} }
func (me DiskStorage) getFilename(stream, extension string) string { func (me SimpleDiskStorage) getFilename(stream, extension string) string {
return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension) return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension)
} }
func (me DiskStorage) getFilenameForEvents(stream string) string { func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string {
return me.getFilename(stream, ".history") return me.getFilename(stream, ".history")
} }
@ -90,8 +73,8 @@ func readSizedBytes(f *os.File) ([]byte, error) {
return data, nil return data, nil
} }
func (me DiskStorage) Write(event *StoredEvent) error { func (me SimpleDiskStorage) Write(event *StoredEvent) error {
filename := me.getFilenameForEvents(event.StreamId.String()) filename := me.GetFilenameForEvents(event.StreamId.String())
os.MkdirAll(path.Dir(filename), os.ModeDir) os.MkdirAll(path.Dir(filename), os.ModeDir)
indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644)
@ -128,10 +111,10 @@ func (me DiskStorage) Write(event *StoredEvent) error {
return nil return nil
} }
func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
streamName := streamId.String() streamName := streamId.String()
offset := int64(0) //TODO snapshots offset := int64(0) //TODO snapshots
filename := me.getFilenameForEvents(streamName) filename := me.GetFilenameForEvents(streamName)
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
@ -157,7 +140,7 @@ func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
return results, nil return results, nil
} }
func (me DiskStorage) ReadAll() ([]*StoredEvent, error) { func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0) indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
@ -198,8 +181,8 @@ func (me DiskStorage) ReadAll() ([]*StoredEvent, error) {
return results, nil return results, nil
} }
func (me DiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) {
filename := me.getFilenameForEvents(streamId.String()) filename := me.GetFilenameForEvents(streamId.String())
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
@ -238,3 +221,4 @@ func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string,
return return
} }

23
storage/storage.go Normal file
View File

@ -0,0 +1,23 @@
package storage
import (
"github.com/satori/go.uuid"
"time"
)
const IntegerSizeInBytes = 8
const StreamStartingCapacity = 512
type StoredEvent struct {
StreamId uuid.UUID
CreationTime time.Time
TypeId string
Data []byte
}
//TODO: performance - change reads array for some kind of iterator
type Storage interface {
Write(event *StoredEvent) error
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error)
}