goes/server/zeromq.go
2017-02-11 22:22:06 -08:00

210 lines
5.2 KiB
Go

package server
import (
"fmt"
data "../data"
actions "../actions"
"github.com/satori/go.uuid"
"github.com/pebbe/zmq4"
"encoding/binary"
)
var _context *zmq4.Context
var _replySocket *zmq4.Socket
var _addr string
func Bind(addr string) {
var err error;
_addr = addr
_context, err = zmq4.NewContext()
if err != nil {
panic(err)
}
_replySocket, err = _context.NewSocket(zmq4.REP)
if err != nil {
panic(err)
}
err = _replySocket.Bind(addr)
if err != nil {
panic(err)
}
}
func Destroy() {
_replySocket.Close()
_context.Term()
}
const NO_FLAGS = zmq4.Flag(0)
const UUID_SIZE = 16
const COMMAND_FRAME = 0
const ARGS_FRAME = 1
const PAYLOAD_FRAME = 2
const METADATA_FRAME = 3
func Listen(handler actions.Handler) {
fmt.Println("Listening for incoming commands on:", _addr)
for {
message, err := _replySocket.RecvMessageBytes(NO_FLAGS)
if err != nil {
fmt.Println("Error receiving command from client", err)
continue
}
command := string(message[COMMAND_FRAME])
switch command {
case "AddEvent":
// v1 - "AddEvent" [AggregateId] {payload}
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
payload := message[PAYLOAD_FRAME]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: nil}, actions.NO_EXPECTEDVERSION)
if err != nil {
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
_replySocket.Send("Ok", NO_FLAGS)
case "AddEvent_v2":
// v2 - "AddEvent" 16:AggregateId,4:expectedVersion {payload} {metadata}
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME][0:UUID_SIZE])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
expectedVersion := binary.LittleEndian.Uint32(message[ARGS_FRAME][UUID_SIZE:])
fmt.Println("->", command, aggregateId.String(), expectedVersion)
payload := message[PAYLOAD_FRAME]
metadata := message[METADATA_FRAME]
err = handler.AddEvent(data.Event{AggregateId: aggregateId, Payload: payload, Metadata: metadata}, expectedVersion)
if err != nil {
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
_replySocket.Send("Ok", NO_FLAGS)
case "ReadStream", "ReadStream_v2":
aggregateId, err := uuid.FromBytes(message[ARGS_FRAME])
if err != nil {
fmt.Println("Wrong format for AggregateId", err)
break
}
fmt.Println("->", command, aggregateId.String())
events, err := handler.RetrieveFor(aggregateId)
if err != nil {
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
if command == "ReadStream_v2" {
sendEvents_v2(_replySocket, events)
break
}
sendEvents_v1(_replySocket, events)
case "ReadAll", "ReadAll_v2", "ReadAll_v3":
fmt.Println("->", command)
events, err := handler.RetrieveAll()
if err != nil {
_replySocket.Send(fmt.Sprintf("Error: %v", err), NO_FLAGS)
fmt.Println(err)
break
}
if command == "ReadAll_v3" {
sendEvents_v3(_replySocket, events)
break
}
if command == "ReadAll_v2" {
sendEvents_v2(_replySocket, events)
break
}
sendEvents_v1(_replySocket, events)
case "Shutdown":
fmt.Println("->", command)
return
}
}
}
func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) {
lastFlag := zmq4.SNDMORE
if isLast {
lastFlag = NO_FLAGS
}
socket.SendBytes(event.Payload.([]byte), lastFlag)
}
func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) {
len := len(events)
if len == 0 {
socket.Send("0", NO_FLAGS)
return
}
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len; i++ {
sendEvent_v1(socket, events[i], i == len - 1)
}
fmt.Println("<-", len, "events")
}
func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) {
socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE)
lastFlag := zmq4.SNDMORE
if isLast {
lastFlag = NO_FLAGS
}
socket.SendBytes(event.Metadata.([]byte), lastFlag)
}
func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) {
len := len(events)
if len == 0 {
socket.Send("0", NO_FLAGS)
return
}
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len; i++ {
sendEvent_v2(socket, events[i], i == len - 1)
}
fmt.Println("<-", len, "events")
}
func sendEvent_v3(socket *zmq4.Socket, event *data.Event, isLast bool) {
socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE)
socket.SendBytes(event.Metadata.([]byte), zmq4.SNDMORE)
lastFlag := zmq4.SNDMORE
if isLast {
lastFlag = NO_FLAGS
}
b, _ := event.CreationTime.MarshalText()
socket.SendBytes(b, lastFlag)
}
func sendEvents_v3(socket *zmq4.Socket, events []*data.Event) {
len := len(events)
if len == 0 {
socket.Send("0", NO_FLAGS)
return
}
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
i := 0
for ; i < len; i++ {
sendEvent_v3(socket, events[i], i == len - 1)
}
fmt.Println("<-", len, "events")
}