214 lines
11 KiB
C
214 lines
11 KiB
C
//
|
|
// Created by nicolas on 22/03/18.
|
|
//
|
|
|
|
#include <stdlib.h>

#include "utils/debug.h"
#include "utils/string.h"
#include "results.h"
#include "proto_helper.h"
#include "proto.h"
#include "utils/bool.h"
#include "event_data.h"
|
|
|
|
// Allocation hook installed in protobuf_allocator.
// The allocator context pointer is ignored; the request goes straight to malloc().
// Returns whatever malloc() returns for `size` bytes.
void* protobuf_c_alloc(void *alloc_data, size_t size) {
    (void)alloc_data; // context is not used by this allocator
    void *block = malloc(size);
    return block;
}
|
|
|
|
// Release hook installed in protobuf_allocator.
// The allocator context pointer is ignored; `p` is handed to free().
void protobuf_c_free(void *alloc_data, void* p) {
    (void)alloc_data; // context is not used by this allocator
    free(p);
}
|
|
|
|
ProtobufCAllocator protobuf_allocator = {
|
|
protobuf_c_alloc, protobuf_c_free, 0
|
|
};
|
|
|
|
// Allocates an inspection_result_t and fills in its three fields.
//
// decision    - stored in the `decision` field.
// description - stored as-is; the string is not copied.
// error       - stored as-is; inspection_result_destroy() later passes it to free().
//
// Returns the new result, or NULL when the allocation fails.
inspection_result_t* inspection_result_create(operation_decision_t decision, const char* description, error_t* error) {
    inspection_result_t *inspection_result = malloc(sizeof *inspection_result);
    if (inspection_result == NULL) {
        return NULL;
    }

    inspection_result->decision = decision;
    inspection_result->description = description;
    inspection_result->error = error;
    return inspection_result;
}
|
|
|
|
// Releases an inspection result together with the error object it holds.
// A NULL argument is accepted and ignored, mirroring free().
// The description string is not freed here.
void inspection_result_destroy(inspection_result_t* inspection_result) {
    if (inspection_result == NULL) {
        return;
    }

    free(inspection_result->error);
    free(inspection_result);
}
|
|
|
|
buffer_t* esc_identify_client_pack(const char* connection_name) {
|
|
EventStore__Client__Messages__IdentifyClient identify_client;
|
|
event_store__client__messages__identify_client__init(&identify_client);
|
|
identify_client.connection_name = (char*)connection_name;
|
|
identify_client.version = 1;
|
|
size_t msg_size = event_store__client__messages__identify_client__get_packed_size(&identify_client);
|
|
uint8_t msg_buf[msg_size];
|
|
event_store__client__messages__identify_client__pack(&identify_client, msg_buf);
|
|
return buffer_copyfrom(msg_buf, msg_size);
|
|
}
|
|
|
|
// Converts a protobuf EventRecord into a newly allocated esc_recorded_event_t.
//
// Strings are duplicated with string_copy() and the data/metadata payloads
// with buffer_copyfrom(). `msg` is not freed here; releasing it is left to
// the caller.
//
// Returns NULL when `msg` is NULL or when the event struct cannot be allocated.
esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__EventRecord* msg) {
    if (msg == NULL) {
        return NULL;
    }

    esc_recorded_event_t *ev = malloc(sizeof *ev);
    if (ev == NULL) {
        return NULL;
    }

    ev->event_id = uuid_from(msg->event_id.data, msg->event_id.len);
    ev->event_type = string_copy(msg->event_type);
    ev->event_number = msg->event_number;
    ev->event_stream_id = string_copy(msg->event_stream_id);
    // created_epoch is an optional field; 0 is stored when it is absent.
    ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0;
    ev->data = buffer_copyfrom(msg->data.data, msg->data.len);
    ev->metadata = buffer_copyfrom(msg->metadata.data, msg->metadata.len);
    return ev;
}
|
|
|
|
// Converts a protobuf ResolvedEvent into a newly allocated esc_resolved_event_t.
//
// The `event` and `link` members are converted with esc_recorded_event_unpack(),
// so either one is NULL when the corresponding sub-message is absent.
// `msg` is not freed here; releasing it is left to the caller.
//
// Returns NULL when `msg` is NULL or when the struct cannot be allocated.
esc_resolved_event_t* esc_resolved_event_unpack(EventStore__Client__Messages__ResolvedEvent* msg) {
    if (msg == NULL) {
        return NULL;
    }

    esc_resolved_event_t *ev = malloc(sizeof *ev);
    if (ev == NULL) {
        return NULL;
    }

    ev->original_position = esc_position_create(msg->prepare_position, msg->commit_position);
    ev->event = esc_recorded_event_unpack(msg->event);
    ev->link = esc_recorded_event_unpack(msg->link);
    return ev;
}
|
|
|
|
const char* get_string_for_all_events_result(int result) {
|
|
switch(result) {
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: return "Success";
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__NotModified: return "Not Modified";
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__AccessDenied: return "Access Denied";
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Error: return "Error";
|
|
default: return "Unknown";
|
|
}
|
|
}
|
|
|
|
inspection_result_t* esc_all_events_slice_unpack(buffer_t* buffer, esc_all_events_slice_t** all_events_slice_p) {
|
|
EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg =
|
|
event_store__client__messages__read_all_events_completed__unpack(
|
|
&protobuf_allocator,
|
|
buffer_size(buffer),
|
|
buffer_data(buffer));
|
|
|
|
inspection_result_t* inspection_result;
|
|
switch (recv_msg->result) {
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: {
|
|
esc_all_events_slice_t *all_events_slice = malloc(sizeof(struct st_all_events_slice));
|
|
all_events_slice->read_direction = "forward";
|
|
all_events_slice->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0;
|
|
all_events_slice->from_position = esc_position_create(recv_msg->prepare_position,
|
|
recv_msg->commit_position);
|
|
all_events_slice->next_position = esc_position_create(recv_msg->next_prepare_position,
|
|
recv_msg->next_commit_position);
|
|
all_events_slice->n_events = recv_msg->n_events;
|
|
all_events_slice->events = malloc(recv_msg->n_events * sizeof(void *));
|
|
for (size_t i = 0; i < recv_msg->n_events; i++) {
|
|
all_events_slice->events[i] = esc_resolved_event_unpack(recv_msg->events[i]);
|
|
}
|
|
*all_events_slice_p = all_events_slice;
|
|
inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Success", 0);
|
|
break;
|
|
}
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Error: {
|
|
error_t* error = error_create(recv_msg->result, "Server error: %s", recv_msg->error);
|
|
inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Error", error);
|
|
break;
|
|
}
|
|
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__AccessDenied: {
|
|
error_t* error = error_create(recv_msg->result, "Access denied: %s %s", "Read", "All");
|
|
inspection_result = inspection_result_create(Operation_Decision_EndOperation, "AccessDenied", error);
|
|
break;
|
|
}
|
|
default: {
|
|
error_t* error = error_create(recv_msg->result, "Unexpected ReadStreamResult %s", get_string_for_all_events_result(recv_msg->result));
|
|
inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Unexpected", error);
|
|
break;
|
|
}
|
|
}
|
|
event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &protobuf_allocator);
|
|
return inspection_result;
|
|
}
|
|
|
|
// Serializes a ReadAllEvents request.
//
// last_checkpoint  - position to read from; when NULL both positions are sent as 0.
// count            - stored in max_count.
// resolve_link_tos - stored in resolve_link_tos.
//
// require_master is always sent as 0. Returns the buffer produced by
// buffer_copyfrom() for the packed bytes.
buffer_t* esc_read_all_forward_pack(esc_position_t* last_checkpoint, int32_t count, bool_t resolve_link_tos) {
    EventStore__Client__Messages__ReadAllEvents request;
    event_store__client__messages__read_all_events__init(&request);

    if (last_checkpoint) {
        request.prepare_position = last_checkpoint->prepare_position;
        request.commit_position = last_checkpoint->commit_position;
    } else {
        request.prepare_position = 0;
        request.commit_position = 0;
    }
    request.max_count = count;
    request.require_master = 0;
    request.resolve_link_tos = resolve_link_tos;

    size_t packed_len = event_store__client__messages__read_all_events__get_packed_size(&request);
    uint8_t packed[packed_len];
    event_store__client__messages__read_all_events__pack(&request, packed);

    return buffer_copyfrom(packed, packed_len);
}
|
|
|
|
const char* get_string_for_write_result(int write_result) {
|
|
switch(write_result) {
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__Success: return "Success";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__PrepareTimeout: return "PrepareTimeout";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__CommitTimeout: return "CommitTimeout";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__ForwardTimeout: return "ForwardTimeout";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__WrongExpectedVersion: return "WrongExpectedVersion";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__StreamDeleted: return "StreamDeleted";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__InvalidTransaction: return "InvalidTransaction";
|
|
case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__AccessDenied: return "AccessDenied";
|
|
default: return "Unknown";
|
|
}
|
|
}
|
|
|
|
// Decodes a WriteEventsCompleted reply held in `buffer`.
//
// On a Success reply a new esc_write_result_t is stored in *write_result_p;
// on every other path *write_result_p is left untouched. The unpacked
// protobuf message is released before returning.
//
// Returns an inspection result (always Operation_Decision_EndOperation) whose
// error is NULL on success and describes the failure otherwise. Failures
// include a buffer that cannot be unpacked and allocation failure.
inspection_result_t* esc_write_result_unpack(buffer_t* buffer, esc_write_result_t** write_result_p) {
    EventStore__Client__Messages__WriteEventsCompleted *recv_msg =
        event_store__client__messages__write_events_completed__unpack(&protobuf_allocator, buffer_size(buffer), buffer_data(buffer));

    // The generated unpack function yields NULL for input it cannot decode.
    if (recv_msg == NULL) {
        // NOTE(review): -1 is a placeholder error code; confirm the project's convention.
        error_t *error = error_create(-1, "Unable to unpack %s", "WriteEventsCompleted");
        return inspection_result_create(Operation_Decision_EndOperation, "Error", error);
    }

    inspection_result_t *inspection_result;
    switch (recv_msg->result) {
        case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__Success: {
            esc_write_result_t *write_result = malloc(sizeof *write_result);
            if (write_result == NULL) {
                // NOTE(review): -1 is a placeholder error code; confirm the project's convention.
                error_t *error = error_create(-1, "Out of memory while unpacking %s", "WriteEventsCompleted");
                inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Error", error);
                break;
            }
            write_result->next_expected_version = recv_msg->last_event_number;
            // A position of 0 is reported as -1.
            // NOTE(review): if these fields are optional in the .proto, the
            // has_* flags may be the intended test - confirm.
            write_result->log_position = esc_position_create(
                recv_msg->prepare_position ? recv_msg->prepare_position : -1,
                recv_msg->commit_position ? recv_msg->commit_position : -1);
            *write_result_p = write_result;
            inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Success", 0);
            break;
        }
        default: {
            error_t *error = error_create(recv_msg->result, "Unexpected OperationResult %s", get_string_for_write_result(recv_msg->result));
            inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Unexpected", error);
            break;
        }
    }

    event_store__client__messages__write_events_completed__free_unpacked(recv_msg, &protobuf_allocator);
    return inspection_result;
}
|
|
|
|
// Serializes a WriteEvents request for `stream` holding every entry of `events`.
//
// stream           - stored in event_stream_id (not copied).
// expected_version - stored in expected_version.
// events           - array whose entries are read as esc_event_data_t*.
//
// require_master is always sent as 0. The event payloads are referenced, not
// copied, while the message is being packed. All scratch memory lives on the
// heap, because the packed size grows with the payloads and could overflow
// the stack as a variable-length array.
//
// Returns the buffer produced by buffer_copyfrom() for the packed bytes, or
// NULL when `events` is NULL or an allocation fails.
buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version, array_t* events) {
    // NOTE(review): the id is sent as 16 bytes, presumably a raw UUID - confirm.
    enum { EVENT_ID_LEN = 16 };

    buffer_t *packed = NULL;
    uint8_t *wire = NULL;
    EventStore__Client__Messages__NewEvent *new_events = NULL;
    EventStore__Client__Messages__NewEvent **event_ptrs = NULL;

    if (events == NULL) {
        return NULL;
    }

    size_t n_events = events->size;
    if (n_events > 0) {
        // One contiguous block of messages plus the pointer table protobuf-c
        // expects; calloc checks the size multiplication for overflow.
        new_events = calloc(n_events, sizeof *new_events);
        event_ptrs = calloc(n_events, sizeof *event_ptrs);
        if (new_events == NULL || event_ptrs == NULL) {
            goto cleanup;
        }
    }

    EventStore__Client__Messages__WriteEvents send_msg;
    event_store__client__messages__write_events__init(&send_msg);
    send_msg.event_stream_id = (char*)stream;
    send_msg.expected_version = expected_version;
    send_msg.require_master = 0;
    send_msg.n_events = n_events;
    send_msg.events = event_ptrs;

    for (size_t i = 0; i < n_events; i++) {
        esc_event_data_t *event_data = events->data[i];
        EventStore__Client__Messages__NewEvent *new_event = &new_events[i];

        event_store__client__messages__new_event__init(new_event);
        new_event->event_id.data = (uint8_t *)event_data->event_id;
        new_event->event_id.len = EVENT_ID_LEN;
        new_event->metadata.data = event_data->metadata ? buffer_data(event_data->metadata) : 0;
        new_event->metadata.len = event_data->metadata ? buffer_size(event_data->metadata) : 0;
        new_event->data.data = event_data->data ? buffer_data(event_data->data) : 0;
        new_event->data.len = event_data->data ? buffer_size(event_data->data) : 0;
        new_event->event_type = (char*)event_data->event_type;
        // The same is_json flag drives both content-type fields.
        new_event->data_content_type = event_data->is_json ? 1 : 0;
        new_event->metadata_content_type = event_data->is_json ? 1 : 0;
        new_event->has_metadata = event_data->metadata ? 1 : 0;
        event_ptrs[i] = new_event;
    }

    size_t size = event_store__client__messages__write_events__get_packed_size(&send_msg);

    // Ask for at least one byte so a zero-size request cannot be mistaken for failure.
    wire = malloc(size > 0 ? size : 1);
    if (wire == NULL) {
        goto cleanup;
    }

    event_store__client__messages__write_events__pack(&send_msg, wire);
    packed = buffer_copyfrom(wire, size);

cleanup:
    free(wire);
    free(event_ptrs);
    free(new_events);
    return packed;
}
|