goes/storage/simpledisk.go

238 lines
5.6 KiB
Go
Raw Normal View History

2018-03-08 22:59:07 +00:00
package storage
import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"time"

	"github.com/satori/go.uuid"
)
// NewSimpleDiskStorage returns a Storage that keeps its files under
// storagePath. The index file is placed directly inside that directory under
// the name "eventindex".
func NewSimpleDiskStorage(storagePath string) Storage {
	disk := SimpleDiskStorage{
		storagePath: storagePath,
		indexPath:   path.Join(storagePath, "eventindex"),
	}
	return &disk
}
// SimpleDiskStorage is a file-based event store: events are kept in files
// below storagePath, and every written event is also recorded in the index
// file at indexPath.
type SimpleDiskStorage struct {
// storagePath is the directory under which the event files are created.
storagePath string
// indexPath is the file that Write appends index entries to and ReadAll reads.
indexPath string
}
// getFilename builds the on-disk path for a stream. The first two characters
// of stream become a subdirectory of the storage path, the remaining
// characters become the file name, and extension is appended verbatim.
// stream must be at least two characters long; shorter names make the slice
// expressions panic.
func (me SimpleDiskStorage) getFilename(stream, extension string) string {
	shard, rest := stream[:2], stream[2:]
	base := path.Join(me.storagePath, shard, rest)
	return fmt.Sprintf("%s%s", base, extension)
}
// GetFilenameForEvents returns the path of the ".history" file that holds
// the events of the given stream.
func (me SimpleDiskStorage) GetFilenameForEvents(stream string) string {
	const historyExtension = ".history"
	return me.getFilename(stream, historyExtension)
}
// writeSizeAndBytes writes one length-prefixed record to f: the length of
// data as an IntegerSizeInBytes-wide big-endian unsigned integer, followed
// by data itself. An empty or nil data slice produces a header-only record
// with length zero.
//
// Header and payload are assembled in one buffer and handed to the file in a
// single Write call, so the record is never split across two calls (a failure
// between two separate writes would leave a header without its payload).
//
// It returns the error reported by the underlying Write, unchanged.
func writeSizeAndBytes(f *os.File, data []byte) error {
	record := make([]byte, IntegerSizeInBytes+len(data))
	binary.BigEndian.PutUint64(record, uint64(len(data)))
	copy(record[IntegerSizeInBytes:], data)

	// (*os.File).Write returns a non-nil error whenever it writes fewer bytes
	// than requested, so a separate byte-count comparison would be dead code.
	_, err := f.Write(record)
	return err
}
// readSizedBytes reads one length-prefixed record from the current position
// of f: an IntegerSizeInBytes-wide big-endian length followed by that many
// payload bytes. It returns the payload.
//
// Errors:
//   - io.EOF, unchanged, when no bytes at all are left where the length
//     header should start. Callers use this to detect the end of the file.
//   - an "Integrity error" when the header or the payload is cut short, or
//     when the header announces more bytes than the file contains.
//   - any other error reported by the underlying reads, unchanged.
func readSizedBytes(f *os.File) ([]byte, error) {
	sizeBytes := make([]byte, IntegerSizeInBytes)
	// io.ReadFull keeps reading until the buffer is full; a single Read call
	// is allowed to return fewer bytes than requested without an error.
	read, err := io.ReadFull(f, sizeBytes)
	if err == io.ErrUnexpectedEOF {
		return nil, fmt.Errorf("Integrity error. Expected to read %d bytes, got %d bytes.", IntegerSizeInBytes, read)
	}
	if err != nil {
		// Includes io.EOF when the file ends exactly on a record boundary.
		return nil, err
	}
	size := binary.BigEndian.Uint64(sizeBytes)

	// A record cannot be longer than the regular file that contains it.
	// Rejecting such a length up front keeps a corrupt header from triggering
	// a huge allocation (or a makeslice panic) below. If Stat fails or f is
	// not a regular file the check is skipped and the read decides.
	if info, statErr := f.Stat(); statErr == nil && info.Mode().IsRegular() && size > uint64(info.Size()) {
		return nil, fmt.Errorf("Integrity error. Record claims %d bytes, file holds only %d bytes.", size, info.Size())
	}

	data := make([]byte, size)
	read, err = io.ReadFull(f, data)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		// The header promised size bytes, so running out of data here is a
		// truncated record, not a clean end of file.
		return nil, fmt.Errorf("Integrity error. Expected to read %d bytes, got %d bytes.", size, read)
	}
	if err != nil {
		return nil, err
	}
	return data, nil
}
// Write appends event to the events file of its stream and then records it
// in the index file.
//
// The events file receives three length-prefixed fields, in this order: the
// binary-marshalled creation time, the type id, and the event data. The index
// file receives one entry made of the stream id bytes followed by the
// big-endian offset at which the event starts inside the events file.
//
// The first error encountered is returned. The two files are not updated
// atomically: if the index write fails after the event was written, the event
// stays in the stream file without an index entry.
func (me SimpleDiskStorage) Write(event *StoredEvent) error {
	filename := me.GetFilenameForEvents(event.StreamId.String())

	// os.ModeDir carries no permission bits, so passing it as the mode would
	// create directories nobody but root can enter. Use rwxr-xr-x instead.
	if err := os.MkdirAll(path.Dir(filename), 0755); err != nil {
		return err
	}

	indexFile, err := os.OpenFile(me.indexPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	defer indexFile.Close()

	eventsFile, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	defer eventsFile.Close()

	// The file is opened in append mode, so its current size is the offset at
	// which this event will start.
	stat, err := eventsFile.Stat()
	if err != nil {
		return err
	}
	position := stat.Size()

	creationTimeBytes, err := event.CreationTime.MarshalBinary()
	if err != nil {
		return err
	}

	fields := [][]byte{creationTimeBytes, []byte(event.TypeId), event.Data}
	for _, field := range fields {
		if err := writeSizeAndBytes(eventsFile, field); err != nil {
			return err
		}
	}

	// Build the whole index entry first so it reaches the file in one Write.
	streamIdBytes := event.StreamId.Bytes()
	indexEntry := make([]byte, len(streamIdBytes)+IntegerSizeInBytes)
	copy(indexEntry, streamIdBytes)
	binary.BigEndian.PutUint64(indexEntry[len(streamIdBytes):], uint64(position))
	if _, err := indexFile.Write(indexEntry); err != nil {
		return err
	}
	return nil
}
// StreamVersion is not implemented yet: it ignores streamId and always
// returns EMPTY_STREAM with a nil error.
func (me SimpleDiskStorage) StreamVersion(streamId uuid.UUID) (uint32, error) {
// TODO: not implemented; the stream on disk is never consulted.
return EMPTY_STREAM, nil
}
// ReadStream returns every event stored for streamId, in the order they
// appear in the stream's events file.
//
// It returns an error if the events file cannot be opened or positioned, or
// if a record cannot be decoded. Reaching the end of the file ends the read
// normally. The result is a non-nil slice, possibly empty.
func (me SimpleDiskStorage) ReadStream(streamId uuid.UUID) ([]*StoredEvent, error) {
	streamName := streamId.String()
	offset := int64(0) //TODO snapshots
	filename := me.GetFilenameForEvents(streamName)

	eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer eventsFile.Close()

	if _, err := eventsFile.Seek(offset, io.SeekStart); err != nil {
		return nil, err
	}

	results := make([]*StoredEvent, 0)
	for {
		creationTime, typeId, data, err := getStoredData(eventsFile)
		// Compare against the sentinel value rather than the message text, so
		// an unrelated error that happens to print "EOF" is not mistaken for
		// the end of the stream.
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		//TODO metadata
		results = append(results, &StoredEvent{streamId, creationTime, typeId, data, "", nil})
	}
	return results, nil
}
// ReadAll returns every indexed event, in index order.
//
// The index file is a sequence of fixed-size entries: 16 bytes of stream id
// followed by an IntegerSizeInBytes-wide big-endian offset into that stream's
// events file. Each entry is resolved through retrieveStoredEvent.
//
// It returns an error if the index cannot be opened or read, if the index
// ends in the middle of an entry ("index integrity error"), or if a
// referenced event cannot be loaded. The result is a non-nil slice, possibly
// empty.
func (me SimpleDiskStorage) ReadAll() ([]*StoredEvent, error) {
	indexFile, err := os.OpenFile(me.indexPath, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer indexFile.Close()

	const idSize = 16
	entry := make([]byte, idSize+IntegerSizeInBytes)
	results := make([]*StoredEvent, 0)
	for {
		// io.ReadFull distinguishes the three outcomes that matter here:
		// io.EOF (no more entries), io.ErrUnexpectedEOF (partial entry) and
		// genuine I/O errors. Only the first one ends the loop successfully.
		_, err := io.ReadFull(indexFile, entry)
		if err == io.EOF {
			break
		}
		if err == io.ErrUnexpectedEOF {
			return nil, errors.New("index integrity error")
		}
		if err != nil {
			return nil, err
		}

		aggregateId, err := uuid.FromBytes(entry[:idSize])
		if err != nil {
			return nil, err
		}
		offset := binary.BigEndian.Uint64(entry[idSize:])

		storedEvent, err := me.retrieveStoredEvent(aggregateId, int64(offset))
		if err != nil {
			return nil, err
		}
		results = append(results, storedEvent)
	}
	return results, nil
}
// retrieveStoredEvent loads the single event of streamId that starts at the
// given byte offset of the stream's events file.
//
// It returns an error if the file cannot be opened, if positioning at offset
// fails, or if the record at that position cannot be decoded.
func (me SimpleDiskStorage) retrieveStoredEvent(streamId uuid.UUID, offset int64) (*StoredEvent, error) {
	filename := me.GetFilenameForEvents(streamId.String())
	eventsFile, err := os.OpenFile(filename, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer eventsFile.Close()

	// A failed seek would otherwise go unnoticed and the record would be
	// decoded from the wrong position.
	if _, err := eventsFile.Seek(offset, io.SeekStart); err != nil {
		return nil, err
	}

	creationTime, typeId, data, err := getStoredData(eventsFile)
	if err != nil {
		return nil, err
	}
	//TODO metadata
	return &StoredEvent{streamId, creationTime, typeId, data, "", nil}, nil
}
// getStoredData decodes one event record from the current position of
// eventsFile. A record is three consecutive length-prefixed fields: the
// binary-marshalled creation time, the type id, and the event data.
//
// Errors from readSizedBytes and from time.Time.UnmarshalBinary are returned
// unchanged; the other results hold whatever had been decoded before the
// failure and should not be used in that case.
func getStoredData(eventsFile *os.File) (creationTime time.Time, typeId string, data []byte, err error) {
	var rawTime, rawType []byte

	if rawTime, err = readSizedBytes(eventsFile); err != nil {
		return creationTime, typeId, data, err
	}
	if err = creationTime.UnmarshalBinary(rawTime); err != nil {
		return creationTime, typeId, data, err
	}

	if rawType, err = readSizedBytes(eventsFile); err != nil {
		return creationTime, typeId, data, err
	}
	typeId = string(rawType)

	data, err = readSizedBytes(eventsFile)
	return creationTime, typeId, data, err
}
// RebuildTypeIndexes currently does nothing; the body is an empty placeholder.
func (me SimpleDiskStorage) RebuildTypeIndexes() {
}
// RebuildIndexes currently does nothing; the body is an empty placeholder.
func (me SimpleDiskStorage) RebuildIndexes() {
}