diff --git a/server/zeromq.go b/server/zeromq.go index 156b94b..5a0243d 100644 --- a/server/zeromq.go +++ b/server/zeromq.go @@ -106,10 +106,10 @@ func Listen(handler actions.Handler) { } if command == "ReadStream_v2" { sendEvents_v2(_replySocket, events) - break; + break } sendEvents_v1(_replySocket, events) - case "ReadAll", "ReadAll_v2": + case "ReadAll", "ReadAll_v2", "ReadAll_v3": fmt.Println("->", command) events, err := handler.RetrieveAll() if err != nil { @@ -117,9 +117,13 @@ func Listen(handler actions.Handler) { fmt.Println(err) break } + if command == "ReadAll_v3" { + sendEvents_v3(_replySocket, events) + break + } if command == "ReadAll_v2" { sendEvents_v2(_replySocket, events) - break; + break } sendEvents_v1(_replySocket, events) case "Shutdown": @@ -131,7 +135,7 @@ func Listen(handler actions.Handler) { func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) { lastFlag := zmq4.SNDMORE - if (isLast) { + if isLast { lastFlag = NO_FLAGS } socket.SendBytes(event.Payload.([]byte), lastFlag) @@ -139,7 +143,7 @@ func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) { func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) { len := len(events) - if (len == 0) { + if len == 0 { socket.Send("0", NO_FLAGS) return } @@ -156,7 +160,7 @@ func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) { func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) { socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE) lastFlag := zmq4.SNDMORE - if (isLast) { + if isLast { lastFlag = NO_FLAGS } socket.SendBytes(event.Metadata.([]byte), lastFlag) @@ -164,7 +168,7 @@ func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) { func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) { len := len(events) - if (len == 0) { + if len == 0 { socket.Send("0", NO_FLAGS) return } @@ -176,4 +180,31 @@ func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) { sendEvent_v2(socket, events[i], i == len - 1) } fmt.Println("<-", len, "events") +} + +func sendEvent_v3(socket *zmq4.Socket, event *data.Event, isLast bool) { + socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE) + socket.SendBytes(event.Metadata.([]byte), zmq4.SNDMORE) + lastFlag := zmq4.SNDMORE + if isLast { + lastFlag = NO_FLAGS + } + b, _ := event.CreationTime.MarshalText() + socket.SendBytes(b, lastFlag) +} + +func sendEvents_v3(socket *zmq4.Socket, events []*data.Event) { + len := len(events) + if len == 0 { + socket.Send("0", NO_FLAGS) + return + } + + socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE) + + i := 0 + for ; i < len; i++ { + sendEvent_v3(socket, events[i], i == len - 1) + } + fmt.Println("<-", len, "events") } \ No newline at end of file