2016-02-15 21:34:23 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
	"bytes"
	"errors"
	"flag"
	"fmt"
	"os"
	"path"

	"github.com/pebbe/zmq4"
	"github.com/satori/go.uuid"

	goes ".."
)
|
|
|
|
|
2016-02-23 21:01:37 +00:00
|
|
|
// Command-line flags; their values are only meaningful after flag.Parse has
// been called in main.
var (
	// addr is the ZeroMQ endpoint the reply socket binds to.
	addr = flag.String("addr", "tcp://127.0.0.1:12345", "zeromq address to listen to")

	// db is the storage location handed to the disk storage. The default is
	// an "events" directory relative to the current directory, spelled with
	// the platform's path separator.
	db = flag.String("db", fmt.Sprintf(".%cevents", os.PathSeparator), "path for storage")
)
|
|
|
|
|
|
|
|
// Serializer translates between the wire form "<typeId> <payload>" (a single
// byte slice split at the first space) and the separate payload/typeId pair.
// It carries no state.
type Serializer struct{}
|
2016-02-18 02:15:40 +00:00
|
|
|
|
|
|
|
func NewSerializer() goes.Serializer {
|
|
|
|
return &Serializer{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (me Serializer) Serialize(input interface{}) (output []byte, typeId string, err error) {
|
|
|
|
content, ok := input.([]byte)
|
|
|
|
if !ok {
|
|
|
|
err = errors.New("input should be []byte")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
sep := bytes.IndexByte(content, ' ')
|
|
|
|
if sep == -1 {
|
|
|
|
err = errors.New("missing split char.")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
output = content[sep+1:]
|
|
|
|
typeId = string(content[0:sep])
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (me Serializer) Deserialize(input []byte, typeId string) (interface{}, error) {
|
|
|
|
output := []byte(typeId)
|
|
|
|
output = append(output, ' ')
|
|
|
|
output = append(output, input...)
|
|
|
|
|
|
|
|
return output, nil
|
|
|
|
}
|
|
|
|
|
2016-02-19 00:28:02 +00:00
|
|
|
// PathIsAbsolute reports whether s should be treated as an absolute path.
//
// A string whose second byte is ':' is always considered absolute
// (presumably to recognise Windows drive prefixes such as "C:"); this check is
// applied on every platform. Otherwise the result is path.IsAbs(s), i.e.
// whether s starts with a forward slash.
func PathIsAbsolute(s string) bool {
	hasDrivePrefix := len(s) > 1 && s[1] == ':'
	return hasDrivePrefix || path.IsAbs(s)
}
|
|
|
|
|
2016-02-15 21:34:23 +00:00
|
|
|
func main() {
|
|
|
|
fmt.Println("Simple ZeroMQ server for goes.")
|
|
|
|
|
2016-02-19 00:28:02 +00:00
|
|
|
flag.Parse()
|
2016-02-15 21:34:23 +00:00
|
|
|
|
2016-02-19 00:28:02 +00:00
|
|
|
storagePath := *db
|
|
|
|
if !PathIsAbsolute(storagePath) {
|
|
|
|
wd, _ := os.Getwd()
|
|
|
|
storagePath = path.Join(wd, storagePath)
|
|
|
|
}
|
|
|
|
|
2016-02-23 21:01:37 +00:00
|
|
|
fmt.Println("Listening on:", *addr)
|
2016-02-19 00:28:02 +00:00
|
|
|
fmt.Println("Storage path:", storagePath)
|
2016-02-18 02:15:40 +00:00
|
|
|
goes.SetStorage(goes.NewReadableDiskStorage(storagePath))
|
|
|
|
goes.SetSerializer(NewSerializer())
|
2016-02-15 21:34:23 +00:00
|
|
|
|
|
|
|
context, err := zmq4.NewContext()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
defer context.Term()
|
|
|
|
|
|
|
|
replySocket, err := context.NewSocket(zmq4.REP)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
defer replySocket.Close()
|
|
|
|
|
2016-02-23 21:01:37 +00:00
|
|
|
err = replySocket.Bind(*addr)
|
2016-02-15 21:34:23 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
message, err := replySocket.RecvMessageBytes(0)
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println("Error receiving command from client", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
command := string(message[0])
|
|
|
|
switch command {
|
|
|
|
case "AddEvent":
|
|
|
|
aggregateId, err := uuid.FromBytes(message[1])
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println("Wrong format for AggregateId", err)
|
|
|
|
break
|
|
|
|
}
|
2016-02-16 02:56:36 +00:00
|
|
|
fmt.Println("->", command, aggregateId.String())
|
2016-02-15 21:34:23 +00:00
|
|
|
data := message[2]
|
|
|
|
err = goes.AddEvent(goes.Event{aggregateId, data})
|
|
|
|
if err != nil {
|
2016-02-17 05:30:50 +00:00
|
|
|
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
|
|
|
|
fmt.Println(err)
|
|
|
|
break
|
2016-02-15 21:34:23 +00:00
|
|
|
}
|
|
|
|
replySocket.Send("Ok", 0)
|
|
|
|
case "ReadStream":
|
|
|
|
aggregateId, err := uuid.FromBytes(message[1])
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println("Wrong format for AggregateId", err)
|
|
|
|
break
|
|
|
|
}
|
2016-02-16 02:56:36 +00:00
|
|
|
fmt.Println("->", command, aggregateId.String())
|
2016-02-15 21:34:23 +00:00
|
|
|
events, err := goes.RetrieveFor(aggregateId)
|
|
|
|
if err != nil {
|
2016-02-17 05:30:50 +00:00
|
|
|
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
|
|
|
|
fmt.Println(err)
|
|
|
|
break
|
2016-02-15 21:34:23 +00:00
|
|
|
}
|
|
|
|
sendEvents(replySocket, events)
|
|
|
|
case "ReadAll":
|
2016-02-16 02:56:36 +00:00
|
|
|
fmt.Println("->", command)
|
2016-02-15 21:34:23 +00:00
|
|
|
events, err := goes.RetrieveAll()
|
|
|
|
if err != nil {
|
2016-02-17 05:30:50 +00:00
|
|
|
replySocket.Send(fmt.Sprintf("Error: %v", err), 0)
|
|
|
|
fmt.Println(err)
|
|
|
|
break
|
2016-02-15 21:34:23 +00:00
|
|
|
}
|
|
|
|
sendEvents(replySocket, events)
|
2016-02-19 00:28:02 +00:00
|
|
|
case "Shutdown":
|
|
|
|
fmt.Println("->", command)
|
|
|
|
return
|
2016-02-15 21:34:23 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func sendEvents(socket *zmq4.Socket, events []*goes.Event) {
|
|
|
|
len := len(events)
|
2016-02-16 02:56:36 +00:00
|
|
|
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
|
2016-02-15 21:34:23 +00:00
|
|
|
|
|
|
|
i := 0
|
|
|
|
for ; i < len-1; i++ {
|
|
|
|
socket.SendBytes(events[i].Payload.([]byte), zmq4.SNDMORE)
|
|
|
|
}
|
|
|
|
socket.SendBytes(events[i].Payload.([]byte), 0)
|
2016-02-16 02:56:36 +00:00
|
|
|
fmt.Println("<-", len, "events")
|
2016-05-31 03:26:58 +00:00
|
|
|
}
|