Added type indexes. Updated vagrant scripts.

This commit is contained in:
Nicolas Dextraze 2016-08-28 13:29:53 -07:00
parent 320cc9e53e
commit a05f21377f
11 changed files with 99 additions and 52 deletions

4
.gitignore vendored
View File

@ -1,3 +1,3 @@
.idea/
goes
goes.exe
.vagrant/
bin/

View File

@ -32,12 +32,12 @@ In your GOPATH folder, execute the following commands:
In the project root folder, execute the following command:
`go build`
`go build -o bin/goes`
### Running the server
In the project root folder, execute the following command:
`./goes --db=./events --addr=tcp://127.0.0.1:12345`
`./bin/goes --db=./events --addr=tcp://127.0.0.1:12345`
Both flags are optional and their default values are the same as the example.

10
goes.go
View File

@ -13,6 +13,7 @@ import (
var addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to")
var db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage")
var buildTypeIndexes = flag.Bool("buildTypeIndexes", false, "Build type indexes")
func PathIsAbsolute(s string) bool {
if len(s) > 1 && s[1] == ':' {
@ -34,10 +35,13 @@ func main() {
storagePath = path.Join(wd, storagePath)
}
fmt.Println("Listening on:", *addr)
fmt.Println("Storage path:", storagePath)
diskStorage := storage.NewDailyDiskStorage(storagePath)
if *buildTypeIndexes {
diskStorage.RebuildTypeIndexes()
return
}
var handler = actions.NewActionsHandler(storage.NewDailyDiskStorage(storagePath), serializer.NewPassthruSerializer())
var handler = actions.NewActionsHandler(diskStorage, serializer.NewPassthruSerializer())
server.Bind(*addr)
server.Listen(handler)
server.Destroy()

View File

@ -7,7 +7,7 @@ sudo apt-get install -y git build-essential pkg-config
# Install Golang
cd /usr/local
echo 'Downloading and installing Go 1.6 ...'
curl -s https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz | tar xz
curl -s https://storage.googleapis.com/golang/go1.7.linux-amd64.tar.gz | tar xz
export GOROOT=/usr/local/go
echo 'export GOROOT=/usr/local/go' > /etc/profile.d/go.sh
export GOPATH=~/go
@ -17,18 +17,18 @@ echo 'export PATH=$PATH:/usr/local/go/bin' >> /etc/profile.d/go.sh
# Install zeromq
cd ~
echo 'Downloading libsodium-1.0.10 ...'
curl -s https://download.libsodium.org/libsodium/releases/libsodium-1.0.10.tar.gz | tar xz
cd libsodium-1.0.10
echo 'Downloading libsodium-1.0.11 ...'
curl -s https://download.libsodium.org/libsodium/releases/libsodium-1.0.11.tar.gz | tar xz
cd libsodium-1.0.11
./configure
make && make check && sudo make install
sudo ldconfig
cd ~
echo 'Downloading zeromq-4.1.4 ...'
curl -s http://download.zeromq.org/zeromq-4.1.4.tar.gz | tar xz
cd zeromq-4.1.4
echo 'Downloading zeromq-4.1.5 ...'
curl -L -s https://github.com/zeromq/zeromq4-1/releases/download/v4.1.5/zeromq-4.1.5.tar.gz | tar xz
cd zeromq-4.1.5
./configure
make && make check && sudo make install

View File

@ -2,7 +2,7 @@
# Install Goes
go get github.com/satori/go.uuid
go get github.com/pebbe/zmq4
go build -o bin/simpleserver simpleserver/simpleserver.go
go build -o bin/goes
sudo mkdir /opt/goes
sudo cp -R bin /opt/goes

View File

@ -1,2 +1,2 @@
#!/usr/bin/env bash
/opt/goes/bin/simpleserver --db /var/events &>/var/log/goes.log &
/opt/goes/bin/goes --db /var/events &>/var/log/goes.log &

View File

@ -9,31 +9,33 @@ import (
"encoding/binary"
)
var context *zmq4.Context
var replySocket *zmq4.Socket
var _context *zmq4.Context
var _replySocket *zmq4.Socket
var _addr string
func Bind(addr string) {
var err error;
_addr = addr
context, err = zmq4.NewContext()
_context, err = zmq4.NewContext()
if err != nil {
panic(err)
}
replySocket, err = context.NewSocket(zmq4.REP)
_replySocket, err = _context.NewSocket(zmq4.REP)
if err != nil {
panic(err)
}
err = replySocket.Bind(addr)
err = _replySocket.Bind(addr)
if err != nil {
panic(err)
}
}
func Destroy() {
replySocket.Close()
context.Term()
_replySocket.Close()
_context.Term()
}
const NO_FLAGS = zmq4.Flag(0)
@ -44,8 +46,10 @@ const PAYLOAD_FRAME = 2
const METADATA_FRAME = 3
func Listen(handler actions.Handler) {
fmt.Println("Listening for incoming commands on:", _addr)
for {
message, err := replySocket.RecvMessageBytes(NO_FLAGS)
message, err := _replySocket.RecvMessageBytes(NO_FLAGS)
if err != nil {
fmt.Println("Error receiving command from client", err)
continue
@ -64,11 +68,11 @@ func Listen(handler actions.Handler) {
payload := message[PAYLOAD_FRAME]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: nil}, actions.NO_EXPECTEDVERSION)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
replySocket.Send("Ok", NO_FLAGS)
_replySocket.Send("Ok", NO_FLAGS)
case "AddEvent_v2":
// v2 - "AddEvent" 16:AggregateId,4:expectedVersion {payload} {metadata}
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME][0:UUID_SIZE])
@ -82,11 +86,11 @@ func Listen(handler actions.Handler) {
metadata := message[METADATA_FRAME]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: metadata}, expectedVersion)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
replySocket.Send("Ok", NO_FLAGS)
_replySocket.Send("Ok", NO_FLAGS)
case "ReadStream", "ReadStream_v2":
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME])
if err != nil {
@ -96,28 +100,28 @@ func Listen(handler actions.Handler) {
fmt.Println("->", command, aggregateId.String())
events, err := handler.RetrieveFor(aggregateId)
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
if command == "ReadStream_v2" {
sendEvents_v2(replySocket, events)
sendEvents_v2(_replySocket, events)
break;
}
sendEvents_v1(replySocket, events)
sendEvents_v1(_replySocket, events)
case "ReadAll", "ReadAll_v2":
fmt.Println("->", command)
events, err := handler.RetrieveAll()
if err != nil {
replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
if command == "ReadAll_v2" {
sendEvents_v2(replySocket, events)
sendEvents_v2(_replySocket, events)
break;
}
sendEvents_v1(replySocket, events)
sendEvents_v1(_replySocket, events)
case "Shutdown":
fmt.Println("->", command)
return

View File

@ -1,14 +0,0 @@
package main
import (
"fmt"
"github.com/pebbe/zmq4"
"github.com/satori/go.uuid"
goes ".."
"bytes"
"errors"
"flag"
"os"
"path"
)

View File

@ -12,21 +12,24 @@ import (
)
const EMPTY_STREAM = uint32(0)
var CRLF = []byte {'\r', '\n'}
var CRLF = []byte("\r\n")
type DailyDiskStorage struct {
storagePath string
indexesPath string
typesIndexesPath string
globalIndexFilename string
}
func NewDailyDiskStorage(storagePath string) Storage {
fmt.Println("Using DailyDiskStorage path:", storagePath)
indexesPath := path.Join(storagePath, "indexes")
globalIndexPath := path.Join(indexesPath, "global")
if err := os.MkdirAll(indexesPath, 0777); err != nil {
typesIndexesPath := path.Join(indexesPath, "types")
if err := os.MkdirAll(typesIndexesPath, 0777); err != nil {
panic(err)
}
return &DailyDiskStorage{storagePath, indexesPath, globalIndexPath};
return &DailyDiskStorage{storagePath, indexesPath, typesIndexesPath, globalIndexPath};
}
func (me DailyDiskStorage) getStreamIndexFilename(streamId uuid.UUID) string {
@ -71,6 +74,21 @@ func appendIndex(filename string, entry *IndexEntry) error {
return nil
}
func (me DailyDiskStorage) appendTypeIndex(entry *IndexEntry) error {
filename := path.Join(me.typesIndexesPath, entry.typeId)
indexFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0644 )
if err != nil {
return err
}
defer indexFile.Close()
value := me.getEventFilename(entry.creationTime, entry.typeId)
start := len(me.storagePath) + 1
_, err = indexFile.WriteString(value[start:] + "\r\n")
return err
}
func readIndexNextEntry(f *os.File) (*IndexEntry, error) {
index := IndexEntry{}
@ -150,7 +168,7 @@ func (me DailyDiskStorage) Write(event *StoredEvent) error {
return err
}
return nil
return me.appendTypeIndex(index)
}
func (me DailyDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
@ -229,3 +247,34 @@ func (me DailyDiskStorage) ReadAll() ([]*StoredEvent, error) {
return events, nil
}
func (me DailyDiskStorage) RebuildTypeIndexes() {
fmt.Print("Rebuilding type indexes... ")
err := os.RemoveAll(me.typesIndexesPath)
if err != nil {
panic(err)
}
err = os.MkdirAll(me.typesIndexesPath, 0644)
if err != nil {
panic(err)
}
globalIndexFile, err := os.OpenFile(me.globalIndexFilename, os.O_RDONLY, 0)
if err != nil {
panic(err)
}
for {
indexEntry, err := readIndexNextEntry(globalIndexFile)
if err != nil && err.Error() == "EOF" {
break
}
if err != nil {
panic(err)
}
me.appendTypeIndex(indexEntry)
}
fmt.Println("Done.")
}

View File

@ -229,3 +229,6 @@ func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string,
return
}
func (me SimpleDiskStorage) RebuildTypeIndexes() {
}

View File

@ -23,4 +23,5 @@ type Storage interface {
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error)
StreamVersion(streamId uuid.UUID) (uint32, error)
RebuildTypeIndexes()
}