From d819fb7de6e99d1ebbc87746e89b3df9bcff2684 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Fri, 26 Aug 2016 13:32:04 -0700 Subject: [PATCH] Refactoring: re-organize project intro multiple packages (folders) --- .gitignore | 2 + README.md | 10 +- actions/actions.go | 91 ++++++++++ data/event.go | 15 ++ goes.go | 163 +++++++++++------- goes_test.go | 53 +++--- serialization.go => serializer/json.go | 8 +- serializer/passthru.go | 38 ++++ serializer/serializer.go | 6 + simpleserver/simpleserver.go | 140 --------------- .../dailydisk.go | 18 +- .../dailydisk_test.go | 10 +- storage.go => storage/simpledisk.go | 52 ++---- storage/storage.go | 23 +++ 14 files changed, 335 insertions(+), 294 deletions(-) create mode 100644 actions/actions.go create mode 100644 data/event.go rename serialization.go => serializer/json.go (81%) create mode 100644 serializer/passthru.go create mode 100644 serializer/serializer.go rename readablediskstorage.go => storage/dailydisk.go (84%) rename readablediskstorage_test.go => storage/dailydisk_test.go (90%) rename storage.go => storage/simpledisk.go (77%) create mode 100644 storage/storage.go diff --git a/.gitignore b/.gitignore index 9f11b75..7947b1e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ .idea/ +goes +goes.exe \ No newline at end of file diff --git a/README.md b/README.md index 1b7e416..e5b3d3c 100644 --- a/README.md +++ b/README.md @@ -26,16 +26,14 @@ In your GOPATH folder, execute the following commands: ### Compiling the binary -In your GOPATH folder, execute the following command: +In the project root folder, execute the following command: - `go build -o bin/goes src/github.com/adymitruk/goes/simpleserver/simpleserver.go` + `go build` -\* Use `-o bin/goes.exe` on Windows - ## Running the server -In your GOPATH folder, execute the following command: +In the project root folder, execute the following command: - `./bin/goes --db=./events --addr=tcp://127.0.0.1:12345` + `./goes --db=./events --addr=tcp://127.0.0.1:12345` Both flags are optional and their default values are the same as the example. diff --git a/actions/actions.go b/actions/actions.go new file mode 100644 index 0000000..4ccc46c --- /dev/null +++ b/actions/actions.go @@ -0,0 +1,91 @@ +package actions + +import ( + "time" + "github.com/satori/go.uuid" + storage "../storage" + serializer "../serializer" + data "../data" +) + +var mapLock chan int = make(chan int, 1) +var streamsLock map[string]chan int = make(map[string]chan int) + +type ActionsHandler struct { + storage storage.Storage + serializer serializer.Serializer +} + +func NewActionsHandler(storage storage.Storage, serializer serializer.Serializer) *ActionsHandler { + return &ActionsHandler{storage, serializer} +} + +func lockStream(streamName string) { + mapLock <- 1 + defer func(){ + <-mapLock + }() + + streamLock := streamsLock[streamName] + if streamLock == nil { + streamLock = make(chan int, 1) + streamsLock[streamName] = streamLock + } + + streamLock <- 1 +} + +func unlockStream(streamName string) { + <-streamsLock[streamName] +} + +func (me ActionsHandler) AddEvent(event data.Event) error { + streamName := event.AggregateId.String() + + lockStream(streamName) + defer unlockStream(streamName) + + serializedPayload, typeId, err := me.serializer.Serialize(event.Payload) + if err != nil { + return err + } + + return me.storage.Write(&storage.StoredEvent{StreamId: event.AggregateId, CreationTime: time.Now(), TypeId: typeId, Data: serializedPayload}) +} + +func (me ActionsHandler) RetrieveFor(aggregateId uuid.UUID) ([]*data.Event, error) { + results, err := me.storage.ReadStream(aggregateId) + if err != nil { + return nil, err + } + + events := make([]*data.Event, 0) + for _, storedEvent := range results { + event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) + if err != nil { + return nil, err + } + events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event}) + } + + return events, nil +} + +func (me ActionsHandler) RetrieveAll() ([]*data.Event, error) { + results, err := me.storage.ReadAll() + if err != nil { + return nil, err + } + + events := make([]*data.Event, 0) + for _, storedEvent := range results { + event, err := me.serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) + if err != nil { + return nil, err + } + events = append(events, &data.Event{AggregateId: storedEvent.StreamId, Payload: event}) + } + + return events, nil +} + diff --git a/data/event.go b/data/event.go new file mode 100644 index 0000000..d81283b --- /dev/null +++ b/data/event.go @@ -0,0 +1,15 @@ +package data + +import ( + "github.com/satori/go.uuid" + "reflect" +) + +type Event struct { + AggregateId uuid.UUID + Payload interface{} +} + +func (me *Event) Equals(other *Event) bool { + return me.AggregateId == other.AggregateId && reflect.DeepEqual(me.Payload, other.Payload) +} diff --git a/goes.go b/goes.go index bc54a31..24d39fd 100644 --- a/goes.go +++ b/goes.go @@ -1,94 +1,123 @@ -package goes +package main import ( + "fmt" + actions "./actions" + storage "./storage" + serializer "./serializer" + data "./data" "github.com/satori/go.uuid" - "time" + "github.com/pebbe/zmq4" + "flag" + "os" + "path" ) -var serializer Serializer -var storage Storage +var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") +var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") -type Event struct { - AggregateId uuid.UUID - Payload interface{} +func PathIsAbsolute(s string) bool { + if len(s) > 1 && s[1] == ':' { + return true + } + return path.IsAbs(s) } -func SetStorage(newStorage Storage) { - storage = newStorage -} +func main() { + fmt.Println("Simple ZeroMQ server for goes.") -func SetSerializer(newSerializer Serializer) { - serializer = newSerializer -} + flag.Parse() -var mapLock chan int = make(chan int, 1) -var streamsLock map[string]chan int = make(map[string]chan int) - -func lockStream(streamName string) { - mapLock <- 1 - defer func(){ - <-mapLock - }() - - streamLock := streamsLock[streamName] - if streamLock == nil { - streamLock = make(chan int, 1) - streamsLock[streamName] = streamLock + storagePath := *db + if !PathIsAbsolute(storagePath) { + wd, _ := os.Getwd() + storagePath = path.Join(wd, storagePath) } - streamLock <- 1 -} + fmt.Println("Listening on:", *addr) + fmt.Println("Storage path:", storagePath) -func unlockStream(streamName string) { - <-streamsLock[streamName] -} + var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer()) -func AddEvent(event Event) error { - streamName := event.AggregateId.String() - - lockStream(streamName) - defer unlockStream(streamName) - - serializedPayload, typeId, err := serializer.Serialize(event.Payload) + context, err := zmq4.NewContext() if err != nil { - return err + panic(err) } + defer context.Term() - return storage.Write(&StoredEvent{event.AggregateId, time.Now(), typeId, serializedPayload}) -} - -func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) { - results, err := storage.ReadStream(aggregateId) + replySocket, err := context.NewSocket(zmq4.REP) if err != nil { - return nil, err + panic(err) + } + defer replySocket.Close() + + err = replySocket.Bind(*addr) + if err != nil { + panic(err) } - events := make([]*Event, 0) - for _, storedEvent := range results { - event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) + for { + message, err := replySocket.RecvMessageBytes(0) if err != nil { - return nil, err + fmt.Println("Error receiving command from client", err) + continue } - events = append(events, &Event{storedEvent.StreamId, event}) - } - return events, nil + command := string(message[0]) + switch command { + case "AddEvent": + aggregateId, err := uuid.FromBytes(message[1]) + if err != nil { + fmt.Println("Wrong format for AggregateId", err) + break + } + fmt.Println("->", command, aggregateId.String()) + payload := message[2] + err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload}) + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), 0) + fmt.Println(err) + break + } + replySocket.Send("Ok", 0) + case "ReadStream": + aggregateId, err := uuid.FromBytes(message[1]) + if err != nil { + fmt.Println("Wrong format for AggregateId", err) + break + } + fmt.Println("->", command, aggregateId.String()) + events, err := handler.RetrieveFor(aggregateId) + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), 0) + fmt.Println(err) + break + } + sendEvents(replySocket, events) + case "ReadAll": + fmt.Println("->", command) + events, err := handler.RetrieveAll() + if err != nil { + replySocket.Send(fmt.Sprintf("Error: %v", err), 0) + fmt.Println(err) + break + } + sendEvents(replySocket, events) + case "Shutdown": + fmt.Println("->", command) + return + } + } } -func RetrieveAll() ([]*Event, error) { - results, err := storage.ReadAll() - if err != nil { - return nil, err - } +func sendEvents(socket *zmq4.Socket, events []*data.Event) { + len := len(events) + socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE) - events := make([]*Event, 0) - for _, storedEvent := range results { - event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId) - if err != nil { - return nil, err - } - events = append(events, &Event{storedEvent.StreamId, event}) + i := 0 + for ; i < len-1; i++ { + socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE) } - - return events, nil -} \ No newline at end of file + socket.SendBytes(events[i].Payload.([]byte), 0) + fmt.Println("<-", len, "events") +} diff --git a/goes_test.go b/goes_test.go index eeb1ba7..50f1a73 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1,4 +1,4 @@ -package goes +package main import ( @@ -6,12 +6,18 @@ import "github.com/satori/go.uuid" "os" "path" - "reflect" "io/ioutil" "bytes" + storage "./storage" + serializer "./serializer" + data "./data" + actions "./actions" ) var tempDir string +var handler *actions.ActionsHandler +var _storage storage.Storage +var _serializer serializer.Serializer type AnEvent struct { A int64 @@ -26,10 +32,9 @@ type AnotherEvent struct { func setUp() { tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) - storage := NewDiskStorage(tempDir) - SetStorage(storage) - serializer := NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil)) - SetSerializer(serializer) + _storage = storage.NewSimpleDiskStorage(tempDir) + _serializer = serializer.NewJsonSerializer((*AnEvent)(nil), (*AnotherEvent)(nil)) + handler = actions.NewActionsHandler(_storage, _serializer) } func tearDown() { @@ -39,8 +44,8 @@ func tearDown() { } } -func wrapEvent(aggregateId uuid.UUID, event interface{}) Event { - return Event{aggregateId, event} +func wrapEvent(aggregateId uuid.UUID, event interface{}) data.Event { + return data.Event{AggregateId: aggregateId, Payload: event} } func TestSerializeEventToJson(t *testing.T) { @@ -48,13 +53,13 @@ func TestSerializeEventToJson(t *testing.T) { defer tearDown() ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) - err := AddEvent(ev) + err := handler.AddEvent(ev) if err != nil { t.Errorf("AddEvent failed with %q", err) return } - filename := (storage.(*DiskStorage)).getFilenameForEvents(ev.AggregateId.String()); + filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()); if fi, _ := os.Stat(filename); fi == nil { t.Errorf("AddEvent failed to create file %q", filename) return @@ -72,19 +77,19 @@ func TestSerializeEventsForSameAggregateInSameFile(t *testing.T) { aggregateId := uuid.NewV4() ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) - err := AddEvent(ev1) + err := handler.AddEvent(ev1) if err != nil { t.Errorf("AddEvent failed with %q", err) return } ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) - err = AddEvent(ev2) + err = handler.AddEvent(ev2) if err != nil { t.Errorf("AddEvent failed with %q", err) return } - filename := (storage.(*DiskStorage)).getFilenameForEvents(aggregateId.String()) + filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(aggregateId.String()) content, _ := ioutil.ReadFile(filename) if !bytes.Contains(content, []byte("Hello")) || !bytes.Contains(content, []byte("Bob")) { t.Error("AddEvent failed. Both events are not serialized in same file.") @@ -97,13 +102,13 @@ func TestTypeInformationIsProvided(t *testing.T) { defer tearDown() ev := wrapEvent(uuid.NewV4(), AnEvent{int64(1024), "Tests"}) - err := AddEvent(ev) + err := handler.AddEvent(ev) if err != nil { t.Errorf("AddEvent failed with %q", err) return } - filename := (storage.(*DiskStorage)).getFilenameForEvents(ev.AggregateId.String()); + filename := (_storage.(*storage.SimpleDiskStorage)).GetFilenameForEvents(ev.AggregateId.String()); if fi, _ := os.Stat(filename); fi == nil { t.Errorf("AddEvent failed to create file %q", filename) return @@ -115,29 +120,25 @@ func TestTypeInformationIsProvided(t *testing.T) { } } -func (me *Event) Equals(other *Event) bool { - return me.AggregateId == other.AggregateId && reflect.DeepEqual(me.Payload, other.Payload) -} - func TestEventsCanBeRetrieved(t *testing.T) { setUp() defer tearDown() aggregateId := uuid.NewV4() ev1 := wrapEvent(aggregateId, AnEvent{int64(12345), "Hello"}) - err := AddEvent(ev1) + err := handler.AddEvent(ev1) if err != nil { t.Errorf("AddEvent failed with %q", err) return } ev2 := wrapEvent(aggregateId, AnotherEvent{int64(23456), "Bob", 123.45}) - err = AddEvent(ev2) + err = handler.AddEvent(ev2) if err != nil { t.Errorf("AddEvent failed with %q", err) return } - events, err := RetrieveFor(aggregateId) + events, err := handler.RetrieveFor(aggregateId) switch { case err != nil: t.Errorf("RetrieveFor(%q) failed with %q", aggregateId.String(), err) @@ -159,11 +160,11 @@ func TestEventsCanBeReplayedInOrder(t *testing.T) { testEvent1 := wrapEvent(aggregateId1, AnEvent{int64(123), "Hello 1"}) testEvent2 := wrapEvent(aggregateId2, AnEvent{int64(456), "Hello 2"}) testEvent3 := wrapEvent(aggregateId1, AnEvent{int64(789), "Hello 3"}) - AddEvent(testEvent1) - AddEvent(testEvent2) - AddEvent(testEvent3) + handler.AddEvent(testEvent1) + handler.AddEvent(testEvent2) + handler.AddEvent(testEvent3) - events, err := RetrieveAll() + events, err := handler.RetrieveAll() switch { case err != nil: t.Errorf("RetrieveAll failed with %q", err) diff --git a/serialization.go b/serializer/json.go similarity index 81% rename from serialization.go rename to serializer/json.go index af018e1..d915680 100644 --- a/serialization.go +++ b/serializer/json.go @@ -1,4 +1,4 @@ -package goes +package serializer import ( "reflect" @@ -6,12 +6,6 @@ import ( "errors" ) -type Serializer interface { - Serialize(interface{}) ([]byte, string, error) - Deserialize([]byte, string) (interface{}, error) -} - -//TODO: any serializer will require a type registry maybe this should be abstracted type JsonSerializer struct { types map[string]reflect.Type } diff --git a/serializer/passthru.go b/serializer/passthru.go new file mode 100644 index 0000000..06b5337 --- /dev/null +++ b/serializer/passthru.go @@ -0,0 +1,38 @@ +package serializer + +import ( + "bytes" + "errors" +) + +type PassthruSerializer struct {} + +func NewPassthruSerializer() *PassthruSerializer { + return &PassthruSerializer{} +} + +func (me PassthruSerializer) Serialize(input interface{}) (output []byte, typeId string, err error) { + content, ok := input.([]byte) + if !ok { + err = errors.New("input should be []byte") + return + } + + sep := bytes.IndexByte(content, ' ') + if sep == -1 { + err = errors.New("missing split char.") + return + } + + output = content[sep+1:] + typeId = string(content[0:sep]) + return +} + +func (me PassthruSerializer) Deserialize(input []byte, typeId string) (interface{}, error) { + output := []byte(typeId) + output = append(output, ' ') + output = append(output, input...) + + return output, nil +} diff --git a/serializer/serializer.go b/serializer/serializer.go new file mode 100644 index 0000000..581d5d4 --- /dev/null +++ b/serializer/serializer.go @@ -0,0 +1,6 @@ +package serializer + +type Serializer interface { + Serialize(interface{}) ([]byte, string, error) + Deserialize([]byte, string) (interface{}, error) +} \ No newline at end of file diff --git a/simpleserver/simpleserver.go b/simpleserver/simpleserver.go index 270f72c..1898943 100644 --- a/simpleserver/simpleserver.go +++ b/simpleserver/simpleserver.go @@ -12,143 +12,3 @@ import ( "path" ) -var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") -var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") - -type Serializer struct {} - -func NewSerializer() goes.Serializer { - return &Serializer{} -} - -func (me Serializer) Serialize(input interface{}) (output []byte, typeId string, err error) { - content, ok := input.([]byte) - if !ok { - err = errors.New("input should be []byte") - return - } - - sep := bytes.IndexByte(content, ' ') - if sep == -1 { - err = errors.New("missing split char.") - return - } - - output = content[sep+1:] - typeId = string(content[0:sep]) - return -} - -func (me Serializer) Deserialize(input []byte, typeId string) (interface{}, error) { - output := []byte(typeId) - output = append(output, ' ') - output = append(output, input...) - - return output, nil -} - -func PathIsAbsolute(s string) bool { - if len(s) > 1 && s[1] == ':' { - return true - } - return path.IsAbs(s) -} - -func main() { - fmt.Println("Simple ZeroMQ server for goes.") - - flag.Parse() - - storagePath := *db - if !PathIsAbsolute(storagePath) { - wd, _ := os.Getwd() - storagePath = path.Join(wd, storagePath) - } - - fmt.Println("Listening on:", *addr) - fmt.Println("Storage path:", storagePath) - goes.SetStorage(goes.NewReadableDiskStorage(storagePath)) - goes.SetSerializer(NewSerializer()) - - context, err := zmq4.NewContext() - if err != nil { - panic(err) - } - defer context.Term() - - replySocket, err := context.NewSocket(zmq4.REP) - if err != nil { - panic(err) - } - defer replySocket.Close() - - err = replySocket.Bind(*addr) - if err != nil { - panic(err) - } - - for { - message, err := replySocket.RecvMessageBytes(0) - if err != nil { - fmt.Println("Error receiving command from client", err) - continue - } - - command := string(message[0]) - switch command { - case "AddEvent": - aggregateId, err := uuid.FromBytes(message[1]) - if err != nil { - fmt.Println("Wrong format for AggregateId", err) - break - } - fmt.Println("->", command, aggregateId.String()) - data := message[2] - err = goes.AddEvent(goes.Event{aggregateId, data}) - if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), 0) - fmt.Println(err) - break - } - replySocket.Send("Ok", 0) - case "ReadStream": - aggregateId, err := uuid.FromBytes(message[1]) - if err != nil { - fmt.Println("Wrong format for AggregateId", err) - break - } - fmt.Println("->", command, aggregateId.String()) - events, err := goes.RetrieveFor(aggregateId) - if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), 0) - fmt.Println(err) - break - } - sendEvents(replySocket, events) - case "ReadAll": - fmt.Println("->", command) - events, err := goes.RetrieveAll() - if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), 0) - fmt.Println(err) - break - } - sendEvents(replySocket, events) - case "Shutdown": - fmt.Println("->", command) - return - } - } -} - -func sendEvents(socket *zmq4.Socket, events []*goes.Event) { - len := len(events) - socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE) - - i := 0 - for ; i < len-1; i++ { - socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE) - } - socket.SendBytes(events[i].Payload.([]byte), 0) - fmt.Println("<-", len, "events") -} diff --git a/readablediskstorage.go b/storage/dailydisk.go similarity index 84% rename from readablediskstorage.go rename to storage/dailydisk.go index 8ec1f38..1e429e3 100644 --- a/readablediskstorage.go +++ b/storage/dailydisk.go @@ -1,4 +1,4 @@ -package goes +package storage import ( "path" @@ -10,26 +10,26 @@ import ( "io/ioutil" ) -type ReadableDiskStorage struct { +type DailyDiskStorage struct { storagePath string indexesPath string globalIndexFilename string } -func NewReadableDiskStorage(storagePath string) Storage { +func NewDailyDiskStorage(storagePath string) Storage { indexesPath := path.Join(storagePath, "indexes") globalIndexPath := path.Join(indexesPath, "global") if err := os.MkdirAll(indexesPath, 0777); err != nil { panic(err) } - return &ReadableDiskStorage{storagePath, indexesPath, globalIndexPath}; + return &DailyDiskStorage{storagePath, indexesPath, globalIndexPath}; } -func (me ReadableDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { +func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { return path.Join(me.indexesPath, streamId.String()) } -func (me ReadableDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { +func (me DailyDiskStorage) getEventFilename(creationTime time.Time, typeId string) string { yearMonth := fmt.Sprintf("%04d%02d", creationTime.Year(), creationTime.Month()) day := fmt.Sprintf("%02d", creationTime.Day()) eventFilename := fmt.Sprintf("%02d%02d%02d%09d_%s", creationTime.Hour(), creationTime.Minute(), creationTime.Second(), creationTime.Nanosecond(), typeId) @@ -110,7 +110,7 @@ func readEvent(filename string) ([]byte, error) { return ioutil.ReadFile(filename) } -func (me ReadableDiskStorage) Write(event *StoredEvent) error { +func (me DailyDiskStorage) Write(event *StoredEvent) error { eventFilename := me.getEventFilename(event.CreationTime, event.TypeId) os.MkdirAll(path.Dir(eventFilename), 0777) @@ -135,7 +135,7 @@ func (me ReadableDiskStorage) Write(event *StoredEvent) error { return nil } -func (me ReadableDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { +func (me DailyDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { indexFile, err := os.OpenFile(me.getStreamIndexFilename(streamId), os.O_RDONLY, 0) if err != nil { @@ -163,7 +163,7 @@ func (me ReadableDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, er return events, nil } -func (me ReadableDiskStorage) ReadAll() ([]*StoredEvent, error) { +func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) { indexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) if err != nil { return nil, err diff --git a/readablediskstorage_test.go b/storage/dailydisk_test.go similarity index 90% rename from readablediskstorage_test.go rename to storage/dailydisk_test.go index 3adffef..63f224e 100644 --- a/readablediskstorage_test.go +++ b/storage/dailydisk_test.go @@ -1,4 +1,4 @@ -package goes +package storage import ( "testing" @@ -13,7 +13,7 @@ func TestAddEvent(t *testing.T) { //Arrange storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) defer os.RemoveAll(storagePath) - storage := NewReadableDiskStorage(storagePath) + storage := NewDailyDiskStorage(storagePath) aLocation, _ := time.LoadLocation("") aTime := time.Date(2016,2,11,9,53,32,1234567, aLocation) @@ -29,7 +29,7 @@ func TestAddEvent(t *testing.T) { t.Errorf("Write failed. Error: %v", err) } - readableDiskStorage := storage.(*ReadableDiskStorage) + readableDiskStorage := storage.(*DailyDiskStorage) globalIndexFi, _ := os.Stat(readableDiskStorage.globalIndexFilename) if globalIndexFi == nil { @@ -51,7 +51,7 @@ func TestReadStream(t *testing.T) { //Arrange storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) defer os.RemoveAll(storagePath) - storage := NewReadableDiskStorage(storagePath) + storage := NewDailyDiskStorage(storagePath) streamId := uuid.NewV4() ev1 := &StoredEvent{streamId, time.Now(), "1stType", []byte("1stEvent")} @@ -85,7 +85,7 @@ func TestReadAll(t *testing.T) { //Arrange storagePath := path.Join(os.TempDir(), uuid.NewV4().String()) defer os.RemoveAll(storagePath) - storage := NewReadableDiskStorage(storagePath) + storage := NewDailyDiskStorage(storagePath) stream1Id := uuid.NewV4() stream2Id := uuid.NewV4() diff --git a/storage.go b/storage/simpledisk.go similarity index 77% rename from storage.go rename to storage/simpledisk.go index 181d6b1..5e29e8e 100644 --- a/storage.go +++ b/storage/simpledisk.go @@ -1,46 +1,29 @@ -package goes +package storage import ( - "os" - "path" "encoding/binary" "github.com/satori/go.uuid" + "os" + "time" + "path" "fmt" "errors" - "time" ) -const IntegerSizeInBytes = 8 -const StreamStartingCapacity = 512 - -type StoredEvent struct { - StreamId uuid.UUID - CreationTime time.Time - TypeId string - Data []byte +func NewSimpleDiskStorage(storagePath string) Storage { + return &SimpleDiskStorage{storagePath, path.Join(storagePath, "eventindex")} } -//TODO: performance - change reads array for some kind of iterator -type Storage interface { - Write(event *StoredEvent) error - ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) - ReadAll() ([]*StoredEvent, error) -} - -func NewDiskStorage(storagePath string) Storage { - return &DiskStorage{storagePath, path.Join(storagePath, "eventindex")} -} - -type DiskStorage struct { +type SimpleDiskStorage struct { storagePath string indexPath string } -func (me DiskStorage) getFilename(stream, extension string) string { +func (me SimpleDiskStorage) getFilename(stream, extension string) string { return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension) } -func (me DiskStorage) getFilenameForEvents(stream string) string { +func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string { return me.getFilename(stream, ".history") } @@ -90,8 +73,8 @@ func readSizedBytes(f *os.File) ([]byte, error) { return data, nil } -func (me DiskStorage) Write(event *StoredEvent) error { - filename := me.getFilenameForEvents(event.StreamId.String()) +func (me SimpleDiskStorage) Write(event *StoredEvent) error { + filename := me.GetFilenameForEvents(event.StreamId.String()) os.MkdirAll(path.Dir(filename), os.ModeDir) indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644) @@ -128,10 +111,10 @@ func (me DiskStorage) Write(event *StoredEvent) error { return nil } -func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { +func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { streamName := streamId.String() offset := int64(0) //TODO snapshots - filename := me.getFilenameForEvents(streamName) + filename := me.GetFilenameForEvents(streamName) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) if err != nil { @@ -157,7 +140,7 @@ func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) { return results, nil } -func (me DiskStorage) ReadAll() ([]*StoredEvent, error) { +func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) { indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0) if err != nil { return nil, err @@ -198,8 +181,8 @@ func (me DiskStorage) ReadAll() ([]*StoredEvent, error) { return results, nil } -func (me DiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { - filename := me.getFilenameForEvents(streamId.String()) +func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) { + filename := me.GetFilenameForEvents(streamId.String()) eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0) if err != nil { @@ -237,4 +220,5 @@ func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data, err = readSizedBytes(eventsFile) return -} \ No newline at end of file +} + diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..ee9bc52 --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,23 @@ +package storage + +import ( + "github.com/satori/go.uuid" + "time" +) + +const IntegerSizeInBytes = 8 +const StreamStartingCapacity = 512 + +type StoredEvent struct { + StreamId uuid.UUID + CreationTime time.Time + TypeId string + Data []byte +} + +//TODO: performance - change reads array for some kind of iterator +type Storage interface { + Write(event *StoredEvent) error + ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) + ReadAll() ([]*StoredEvent, error) +}