diff --git a/src/buffer.c b/src/buffer.c index 9fa02d0..1d4a831 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -3,18 +3,25 @@ // #include +#include #include "buffer.h" -const buffer_t buffer_create(uint32_t size) { +const buffer_t buffer_create(size_t size) { buffer_t buf = {size, malloc(size)}; return buf; } -const buffer_t buffer_from(uint8_t* data, uint32_t size) { +const buffer_t buffer_from(uint8_t* data, size_t size) { buffer_t buf = {size, data}; return buf; } +const buffer_t buffer_copyfrom(uint8_t* data, size_t size) { + buffer_t buf = {size, malloc(size)}; + memcpy(buf.data, data, size); + return buf; +} + void buffer_free(buffer_t buffer) { free(buffer.data); } diff --git a/src/buffer.h b/src/buffer.h index 8efbfb2..02aef7f 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -8,13 +8,14 @@ #include struct st_buffer { - uint32_t size; + size_t size; uint8_t* data; }; typedef struct st_buffer buffer_t; -const buffer_t buffer_create(uint32_t size); -const buffer_t buffer_from(uint8_t* data, uint32_t size); +const buffer_t buffer_create(size_t size); +const buffer_t buffer_from(uint8_t* data, size_t size); +const buffer_t buffer_copyfrom(uint8_t* data, size_t size); void buffer_free(buffer_t buffer); #endif //ESC_BUFFER_H diff --git a/src/esc.c b/src/esc.c index d40c3d8..ae8c237 100644 --- a/src/esc.c +++ b/src/esc.c @@ -11,8 +11,12 @@ #include "proto.h" #include "tcp_package.h" +typedef int bool_t; +const bool_t BOOL_TRUE = 1; +const bool_t BOOL_FALSE = 0; + struct st_connection_settings { - unsigned char use_ssl_connection; + bool_t use_ssl_connection; }; const esc_connection_settings_t default_connection_settings = { @@ -44,7 +48,7 @@ struct st_connection { struct st_static_endpoint_discoverer { tcp_endpoint_t tcp_endpoint; - unsigned char use_ssl_connection; + bool_t use_ssl_connection; }; typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t; @@ -60,9 +64,10 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) { char uuid_buf[37]; - printf("connection_send_tcp_package: %u %u %s %u\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); + printf("connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); buffer_t send_buffer = tcp_package_to_buffer(pkg); - if (socket_send(conn->tcp_conn, (char *) &send_buffer.size, sizeof(uint32_t)) <= 0) { + uint32_t size = (uint32_t)send_buffer.size; + if (socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t)) <= 0) { buffer_free(send_buffer); return -1; } @@ -76,21 +81,21 @@ int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { uint32_t recv_size; - int rc; + ssize_t rc; if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { - printf("%d %d", rc, socket_error()); + printf("%ld %d", rc, socket_error()); return 0; } buffer_t recv_buffer = buffer_create(recv_size); while(recv_size > 0) { - int pos = recv_buffer.size - recv_size; + size_t pos = recv_buffer.size - recv_size; rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size); recv_size -= rc; } const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer); buffer_free(recv_buffer); char uuid_buf[37]; - printf("connection_recv_tcp_package: %u %u %s %u\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); + printf("connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); //for (int32_t i=0;idata.size;i++) { // printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]); //} @@ -179,7 +184,7 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char return creds; } -const char* string_clone(const char* src) { +const char* string_copy(const char *src) { char* dst = malloc(strlen(src)+1); strcpy(dst, src); return dst; @@ -189,7 +194,13 @@ const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__ if (msg == 0) return 0; esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t)); ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len); - ev->event_type = string_clone(msg->event_type); + ev->event_type = string_copy(msg->event_type); + ev->event_number = msg->event_number; + ev->event_stream_id = string_copy(msg->event_stream_id); + ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0; + ev->data = buffer_copyfrom(msg->data.data, msg->data.len); + ev->metadata = buffer_copyfrom(msg->metadata.data, msg->metadata.len); + return ev; } @@ -226,8 +237,10 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti return 0; } + esc_connection_t* _conn = (esc_connection_t*)conn; + EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = - event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data); + event_store__client__messages__read_all_events_completed__unpack(&_conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data); esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t)); result->read_direction = "forward"; @@ -241,12 +254,12 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti for (size_t i = 0; i < recv_msg->n_events; i++) { result->events[i] = resolved_event_create(recv_msg->events[i]); } - event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); + event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &_conn->protobuf_c_allocator); return result; } const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { - snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); + snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); return buffer; } diff --git a/src/esc.h b/src/esc.h index 9f54e42..e8174d7 100644 --- a/src/esc.h +++ b/src/esc.h @@ -6,6 +6,7 @@ #define ESC_ESC_H #include "uuid.h" +#include "buffer.h" struct st_connection_settings; typedef struct st_connection_settings esc_connection_settings_t; @@ -28,6 +29,11 @@ typedef struct st_esc_position esc_position_t; struct st_recorded_event { const esc_uuid_t* event_id; const char* event_type; + int64_t event_number; + const char *event_stream_id; + int64_t created_epoch; + buffer_t data; + buffer_t metadata; }; typedef struct st_recorded_event esc_recorded_event_t; diff --git a/src/main.c b/src/main.c index 4ff946f..6d15780 100644 --- a/src/main.c +++ b/src/main.c @@ -32,8 +32,12 @@ int main() { result->is_end_of_stream); char uuid_buf[37]; for (size_t i = 0; i < result->n_events; i++) { - printf("%s %s\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), - result->events[i]->event->event_type); + printf("%s %s %ld@%s %lu %lu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), + result->events[i]->event->event_type, + result->events[i]->event->event_number, + result->events[i]->event->event_stream_id, + result->events[i]->event->data.size, + result->events[i]->event->metadata.size); } } while(result->is_end_of_stream == 0); diff --git a/src/socket.c b/src/socket.c index 8088f92..d459075 100644 --- a/src/socket.c +++ b/src/socket.c @@ -109,12 +109,12 @@ int socket_connect(socket_t s, char* addr, unsigned short port) { return connect(s->s, result->ai_addr, (int)result->ai_addrlen); } -int socket_send(socket_t s, char* data, int len) { - return (int)send(s->s, data, (size_t)len, 0); +ssize_t socket_send(socket_t s, char* data, size_t len) { + return send(s->s, data, len, 0); } -int socket_recv(socket_t s, char* buf, int len) { - return (int)recv(s->s, buf, (size_t)len, 0); +ssize_t socket_recv(socket_t s, char* buf, size_t len) { + return recv(s->s, buf, len, 0); } int socket_error() { diff --git a/src/socket.h b/src/socket.h index ec468f1..7f0c9f0 100644 --- a/src/socket.h +++ b/src/socket.h @@ -16,8 +16,8 @@ enum { socket_t socket_create(int type); void socket_close(socket_t s); int socket_connect(socket_t s, char* addr, unsigned short port); -int socket_send(socket_t s, char* data, int len); -int socket_recv(socket_t s, char* buf, int len); +ssize_t socket_send(socket_t s, char* data, size_t len); +ssize_t socket_recv(socket_t s, char* buf, size_t len); int socket_error(); #endif //ESC_SOCKET_H