Add ReadAll_v3 with CreationTime
This commit is contained in:
parent
5a7ec3b294
commit
b302a1f78d
|
@ -106,10 +106,10 @@ func Listen(handler actions.Handler) {
|
||||||
}
|
}
|
||||||
if command == "ReadStream_v2" {
|
if command == "ReadStream_v2" {
|
||||||
sendEvents_v2(_replySocket, events)
|
sendEvents_v2(_replySocket, events)
|
||||||
break;
|
break
|
||||||
}
|
}
|
||||||
sendEvents_v1(_replySocket, events)
|
sendEvents_v1(_replySocket, events)
|
||||||
case "ReadAll", "ReadAll_v2":
|
case "ReadAll", "ReadAll_v2", "ReadAll_v3":
|
||||||
fmt.Println("->", command)
|
fmt.Println("->", command)
|
||||||
events, err := handler.RetrieveAll()
|
events, err := handler.RetrieveAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -117,9 +117,13 @@ func Listen(handler actions.Handler) {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if command == "ReadAll_v3" {
|
||||||
|
sendEvents_v3(_replySocket, events)
|
||||||
|
break
|
||||||
|
}
|
||||||
if command == "ReadAll_v2" {
|
if command == "ReadAll_v2" {
|
||||||
sendEvents_v2(_replySocket, events)
|
sendEvents_v2(_replySocket, events)
|
||||||
break;
|
break
|
||||||
}
|
}
|
||||||
sendEvents_v1(_replySocket, events)
|
sendEvents_v1(_replySocket, events)
|
||||||
case "Shutdown":
|
case "Shutdown":
|
||||||
|
@ -131,7 +135,7 @@ func Listen(handler actions.Handler) {
|
||||||
|
|
||||||
func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
||||||
lastFlag := zmq4.SNDMORE
|
lastFlag := zmq4.SNDMORE
|
||||||
if (isLast) {
|
if isLast {
|
||||||
lastFlag = NO_FLAGS
|
lastFlag = NO_FLAGS
|
||||||
}
|
}
|
||||||
socket.SendBytes(event.Payload.([]byte), lastFlag)
|
socket.SendBytes(event.Payload.([]byte), lastFlag)
|
||||||
|
@ -139,7 +143,7 @@ func sendEvent_v1(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
||||||
|
|
||||||
func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) {
|
func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) {
|
||||||
len := len(events)
|
len := len(events)
|
||||||
if (len == 0) {
|
if len == 0 {
|
||||||
socket.Send("0", NO_FLAGS)
|
socket.Send("0", NO_FLAGS)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -156,7 +160,7 @@ func sendEvents_v1(socket *zmq4.Socket, events []*data.Event) {
|
||||||
func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
||||||
socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE)
|
socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE)
|
||||||
lastFlag := zmq4.SNDMORE
|
lastFlag := zmq4.SNDMORE
|
||||||
if (isLast) {
|
if isLast {
|
||||||
lastFlag = NO_FLAGS
|
lastFlag = NO_FLAGS
|
||||||
}
|
}
|
||||||
socket.SendBytes(event.Metadata.([]byte), lastFlag)
|
socket.SendBytes(event.Metadata.([]byte), lastFlag)
|
||||||
|
@ -164,7 +168,7 @@ func sendEvent_v2(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
||||||
|
|
||||||
func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) {
|
func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) {
|
||||||
len := len(events)
|
len := len(events)
|
||||||
if (len == 0) {
|
if len == 0 {
|
||||||
socket.Send("0", NO_FLAGS)
|
socket.Send("0", NO_FLAGS)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -177,3 +181,30 @@ func sendEvents_v2(socket *zmq4.Socket, events []*data.Event) {
|
||||||
}
|
}
|
||||||
fmt.Println("<-", len, "events")
|
fmt.Println("<-", len, "events")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sendEvent_v3(socket *zmq4.Socket, event *data.Event, isLast bool) {
|
||||||
|
socket.SendBytes(event.Payload.([]byte), zmq4.SNDMORE)
|
||||||
|
socket.SendBytes(event.Metadata.([]byte), zmq4.SNDMORE)
|
||||||
|
lastFlag := zmq4.SNDMORE
|
||||||
|
if isLast {
|
||||||
|
lastFlag = NO_FLAGS
|
||||||
|
}
|
||||||
|
b, _ := event.CreationTime.MarshalText()
|
||||||
|
socket.SendBytes(b, lastFlag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendEvents_v3(socket *zmq4.Socket, events []*data.Event) {
|
||||||
|
len := len(events)
|
||||||
|
if len == 0 {
|
||||||
|
socket.Send("0", NO_FLAGS)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.Send(fmt.Sprintf("%v", len), zmq4.SNDMORE)
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for ; i < len; i++ {
|
||||||
|
sendEvent_v3(socket, events[i], i == len - 1)
|
||||||
|
}
|
||||||
|
fmt.Println("<-", len, "events")
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user