2016-02-13 01:54:50 +00:00
|
|
|
package goes
|
|
|
|
|
|
|
|
import (
|
|
|
|
"github.com/satori/go.uuid"
|
2016-02-18 02:15:40 +00:00
|
|
|
"time"
|
2016-02-13 01:54:50 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var serializer Serializer
|
2016-02-15 18:09:10 +00:00
|
|
|
var storage Storage
|
2016-02-13 01:54:50 +00:00
|
|
|
|
|
|
|
type Event struct {
|
|
|
|
AggregateId uuid.UUID
|
|
|
|
Payload interface{}
|
|
|
|
}
|
|
|
|
|
2016-02-15 18:09:10 +00:00
|
|
|
func SetStorage(newStorage Storage) {
|
|
|
|
storage = newStorage
|
2016-02-13 01:54:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func SetSerializer(newSerializer Serializer) {
|
|
|
|
serializer = newSerializer
|
|
|
|
}
|
|
|
|
|
|
|
|
// mapLock is a one-slot buffered channel used as a mutex for streamsLock:
// sending a value acquires it, receiving the value releases it. It is held
// only for the duration of a map access, never while waiting on a stream.
var mapLock chan int = make(chan int, 1)

// streamsLock maps a stream name to the one-slot buffered channel that acts as
// that stream's mutex. Entries are created lazily by lockStream and are never
// removed. Every access to the map must happen while holding mapLock.
var streamsLock map[string]chan int = make(map[string]chan int)

// lockStream blocks until the calling goroutine holds the lock for streamName,
// creating the lock on first use.
//
// The registry lock (mapLock) is released before waiting on the stream's own
// channel. Holding it while waiting would make a single waiter on a busy
// stream stall lockers of every other stream.
func lockStream(streamName string) {
	mapLock <- 1
	streamLock := streamsLock[streamName]
	if streamLock == nil {
		streamLock = make(chan int, 1)
		streamsLock[streamName] = streamLock
	}
	<-mapLock

	// Acquire the per-stream lock outside the registry critical section.
	streamLock <- 1
}

// unlockStream releases the lock for streamName previously taken with
// lockStream.
//
// The channel is looked up under mapLock so the map read cannot race with a
// concurrent insert in lockStream. Calling it for a stream that was never
// locked blocks forever, because the lookup yields a nil channel.
func unlockStream(streamName string) {
	mapLock <- 1
	streamLock := streamsLock[streamName]
	<-mapLock

	<-streamLock
}
|
|
|
|
|
|
|
|
func AddEvent(event Event) error {
|
|
|
|
streamName := event.AggregateId.String()
|
|
|
|
|
|
|
|
lockStream(streamName)
|
|
|
|
defer unlockStream(streamName)
|
|
|
|
|
2016-02-18 02:15:40 +00:00
|
|
|
serializedPayload, typeId, err := serializer.Serialize(event.Payload)
|
2016-02-13 01:54:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-02-18 02:15:40 +00:00
|
|
|
return storage.Write(&StoredEvent{event.AggregateId, time.Now(), typeId, serializedPayload})
|
2016-02-13 01:54:50 +00:00
|
|
|
}
|
|
|
|
|
2016-02-15 18:09:10 +00:00
|
|
|
func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) {
|
|
|
|
results, err := storage.ReadStream(aggregateId)
|
2016-02-13 01:54:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2016-02-15 18:09:10 +00:00
|
|
|
events := make([]*Event, 0)
|
|
|
|
for _, storedEvent := range results {
|
2016-02-18 02:15:40 +00:00
|
|
|
event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
|
2016-02-13 01:54:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-02-15 18:09:10 +00:00
|
|
|
events = append(events, &Event{storedEvent.StreamId, event})
|
2016-02-13 01:54:50 +00:00
|
|
|
}
|
|
|
|
|
2016-02-15 18:09:10 +00:00
|
|
|
return events, nil
|
2016-02-13 01:54:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func RetrieveAll() ([]*Event, error) {
|
2016-02-15 18:09:10 +00:00
|
|
|
results, err := storage.ReadAll()
|
2016-02-13 01:54:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
events := make([]*Event, 0)
|
2016-02-15 18:09:10 +00:00
|
|
|
for _, storedEvent := range results {
|
2016-02-18 02:15:40 +00:00
|
|
|
event, err := serializer.Deserialize(storedEvent.Data, storedEvent.TypeId)
|
2016-02-13 01:54:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-02-15 18:09:10 +00:00
|
|
|
events = append(events, &Event{storedEvent.StreamId, event})
|
2016-02-13 01:54:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return events, nil
|
|
|
|
}
|