Abstracted storage

This commit is contained in:
Nicolas Dextraze 2016-02-15 10:09:10 -08:00
parent 8f9b33e9ce
commit 0a6a591340
4 changed files with 203 additions and 142 deletions

156
goes.go
View File

@ -2,40 +2,24 @@ package goes
import ( import (
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"os"
"fmt"
"encoding/binary"
"path"
"errors"
) )
var storagePath string
var serializer Serializer var serializer Serializer
var storage Storage
const IntegerSizeInBytes = 8
const StreamStartingCapacity = 512
type Event struct { type Event struct {
AggregateId uuid.UUID AggregateId uuid.UUID
Payload interface{} Payload interface{}
} }
func SetStoragePath(newStoragePath string) { func SetStorage(newStorage Storage) {
storagePath = newStoragePath storage = newStorage
} }
func SetSerializer(newSerializer Serializer) { func SetSerializer(newSerializer Serializer) {
serializer = newSerializer serializer = newSerializer
} }
func getFilename(stream, extension string) string {
return fmt.Sprintf("%v%v", path.Join(storagePath, stream[0:2], stream[2:]), extension)
}
func getFilenameForEvents(stream string) string {
return getFilename(stream, ".history")
}
var mapLock chan int = make(chan int, 1) var mapLock chan int = make(chan int, 1)
var streamsLock map[string]chan int = make(map[string]chan int) var streamsLock map[string]chan int = make(map[string]chan int)
@ -64,157 +48,47 @@ func AddEvent(event Event) error {
lockStream(streamName) lockStream(streamName)
defer unlockStream(streamName) defer unlockStream(streamName)
filename := getFilenameForEvents(streamName)
os.MkdirAll(path.Dir(filename), os.ModeDir)
eventIndexPath := path.Join(storagePath, "eventindex")
indexFile, err := os.OpenFile(eventIndexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0)
if err != nil {
return err
}
defer indexFile.Close()
eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0)
if err != nil {
return err
}
defer eventsFile.Close()
stat, err := eventsFile.Stat()
if err != nil {
return err
}
position := stat.Size()
serializedPayload, err := serializer.Serialize(event.Payload) serializedPayload, err := serializer.Serialize(event.Payload)
if err != nil { if err != nil {
return err return err
} }
lengthBytes := make([]byte, IntegerSizeInBytes) storage.Write(event.AggregateId, serializedPayload)
binary.BigEndian.PutUint64(lengthBytes, uint64(len(serializedPayload)))
eventsFile.Write(lengthBytes)
eventsFile.Write(serializedPayload)
indexFile.Write(event.AggregateId.Bytes())
positionBytes := make([]byte, IntegerSizeInBytes)
binary.BigEndian.PutUint64(positionBytes, uint64(position))
indexFile.Write(positionBytes)
return nil return nil
} }
func RetrieveFor(aggregateId uuid.UUID) ([]Event, error) { func RetrieveFor(aggregateId uuid.UUID) ([]*Event, error) {
streamName := aggregateId.String() results, err := storage.ReadStream(aggregateId)
offset := getStartingIndexFor(streamName)
filename := getFilenameForEvents(streamName)
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer eventsFile.Close()
eventsFile.Seek(offset, 0) events := make([]*Event, 0)
for _, storedEvent := range results {
contentLengthBytes := make([]byte, IntegerSizeInBytes) event, err := serializer.Deserialize(storedEvent.Data)
events := make([]Event, 0)
for {
read, err := eventsFile.Read(contentLengthBytes)
if err != nil {
break
}
if read < 8 {
return nil, errors.New("event index integrity error")
}
event, err := getStoredEvent(eventsFile, contentLengthBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
events = append(events, Event{aggregateId, event}) events = append(events, &Event{storedEvent.StreamId, event})
} }
return events, nil return events, nil
} }
func getStartingIndexFor(streamName string) int64 {
//TODO: snapshots
return int64(0)
}
func getStoredEvent(eventsFile *os.File, contentLengthBytes []byte) (interface{}, error) {
contentLength := binary.BigEndian.Uint64(contentLengthBytes)
content := make([]byte, contentLength)
read, err := eventsFile.Read(content)
if err != nil {
return nil, err
}
if uint64(read) < contentLength {
return nil, errors.New("incomplete event information retrieved")
}
return serializer.Deserialize(content)
}
func retrieveEvent(aggregateId uuid.UUID, offset int64) (*Event, error) {
filename := getFilenameForEvents(aggregateId.String())
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer eventsFile.Close()
eventsFile.Seek(offset, 0)
contentLengthBytes := make([]byte, IntegerSizeInBytes)
read, err := eventsFile.Read(contentLengthBytes)
if err != nil {
return nil, err
}
if read < IntegerSizeInBytes {
return nil, errors.New("event integrity problem")
}
content, err := getStoredEvent(eventsFile, contentLengthBytes)
if err != nil {
return nil, err
}
return &Event{aggregateId, content}, nil
}
func RetrieveAll() ([]*Event, error) { func RetrieveAll() ([]*Event, error) {
indexFile, err := os.OpenFile(path.Join(storagePath, "eventindex"), os.O_RDONLY, 0) results, err := storage.ReadAll()
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer indexFile.Close()
events := make([]*Event, 0) events := make([]*Event, 0)
guidBytes := make([]byte, 16) for _, storedEvent := range results {
offsetBytes := make([]byte, IntegerSizeInBytes) event, err := serializer.Deserialize(storedEvent.Data)
for {
read, err := indexFile.Read(guidBytes)
if err != nil {
break
}
if read != 16 {
return nil, errors.New("index integrity error")
}
read, err = indexFile.Read(offsetBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if read != IntegerSizeInBytes { events = append(events, &Event{storedEvent.StreamId, event})
return nil, errors.New("index integrity error")
}
aggregateId, err := uuid.FromBytes(guidBytes)
if err != nil {
return nil, err
}
offset := binary.BigEndian.Uint64(offsetBytes)
event, err := retrieveEvent(aggregateId, int64(offset))
if err != nil {
return nil, err
}
events = append(events, event)
} }
return events, nil return events, nil

View File

@ -27,7 +27,8 @@ type AnotherEvent struct {
func setUp() { func setUp() {
tempDir := path.Join(os.TempDir(), uuid.NewV4().String()) tempDir := path.Join(os.TempDir(), uuid.NewV4().String())
SetStoragePath(tempDir) storage := NewDiskStorage(tempDir)
SetStorage(storage)
serializer := NewJsonSerializer((*MyEvent)(nil), (*AnotherEvent)(nil)) serializer := NewJsonSerializer((*MyEvent)(nil), (*AnotherEvent)(nil))
SetSerializer(serializer) SetSerializer(serializer)
} }

View File

@ -1,4 +1,5 @@
package goes package goes
import ( import (
"reflect" "reflect"
"encoding/json" "encoding/json"
@ -11,6 +12,7 @@ type Serializer interface {
Deserialize([]byte) (interface{}, error) Deserialize([]byte) (interface{}, error)
} }
//TODO: any serializer will require a type registry maybe this should be abstracted
type JsonSerializer struct { type JsonSerializer struct {
types map[string]reflect.Type types map[string]reflect.Type
} }

184
storage.go Normal file
View File

@ -0,0 +1,184 @@
package goes
import (
"os"
"path"
"encoding/binary"
"github.com/satori/go.uuid"
"fmt"
"errors"
)
const IntegerSizeInBytes = 8
const StreamStartingCapacity = 512
type StoredEvent struct {
StreamId uuid.UUID
Data []byte
}
//TODO: performance - change reads array for some kind of iterator
type Storage interface {
Write(streamId uuid.UUID, data []byte) error
ReadStream(streamId uuid.UUID) ([]*StoredEvent, error)
ReadAll() ([]*StoredEvent, error)
}
func NewDiskStorage(storagePath string) Storage {
return &DiskStorage{storagePath, path.Join(storagePath, "eventindex")}
}
type DiskStorage struct {
storagePath string
indexPath string
}
func (me DiskStorage) getFilename(stream, extension string) string {
return fmt.Sprintf("%v%v", path.Join(me.storagePath, stream[0:2], stream[2:]), extension)
}
func (me DiskStorage) getFilenameForEvents(stream string) string {
return me.getFilename(stream, ".history")
}
func (me DiskStorage) Write(streamId uuid.UUID, data []byte) error {
filename := me.getFilenameForEvents(streamId.String())
os.MkdirAll(path.Dir(filename), os.ModeDir)
indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0)
if err != nil {
return err
}
defer indexFile.Close()
eventsFile, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY | os.O_CREATE, 0)
if err != nil {
return err
}
defer eventsFile.Close()
stat, err := eventsFile.Stat()
if err != nil {
return err
}
position := stat.Size()
lengthBytes := make([]byte, IntegerSizeInBytes)
binary.BigEndian.PutUint64(lengthBytes, uint64(len(data)))
eventsFile.Write(lengthBytes)
eventsFile.Write(data)
indexFile.Write(streamId.Bytes())
positionBytes := make([]byte, IntegerSizeInBytes)
binary.BigEndian.PutUint64(positionBytes, uint64(position))
indexFile.Write(positionBytes)
return nil
}
func (me DiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
streamName := streamId.String()
offset := int64(0) //TODO snapshots
filename := me.getFilenameForEvents(streamName)
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer eventsFile.Close()
eventsFile.Seek(offset, 0)
contentLengthBytes := make([]byte, IntegerSizeInBytes)
results := make([]*StoredEvent, 0)
for {
read, err := eventsFile.Read(contentLengthBytes)
if err != nil {
break
}
if read != IntegerSizeInBytes {
return nil, errors.New("event index integrity error")
}
data, err := getStoredData(eventsFile, contentLengthBytes)
if err != nil {
return nil, err
}
results = append(results, &StoredEvent{streamId, data})
}
return results, nil
}
func (me DiskStorage) ReadAll() ([]*StoredEvent, error) {
indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer indexFile.Close()
results := make([]*StoredEvent, 0)
guidBytes := make([]byte, 16)
offsetBytes := make([]byte, IntegerSizeInBytes)
for {
read, err := indexFile.Read(guidBytes)
if err != nil {
break
}
if read != 16 {
return nil, errors.New("index integrity error")
}
read, err = indexFile.Read(offsetBytes)
if err != nil {
return nil, err
}
if read != IntegerSizeInBytes {
return nil, errors.New("index integrity error")
}
aggregateId, err := uuid.FromBytes(guidBytes)
if err != nil {
return nil, err
}
offset := binary.BigEndian.Uint64(offsetBytes)
data, err := me.retrieveData(aggregateId, int64(offset))
if err != nil {
return nil, err
}
results = append(results, &StoredEvent{aggregateId, data})
}
return results, nil
}
func (me DiskStorage) retrieveData(aggregateId uuid.UUID, offset int64) ([]byte, error) {
filename := me.getFilenameForEvents(aggregateId.String())
eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer eventsFile.Close()
eventsFile.Seek(offset, 0)
contentLengthBytes := make([]byte, IntegerSizeInBytes)
read, err := eventsFile.Read(contentLengthBytes)
if err != nil {
return nil, err
}
if read < IntegerSizeInBytes {
return nil, errors.New("event integrity problem")
}
return getStoredData(eventsFile, contentLengthBytes)
}
func getStoredData(eventsFile *os.File, contentLengthBytes []byte) ([]byte, error) {
contentLength := binary.BigEndian.Uint64(contentLengthBytes)
data := make([]byte, contentLength)
read, err := eventsFile.Read(data)
if err != nil {
return nil, err
}
if uint64(read) < contentLength {
return nil, errors.New("incomplete event information retrieved")
}
return data, nil
}