From a05f21377f5667b9f3667309f36bf67cd676997b Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Sun, 28 Aug 2016 13:29:53 -0700 Subject: [PATCH] Added type indexes. Updated vagrant scripts. --- .gitignore | 4 +-- README.md | 4 +-- goes.go | 10 +++++-- scripts/bootstrap.sh | 14 ++++----- scripts/install.sh | 2 +- scripts/start.sh | 2 +- server/zeromq.go | 40 +++++++++++++------------ simpleserver/simpleserver.go | 14 --------- storage/dailydisk.go | 57 +++++++++++++++++++++++++++++++++--- storage/simpledisk.go | 3 ++ storage/storage.go | 1 + 11 files changed, 99 insertions(+), 52 deletions(-) delete mode 100644 simpleserver/simpleserver.go diff --git a/.gitignore b/.gitignore index 7947b1e..eff9604 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ .idea/ -goes -goes.exe \ No newline at end of file +.vagrant/ +bin/ diff --git a/README.md b/README.md index a0d7371..f45e013 100644 --- a/README.md +++ b/README.md @@ -32,12 +32,12 @@ In your GOPATH folder, execute the following commands: In the project root folder, execute the following command: - `go build` + `go build -o bin/goes` ### Running the server In the project root folder, execute the following command: - `./goes --db=./events --addr=tcp://127.0.0.1:12345` + `./bin/goes --db=./events --addr=tcp://127.0.0.1:12345` Both flags are optional and their default values are the same as the example. diff --git a/goes.go b/goes.go index 3d0b02c..84ae5e3 100644 --- a/goes.go +++ b/goes.go @@ -13,6 +13,7 @@ import ( var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to") var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage") +var buildTypeIndexes = flag.Bool("buildTypeIndexes", false, "Build type indexes") func PathIsAbsolute(s string) bool { if len(s) > 1 && s[1] == ':' { @@ -34,10 +35,13 @@ func main() { storagePath = path.Join(wd, storagePath) } - fmt.Println("Listening on:", *addr) - fmt.Println("Storage path:", storagePath) + diskStorage := storage.NewDailyDiskStorage(storagePath) + if *buildTypeIndexes { + diskStorage.RebuildTypeIndexes() + return + } - var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer()) + var handler = actions.NewActionsHandler(diskStorage, serializer.NewPassthruSerializer()) server.Bind(*addr) server.Listen(handler) server.Destroy() diff --git a/scripts/bootstrap.sh b/scripts/bootstrap.sh index 1746fb6..36cb546 100644 --- a/scripts/bootstrap.sh +++ b/scripts/bootstrap.sh @@ -7,7 +7,7 @@ sudo apt-get install -y git build-essential pkg-config # Install Golang cd /usr/local echo 'Downloading and installing Go 1.6 ...' -curl -s https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz | tar xz +curl -s https://storage.googleapis.com/golang/go1.7.linux-amd64.tar.gz | tar xz export GOROOT=/usr/local/go echo 'export GOROOT=/usr/local/go' > /etc/profile.d/go.sh export GOPATH=~/go @@ -17,18 +17,18 @@ echo 'export PATH=$PATH:/usr/local/go/bin' >> /etc/profile.d/go.sh # Install zeromq cd ~ -echo 'Downloading libsodium-1.0.10 ...' -curl -s https://download.libsodium.org/libsodium/releases/libsodium-1.0.10.tar.gz | tar xz -cd libsodium-1.0.10 +echo 'Downloading libsodium-1.0.11 ...' +curl -s https://download.libsodium.org/libsodium/releases/libsodium-1.0.11.tar.gz | tar xz +cd libsodium-1.0.11 ./configure make && make check && sudo make install sudo ldconfig cd ~ -echo 'Downloading zeromq-4.1.4 ...' -curl -s http://download.zeromq.org/zeromq-4.1.4.tar.gz | tar xz -cd zeromq-4.1.4 +echo 'Downloading zeromq-4.1.5 ...' +curl -L -s https://github.com/zeromq/zeromq4-1/releases/download/v4.1.5/zeromq-4.1.5.tar.gz | tar xz +cd zeromq-4.1.5 ./configure make && make check && sudo make install diff --git a/scripts/install.sh b/scripts/install.sh index 6f1f4ce..f8cb878 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -2,7 +2,7 @@ # Install Goes go get github.com/satori/go.uuid go get github.com/pebbe/zmq4 -go build -o bin/simpleserver simpleserver/simpleserver.go +go build -o bin/goes sudo mkdir /opt/goes sudo cp -R bin /opt/goes diff --git a/scripts/start.sh b/scripts/start.sh index a3113c0..1b0a8a5 100644 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -1,2 +1,2 @@ #!/usr/bin/env bash -/opt/goes/bin/simpleserver --db /var/events &>/var/log/goes.log & +/opt/goes/bin/goes --db /var/events &>/var/log/goes.log & diff --git a/server/zeromq.go b/server/zeromq.go index dae5783..156b94b 100644 --- a/server/zeromq.go +++ b/server/zeromq.go @@ -9,31 +9,33 @@ import ( "encoding/binary" ) -var context *zmq4.Context -var replySocket *zmq4.Socket +var _context *zmq4.Context +var _replySocket *zmq4.Socket +var _addr string func Bind(addr string) { var err error; + _addr = addr - context, err = zmq4.NewContext() + _context, err = zmq4.NewContext() if err != nil { panic(err) } - replySocket, err = context.NewSocket(zmq4.REP) + _replySocket, err = _context.NewSocket(zmq4.REP) if err != nil { panic(err) } - err = replySocket.Bind(addr) + err = _replySocket.Bind(addr) if err != nil { panic(err) } } func Destroy() { - replySocket.Close() - context.Term() + _replySocket.Close() + _context.Term() } const NO_FLAGS = zmq4.Flag(0) @@ -44,8 +46,10 @@ const PAYLOAD_FRAME = 2 const METADATA_FRAME = 3 func Listen(handler actions.Handler) { + fmt.Println("Listening for incoming commands on:", _addr) + for { - message, err := replySocket.RecvMessageBytes(NO_FLAGS) + message, err := _replySocket.RecvMessageBytes(NO_FLAGS) if err != nil { fmt.Println("Error receiving command from client", err) continue @@ -64,11 +68,11 @@ func Listen(handler actions.Handler) { payload := message[PAYLOAD_FRAME] err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: nil}, actions.NO_EXPECTEDVERSION) if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + _replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) fmt.Println(err) break } - replySocket.Send("Ok", NO_FLAGS) + _replySocket.Send("Ok", NO_FLAGS) case "AddEvent_v2": // v2 - "AddEvent" 16:AggregateId,4:expectedVersion {payload} {metadata} aggregateId, err := uuid.FromBytes(message[ARGS_FRAME][0:UUID_SIZE]) @@ -82,11 +86,11 @@ func Listen(handler actions.Handler) { metadata := message[METADATA_FRAME] err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: metadata}, expectedVersion) if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + _replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) fmt.Println(err) break } - replySocket.Send("Ok", NO_FLAGS) + _replySocket.Send("Ok", NO_FLAGS) case "ReadStream", "ReadStream_v2": aggregateId, err := uuid.FromBytes(message[ARGS_FRAME]) if err != nil { @@ -96,28 +100,28 @@ func Listen(handler actions.Handler) { fmt.Println("->", command, aggregateId.String()) events, err := handler.RetrieveFor(aggregateId) if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + _replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) fmt.Println(err) break } if command == "ReadStream_v2" { - sendEvents_v2(replySocket, events) + sendEvents_v2(_replySocket, events) break; } - sendEvents_v1(replySocket, events) + sendEvents_v1(_replySocket, events) case "ReadAll", "ReadAll_v2": fmt.Println("->", command) events, err := handler.RetrieveAll() if err != nil { - replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) + _replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS) fmt.Println(err) break } if command == "ReadAll_v2" { - sendEvents_v2(replySocket, events) + sendEvents_v2(_replySocket, events) break; } - sendEvents_v1(replySocket, events) + sendEvents_v1(_replySocket, events) case "Shutdown": fmt.Println("->", command) return diff --git a/simpleserver/simpleserver.go b/simpleserver/simpleserver.go deleted file mode 100644 index 1898943..0000000 --- a/simpleserver/simpleserver.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import ( - "fmt" - "github.com/pebbe/zmq4" - "github.com/satori/go.uuid" - goes ".." - "bytes" - "errors" - "flag" - "os" - "path" -) - diff --git a/storage/dailydisk.go b/storage/dailydisk.go index 11da7e2..4e13af5 100644 --- a/storage/dailydisk.go +++ b/storage/dailydisk.go @@ -12,21 +12,24 @@ import ( ) const EMPTY_STREAM = uint32(0) -var CRLF = []byte {'\r', '\n'} +var CRLF = []byte("\r\n") type DailyDiskStorage struct { storagePath string indexesPath string + typesIndexesPath string globalIndexFilename string } func NewDailyDiskStorage(storagePath string) Storage { + fmt.Println("Using DailyDiskStorage path:", storagePath) indexesPath := path.Join(storagePath, "indexes") globalIndexPath := path.Join(indexesPath, "global") - if err := os.MkdirAll(indexesPath, 0777); err != nil { + typesIndexesPath := path.Join(indexesPath, "types") + if err := os.MkdirAll(typesIndexesPath, 0777); err != nil { panic(err) } - return &DailyDiskStorage{storagePath, indexesPath, globalIndexPath}; + return &DailyDiskStorage{storagePath, indexesPath, typesIndexesPath, globalIndexPath}; } func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string { @@ -71,6 +74,21 @@ func appendIndex(filename string, entry *IndexEntry) error { return nil } +func (me DailyDiskStorage) appendTypeIndex(entry *IndexEntry) error { + filename := path.Join(me.typesIndexesPath, entry.typeId) + indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644 ) + if err != nil { + return err + } + defer indexFile.Close() + + value := me.getEventFilename(entry.creationTime, entry.typeId) + start := len(me.storagePath) + 1 + _, err = indexFile.WriteString(value[start:] + "\r\n") + + return err +} + func readIndexNextEntry(f *os.File) (*IndexEntry, error) { index := IndexEntry{} @@ -150,7 +168,7 @@ func (me DailyDiskStorage) Write(event *StoredEvent) error { return err } - return nil + return me.appendTypeIndex(index) } func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) { @@ -228,4 +246,35 @@ func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) { } return events, nil +} + +func (me DailyDiskStorage) RebuildTypeIndexes() { + fmt.Print("Rebuilding type indexes... ") + + err := os.RemoveAll(me.typesIndexesPath) + if err != nil { + panic(err) + } + err = os.MkdirAll(me.typesIndexesPath, 0644) + if err != nil { + panic(err) + } + + globalIndexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0) + if err != nil { + panic(err) + } + + for { + indexEntry, err := readIndexNextEntry(globalIndexFile) + if err != nil && err.Error() == "EOF" { + break + } + if err != nil { + panic(err) + } + me.appendTypeIndex(indexEntry) + } + + fmt.Println("Done.") } \ No newline at end of file diff --git a/storage/simpledisk.go b/storage/simpledisk.go index fac5297..114e3cc 100644 --- a/storage/simpledisk.go +++ b/storage/simpledisk.go @@ -229,3 +229,6 @@ func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, return } +func (me SimpleDiskStorage) RebuildTypeIndexes() { + +} \ No newline at end of file diff --git a/storage/storage.go b/storage/storage.go index bdf0850..98be1eb 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -23,4 +23,5 @@ type Storage interface { ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) ReadAll() ([]*StoredEvent, error) StreamVersion(streamId uuid.UUID) (uint32, error) + RebuildTypeIndexes() }