From aa0db4f07a005b701ea429a81cd6daf527a7611b Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Sat, 24 Mar 2018 09:30:29 -0700 Subject: [PATCH] Moved main to test Moved esc.h to include Added append_to_stream on connection --- CMakeLists.txt | 2 +- include/esc.h | 112 +++++++++++ src/connection.c | 322 ++++++++++++++++++++++++++++++ src/connection.h | 41 ++++ src/credentials.c | 21 ++ src/credentials.h | 17 ++ src/endpoint_discoverer.c | 16 ++ src/endpoint_discoverer.h | 30 +++ src/esc.c | 410 -------------------------------------- src/esc.h | 80 -------- src/event_data.c | 25 +++ src/event_data.h | 23 +++ src/main.c | 68 ------- src/position.c | 37 ++++ src/position.h | 20 ++ src/proto_helper.c | 213 ++++++++++++++++++++ src/proto_helper.h | 39 ++++ src/results.c | 39 ++++ src/results.h | 50 +++++ src/tcp_package.c | 28 +-- src/tcp_package.h | 16 +- src/utils/array.c | 33 +++ src/utils/array.h | 20 ++ src/utils/bool.h | 13 ++ src/utils/buffer.c | 28 ++- src/utils/buffer.h | 16 +- src/utils/debug.c | 21 +- src/utils/error.c | 29 +++ src/utils/error.h | 21 ++ src/utils/event.c | 37 ++++ src/utils/event.h | 15 ++ src/utils/mutex.c | 21 +- src/utils/mutex.h | 10 +- src/utils/queue.c | 43 +++- src/utils/queue.h | 19 +- src/utils/socket.c | 41 ++-- src/utils/socket.h | 20 +- src/utils/thread.c | 26 +-- src/utils/thread.h | 14 +- src/utils/uuid.c | 18 +- src/utils/uuid.h | 17 +- test/main.c | 89 +++++++++ 42 files changed, 1461 insertions(+), 699 deletions(-) create mode 100644 include/esc.h create mode 100644 src/connection.c create mode 100644 src/connection.h create mode 100644 src/credentials.c create mode 100644 src/credentials.h create mode 100644 src/endpoint_discoverer.c create mode 100644 src/endpoint_discoverer.h delete mode 100644 src/esc.c delete mode 100644 src/esc.h create mode 100644 src/event_data.c create mode 100644 src/event_data.h delete mode 100644 src/main.c create mode 100644 src/position.c create mode 100644 src/position.h create mode 100644 src/proto_helper.c create mode 100644 src/proto_helper.h create mode 100644 src/results.c create mode 100644 src/results.h create mode 100644 src/utils/array.c create mode 100644 src/utils/array.h create mode 100644 src/utils/bool.h create mode 100644 src/utils/error.c create mode 100644 src/utils/error.h create mode 100644 src/utils/event.c create mode 100644 src/utils/event.h create mode 100644 test/main.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 566fa0f..16b75b0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ if(WIN32) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) endif() -add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c) +add_executable(esc test/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h include/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c src/position.c src/position.h src/credentials.c src/credentials.h src/connection.c src/connection.h src/utils/bool.h src/endpoint_discoverer.h src/utils/error.c src/utils/error.h src/endpoint_discoverer.c src/proto_helper.h src/proto_helper.c src/results.h src/results.c src/event_data.c src/event_data.h src/utils/array.c src/utils/array.h src/utils/event.c src/utils/event.h) if(WIN32) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) diff --git a/include/esc.h b/include/esc.h new file mode 100644 index 0000000..df8db32 --- /dev/null +++ b/include/esc.h @@ -0,0 +1,112 @@ +// +// Created by nicol on 2018-03-18. +// + +#ifndef ESC_ESC_H +#define ESC_ESC_H + +#include + +// bool +typedef int bool_t; +#define BOOL_TRUE 1 +#define BOOL_FALSE 0 + +// array +typedef struct st_array array_t; +typedef void (*array_deallocator)(void*); +array_t* array_create(size_t n, ...); +void array_destroy(array_t* array, array_deallocator destroyer); + +// uuid +typedef struct st_uuid uuid_t; +uuid_t* uuid_create(); +void uuid_destroy(uuid_t*); + +// buffer +typedef struct st_buffer buffer_t; +void buffer_destroy(buffer_t*); +buffer_t* buffer_from_string(const char* str); +size_t buffer_size(buffer_t* b); + +// error +#define ERROR_MSG_SIZE 1024 +typedef struct st_error { + const char* file; + int line; + int code; + char message[ERROR_MSG_SIZE]; +} error_t; + +// esc +#define ESC_VERSION_NOSTREAM -1 +#define ESC_VERSION_EMPTYSTREAM -1 +#define ESC_VERSION_ANY -2 + +typedef struct st_esc_position esc_position_t; +typedef struct st_esc_connection_settings esc_connection_settings_t; +typedef struct st_connection esc_connection_t; +typedef struct st_credentials esc_credentials_t; + +struct st_recorded_event { + uuid_t* event_id; + const char* event_type; + int64_t event_number; + const char *event_stream_id; + int64_t created_epoch; + buffer_t* data; + buffer_t* metadata; +}; +typedef struct st_recorded_event esc_recorded_event_t; + +struct st_resolved_event { + esc_recorded_event_t* event; + esc_recorded_event_t* link; + esc_position_t* original_position; +}; +typedef struct st_resolved_event esc_resolved_event_t; + +struct st_all_events_slice { + char* read_direction; + esc_position_t* from_position; + esc_position_t* next_position; + size_t n_events; + esc_resolved_event_t** events; + int is_end_of_stream; +}; +typedef struct st_all_events_slice esc_all_events_slice_t; + +typedef struct st_esc_write_result { + int64_t next_expected_version; + esc_position_t* log_position; +} esc_write_result_t; + +typedef struct st_esc_event_data esc_event_data_t; + +esc_connection_settings_t* const esc_default_connection_settings; + +// Connection +esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); +void esc_connection_destroy(esc_connection_t* conn); + +int esc_connection_connect(esc_connection_t* conn); +void esc_connection_close(esc_connection_t* conn); + +esc_credentials_t* esc_credentials_create(const char* username, const char* password); +void esc_credentials_destroy(esc_credentials_t* creds); + +esc_event_data_t* esc_event_data_create(uuid_t* event_id, const char* event_type, bool_t is_json, buffer_t* data, buffer_t* metadata); +void esc_event_data_destroy(esc_event_data_t*); +esc_write_result_t* esc_append_to_stream(esc_connection_t* conn, const char* stream, int64_t expected_version, array_t* events); +void esc_write_result_destroy(esc_write_result_t* write_result); + +esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t* conn, esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t* credentials); +void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice); + +error_t* esc_connection_last_error(esc_connection_t* conn); + +// Formatting +const char* uuid_format(uuid_t* uuid, char* buf, size_t buf_size); +const char* esc_position_format(esc_position_t* position, char* buffer, size_t buf_size); + +#endif //ESC_ESC_H diff --git a/src/connection.c b/src/connection.c new file mode 100644 index 0000000..6bf5afb --- /dev/null +++ b/src/connection.c @@ -0,0 +1,322 @@ +// +// Created by nicolas on 22/03/18. +// + +#include +#include +#include +#include "connection.h" +#include "tcp_package.h" +#include "tcp_messages.h" +#include "credentials.h" +#include "proto_helper.h" +#include "utils/array.h" +#include "utils/debug.h" +#include "utils/error.h" +#include "utils/string.h" +#include "utils/socket.h" + +// usleep +#ifdef _WIN32 +#include +#define usleep Sleep +#endif +#ifdef __linux__ +#include +#endif + +struct st_esc_connection_settings default_connection_settings = { + BOOL_FALSE +}; + +esc_connection_settings_t* esc_default_connection_settings = &default_connection_settings; + +//TODO partial transfer +ssize_t connection_send_tcp_package(esc_connection_t* conn, tcp_package_t* pkg) { + char uuid_buf[37]; +#ifdef _WIN32 + fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); +#else + fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags, + uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); +#endif + buffer_t* send_buffer = tcp_package_to_buffer(pkg); + + size_t send_buffer_size = buffer_size(send_buffer); + uint32_t size = (uint32_t)send_buffer_size; + ssize_t rc; + if ((rc = socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t))) <= 0) { + buffer_destroy(send_buffer); + fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); + return -1; + } + + uint8_t* send_buffer_data = buffer_data(send_buffer); + if ((rc = socket_send(conn->tcp_conn, (char *) send_buffer_data, send_buffer_size)) <= 0) { + buffer_destroy(send_buffer); + fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); + return -2; + } + + buffer_destroy(send_buffer); + fprintf(stderr, "0\n"); + return 0; +} + +//TODO partial transfer +tcp_package_t* connection_recv_tcp_package(esc_connection_t* conn) { + uint32_t recv_size; + ssize_t rc; + if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { +#ifdef _WIN32 + fprintf(stderr, "connection_recv_tcp_package: %lld %d\n", rc, socket_error(conn->tcp_conn)); +#else + fprintf(stderr, "connection_recv_tcp_package: %ld %d\n", rc, socket_error(conn->tcp_conn)); +#endif + return 0; + } + buffer_t* recv_buffer = buffer_create(recv_size); + size_t recv_buffer_size = recv_size; + uint8_t* recv_buffer_data = buffer_data(recv_buffer); + while(recv_size > 0) { + size_t pos = recv_buffer_size - recv_size; + rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer_data[pos], recv_size); + recv_size -= rc; + } + tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer); + buffer_destroy(recv_buffer); + char uuid_buf[37]; +#ifdef _WIN32 + fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); +#else + fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, + uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); +#endif + return recv_pkg; +} + +void* connection_thread(void* arg) { + esc_connection_t* conn = arg; + + while(conn->stop == BOOL_FALSE) { + if (socket_writable(conn->tcp_conn)) { + tcp_package_t* send_pkg = queue_dequeue(conn->send_queue); + if (send_pkg != 0) { + ssize_t rc = connection_send_tcp_package(conn, send_pkg); + tcp_package_destroy(send_pkg); + if (rc != 0) { + fprintf(stderr, "failed to send pkg."); + } + } + } + + if (socket_readable(conn->tcp_conn)) { + tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); + if (recv_pkg == 0) { + //TODO Handle connection lost + conn->stop = 1; + } else if (recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { + tcp_package_t* heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); + queue_enqueue(conn->send_queue, heartbeat_pkg); + tcp_package_destroy(recv_pkg); + } else { + queue_enqueue(conn->recv_queue, recv_pkg); + } + } + + usleep(1); + } + + printf("manager thread stopped\n"); + event_set(conn->stopped_event); +} + +bool_t find_tcp_package_by_correlation_id(void* _item, void* _correlation_id) { + tcp_package_t* item = _item; + uuid_t* correlation_id = _correlation_id; + if (uuid_compare(item->correlation_id, correlation_id) == 0) { + return BOOL_TRUE; + } + return BOOL_FALSE; +} + +//TODO add timeout +tcp_package_t* connection_wait_for(esc_connection_t* conn, uuid_t* correlation_id) { + tcp_package_t* found = 0; + while(found == 0) { + found = queue_remove(conn->recv_queue, find_tcp_package_by_correlation_id, correlation_id); + usleep(1); + } + return found; +} + +esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { + struct st_connection* conn = malloc(sizeof(struct st_connection)); + conn->settings = connection_settings; + if (connection_name == 0 || strcmp(connection_name, "") == 0) { + uuid_t* uuid = uuid_create(); + char buf[40]; + uuid_format(uuid, buf, 40); + conn->name = string_copy(buf); + uuid_destroy(uuid); + } else { + conn->name = connection_name; + } + conn->send_queue = queue_create(); + conn->recv_queue = queue_create(); + conn->stop = 0; + conn->stopped_event = event_create(); + conn->last_error = 0; + + if (strncmp(addr, "tcp://", 6) != 0) { + conn->last_error = error_create(1, "invalid schema for address: %s", addr); + return conn; + } + char* pos = strrchr(addr, ':'); + if (pos == 0) { + conn->last_error = error_create(2, "missing port in address: %s", addr); + return conn; + } + char* host = malloc(pos - addr - 5); + strncpy(host, addr+6, pos-addr-6); + host[pos-addr-6] = 0; + unsigned short port = (unsigned short)atoi(pos+1); + esc_static_endpoint_discoverer_t discover_data = { + {host, port}, connection_settings->use_ssl_connection + }; + + conn->discoverer_data = malloc(sizeof(struct st_esc_static_endpoint_discoverer)); + memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_esc_static_endpoint_discoverer)); + conn->discover = (esc_endpoint_discoverer_t)esc_static_discover; + + return conn; +} + +void esc_connection_destroy(esc_connection_t* conn) { + esc_connection_close(conn); + + socket_destroy(conn->tcp_conn); + free(((struct st_esc_static_endpoint_discoverer*)conn->discoverer_data)->tcp_endpoint.host); + free(conn->discoverer_data); + thread_destroy(conn->manager_thread); + queue_destroy(conn->recv_queue); + queue_destroy(conn->send_queue); + event_destroy(conn->stopped_event); + free((void*)conn->name); + free(conn); +} + +// return 0 on success +// return non-zero on failure and sets last_error on connection +int esc_connection_connect(esc_connection_t* conn) { + // Discover endpoint + esc_node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0); + if (endpoints == 0) { + return -1; + } + // Establish Tcp Connection + conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); + ip_endpoint_t endpoint = conn->settings->use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint; + if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { + conn->last_error = error_create(socket_error(conn->tcp_conn), "can't connect to %s:%d", endpoint.host, endpoint.port); + return -1; + } + free(endpoints); + // Start manager thread + conn->manager_thread = thread_create(connection_thread, conn); + if (thread_start(conn->manager_thread)) { + conn->last_error = error_create(thread_error(conn->manager_thread), "can't start manager thread.", ""); + return -1; + } + // Identify + uuid_t* correlation_id = uuid_create(); + + buffer_t* msg_buf = esc_identify_client_pack(conn->name); + tcp_package_t* send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, correlation_id, msg_buf); + queue_enqueue(conn->send_queue, send_pkg); + + tcp_package_t* recv_pkg = connection_wait_for(conn, correlation_id); + if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { + conn->last_error = error_create(0, "server error: %d.", recv_pkg->command); + uuid_destroy(correlation_id); + tcp_package_destroy(recv_pkg); + return -1; + } + + uuid_destroy(correlation_id); + tcp_package_destroy(recv_pkg); + return 0; +} + +esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t* conn, esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t* credentials) { + uuid_t* correlation_id = uuid_create(); + + buffer_t* msg_buf = esc_read_all_forward_pack(last_checkpoint, count, 0); + tcp_package_t* send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, correlation_id, msg_buf, credentials->username, credentials->password); + queue_enqueue(conn->send_queue, send_pkg); + + tcp_package_t* recv_pkg = connection_wait_for(conn, correlation_id); + if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) { + conn->last_error = error_create(recv_pkg->command, "wrong message from the server: %s", get_string_for_tcp_message(recv_pkg->command)); + tcp_package_destroy(recv_pkg); + uuid_destroy(correlation_id); + return 0; + } + + esc_all_events_slice_t* result; + inspection_result_t* inspection_result = esc_all_events_slice_unpack(recv_pkg->data, &result); + // TODO fully handle inspection results + if (inspection_result->error) { + conn->last_error = error_copy(inspection_result->error); + inspection_result_destroy(inspection_result); + tcp_package_destroy(recv_pkg); + uuid_destroy(correlation_id); + return 0; + } + inspection_result_destroy(inspection_result); + tcp_package_destroy(recv_pkg); + uuid_destroy(correlation_id); + return result; +} + +esc_write_result_t* esc_append_to_stream(esc_connection_t* conn, const char* stream, int64_t expected_version, array_t* events) { + uuid_t* correlation_id = uuid_create(); + + buffer_t* msg_buf = esc_append_to_stream_pack(stream, expected_version, events); + tcp_package_t* send_pkg = tcp_package_create(MESSAGE_WRITEEVENTS, correlation_id, msg_buf); + queue_enqueue(conn->send_queue, send_pkg); + + tcp_package_t* recv_pkg = connection_wait_for(conn, correlation_id); + if (recv_pkg->command != MESSAGE_WRITEEVENTSCOMPLETED) { + conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command)); + tcp_package_destroy(recv_pkg); + uuid_destroy(correlation_id); + return 0; + } + + esc_write_result_t* result; + inspection_result_t* inspection_result = esc_write_result_unpack(recv_pkg->data, &result); + // TODO fully handle inspection results + if (inspection_result->error) { + conn->last_error = error_copy(inspection_result->error); + inspection_result_destroy(inspection_result); + tcp_package_destroy(recv_pkg); + uuid_destroy(correlation_id); + return 0; + } + inspection_result_destroy(inspection_result); + tcp_package_destroy(recv_pkg); + uuid_destroy(correlation_id); + return result; +} + +void esc_connection_close(esc_connection_t* conn) { + if (conn->stop == 1) return; + conn->stop = 1; + event_wait(conn->stopped_event); + socket_close(conn->tcp_conn); +} + +inline error_t* esc_connection_last_error(esc_connection_t* conn) { + return conn->last_error; +} diff --git a/src/connection.h b/src/connection.h new file mode 100644 index 0000000..abaa76e --- /dev/null +++ b/src/connection.h @@ -0,0 +1,41 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_ESC_CONNECTION_H +#define ESC_ESC_CONNECTION_H + +#include +#include "utils/bool.h" +#include "utils/socket.h" +#include "utils/thread.h" +#include "utils/mutex.h" +#include "utils/queue.h" +#include "utils/error.h" +#include "endpoint_discoverer.h" +#include "utils/event.h" + +typedef struct st_esc_connection_settings { + bool_t use_ssl_connection; +} esc_connection_settings_t; + +esc_connection_settings_t* esc_default_connection_settings; + +typedef struct st_connection { + esc_connection_settings_t* settings; + const char* name; + void* discoverer_data; + esc_endpoint_discoverer_t discover; + socket_t* tcp_conn; + thread_t* manager_thread; + bool_t stop; + queue_t* send_queue; + queue_t* recv_queue; + error_t* last_error; + event_t* stopped_event; +} esc_connection_t; + +error_t* esc_connection_last_error(esc_connection_t* conn); +void esc_connection_close(esc_connection_t* conn); + +#endif //ESC_ESC_CONNECTION_H diff --git a/src/credentials.c b/src/credentials.c new file mode 100644 index 0000000..3792fcf --- /dev/null +++ b/src/credentials.c @@ -0,0 +1,21 @@ +// +// Created by nicolas on 22/03/18. +// + +#include +#include "utils/debug.h" +#include "utils/string.h" +#include "credentials.h" + +esc_credentials_t* esc_credentials_create(const char* username, const char* password) { + esc_credentials_t* creds = malloc(sizeof(struct st_credentials)); + creds->username = string_copy(username); + creds->password = string_copy(password); + return creds; +} + +void esc_credentials_destroy(esc_credentials_t* creds) { + free((void*)creds->username); + free((void*)creds->password); + free(creds); +} diff --git a/src/credentials.h b/src/credentials.h new file mode 100644 index 0000000..e20b5d3 --- /dev/null +++ b/src/credentials.h @@ -0,0 +1,17 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_ESC_CREDENTIALS_H +#define ESC_ESC_CREDENTIALS_H + +struct st_credentials { + const char* username; + const char* password; +}; +typedef struct st_credentials esc_credentials_t; + +esc_credentials_t* esc_credentials_create(const char* username, const char* password); +void esc_credentials_destroy(esc_credentials_t* creds); + +#endif //ESC_ESC_CREDENTIALS_H diff --git a/src/endpoint_discoverer.c b/src/endpoint_discoverer.c new file mode 100644 index 0000000..6e8bae4 --- /dev/null +++ b/src/endpoint_discoverer.c @@ -0,0 +1,16 @@ +// +// Created by nicolas on 22/03/18. +// + +#include "utils/debug.h" +#include "endpoint_discoverer.h" + +const esc_node_endpoints_t* esc_static_discover(const esc_static_endpoint_discoverer_t* discover_data, const ip_endpoint_t* failed_tcp_endpoint) { + esc_node_endpoints_t* result = malloc(sizeof(struct st_esc_node_endpoints)); + if (discover_data->use_ssl_connection) { + result->secure_tcp_endpoint = discover_data->tcp_endpoint; + } else { + result->tcp_endpoint = discover_data->tcp_endpoint; + } + return result; +} diff --git a/src/endpoint_discoverer.h b/src/endpoint_discoverer.h new file mode 100644 index 0000000..ce5f234 --- /dev/null +++ b/src/endpoint_discoverer.h @@ -0,0 +1,30 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_ESC_ENDPOINT_DISCOVERER_H +#define ESC_ESC_ENDPOINT_DISCOVERER_H + +#include "utils/bool.h" + +typedef struct st_ip_endpoint { + char* host; + unsigned short port; +} ip_endpoint_t; + +typedef struct st_esc_node_endpoints { + ip_endpoint_t tcp_endpoint; + ip_endpoint_t secure_tcp_endpoint; +} esc_node_endpoints_t; + +typedef esc_node_endpoints_t* (*esc_endpoint_discoverer_t)(const void* discover_data, const ip_endpoint_t* failed_tcp_endpoint); + +struct st_esc_static_endpoint_discoverer { + ip_endpoint_t tcp_endpoint; + bool_t use_ssl_connection; +}; +typedef struct st_esc_static_endpoint_discoverer esc_static_endpoint_discoverer_t; + +const esc_node_endpoints_t* esc_static_discover(const esc_static_endpoint_discoverer_t* discover_data, const ip_endpoint_t* failed_tcp_endpoint); + +#endif //ESC_ESC_ENDPOINT_DISCOVERER_H diff --git a/src/esc.c b/src/esc.c deleted file mode 100644 index 12f1516..0000000 --- a/src/esc.c +++ /dev/null @@ -1,410 +0,0 @@ -// -// Created by nicol on 2018-03-18. -// - -#include -#include -#include -#include -#include "esc.h" -#include "utils/debug.h" -#include "utils/socket.h" -#include "proto.h" -#include "tcp_package.h" -#include "utils/thread.h" -#include "utils/mutex.h" -#include "utils/string.h" -#include "utils/queue.h" -#include "tcp_messages.h" - -#ifdef _WIN32 -#include -#define usleep Sleep -#endif -#ifdef __linux__ -#include -#endif - -typedef int bool_t; -const bool_t BOOL_TRUE = 1; -const bool_t BOOL_FALSE = 0; - -const esc_error_t _error_create(const char* file, int line, int code, char* format, ...) { - va_list vl; - va_start(vl, format); - esc_error_t err; - err.file = file; - err.line = line; - err.code = code; - vsnprintf(err.message, ERROR_MSG_SIZE, format, vl); - return err; -} -#define error_create(code, fmt, args...) _error_create(__FILE__, __LINE__, code, fmt, args) - -struct st_connection_settings { - bool_t use_ssl_connection; -}; - -const esc_connection_settings_t default_connection_settings = { -}; - -const esc_connection_settings_t* esc_default_connection_settings = &default_connection_settings; - -struct st_tcp_endpoint { - char* host; - unsigned short port; -}; -typedef struct st_tcp_endpoint tcp_endpoint_t; - -struct st_node_endpoints { - tcp_endpoint_t tcp_endpoint; - tcp_endpoint_t secure_tcp_endpoint; -}; -typedef struct st_node_endpoints node_endpoints_t; - -typedef const node_endpoints_t* (*endpoint_discoverer_t)(const void* discover_data, const tcp_endpoint_t* failed_tcp_endpoint); - -struct st_connection { - esc_connection_settings_t settings; - const char* name; - void* discoverer_data; - endpoint_discoverer_t discover; - socket_t tcp_conn; - ProtobufCAllocator protobuf_c_allocator; - thread_t manager_thread; - bool_t stop; - mutex_t recv_peek_lock; - queue_t send_queue; - queue_t recv_queue; - bool_t has_error; - esc_error_t last_error; -}; - -struct st_static_endpoint_discoverer { - tcp_endpoint_t tcp_endpoint; - bool_t use_ssl_connection; -}; -typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t; - -const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* discover_data, const tcp_endpoint_t* failed_tcp_endpoint) { - node_endpoints_t* result = malloc(sizeof(struct st_node_endpoints)); - if (discover_data->use_ssl_connection) { - result->secure_tcp_endpoint = discover_data->tcp_endpoint; - } else { - result->tcp_endpoint = discover_data->tcp_endpoint; - } - return result; -} - -//TODO partial transfer -ssize_t connection_send_tcp_package(esc_connection_t conn, tcp_package_t pkg) { - char uuid_buf[37]; -#ifdef _WIN32 - fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); -#else - fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); -#endif - buffer_t send_buffer = tcp_package_to_buffer(pkg); - - size_t send_buffer_size = buffer_size(send_buffer); - uint32_t size = (uint32_t)send_buffer_size; - ssize_t rc; - if ((rc = socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t))) <= 0) { - buffer_destroy(send_buffer); - fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); - return -1; - } - - uint8_t* send_buffer_data = buffer_data(send_buffer); - if ((rc = socket_send(conn->tcp_conn, (char *) send_buffer_data, send_buffer_size)) <= 0) { - buffer_destroy(send_buffer); - fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); - return -2; - } - - buffer_destroy(send_buffer); - fprintf(stderr, "0\n"); - return 0; -} - -//TODO partial transfer -tcp_package_t connection_recv_tcp_package(esc_connection_t conn) { - uint32_t recv_size; - ssize_t rc; - if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { -#ifdef _WIN32 - fprintf(stderr, "connection_recv_tcp_package: %lld %d\n", rc, socket_error(conn->tcp_conn)); -#else - fprintf(stderr, "connection_recv_tcp_package: %ld %d\n", rc, socket_error(conn->tcp_conn)); -#endif - return 0; - } - buffer_t recv_buffer = buffer_create(recv_size); - size_t recv_buffer_size = recv_size; - uint8_t* recv_buffer_data = buffer_data(recv_buffer); - while(recv_size > 0) { - size_t pos = recv_buffer_size - recv_size; - rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer_data[pos], recv_size); - recv_size -= rc; - } - tcp_package_t recv_pkg = tcp_package_from_buffer(recv_buffer); - buffer_destroy(recv_buffer); - char uuid_buf[37]; -#ifdef _WIN32 - fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); -#else - fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); -#endif - return recv_pkg; -} - -inline void connection_enqueue_send(esc_connection_t conn, tcp_package_t pkg) { - queue_enqueue(conn->send_queue, pkg); -} - -tcp_package_t connection_wait_for(esc_connection_t conn, esc_uuid_t correlation_id) { - tcp_package_t found = 0; - while(found == 0) { - mutex_lock(conn->recv_peek_lock); - tcp_package_t peek = queue_peek(conn->recv_queue); - if (peek && esc_uuid_compare(peek->correlation_id, correlation_id) == 0) { - queue_dequeue(conn->recv_queue); - found = peek; - } - mutex_unlock(conn->recv_peek_lock); - usleep(1); - } - return found; -} - -void* protobuf_c_alloc(void *alloc_data, size_t size) { - return malloc(size); -} - -void protobuf_c_free(void *alloc_data, void* p) { - free(p); -} - -esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { - struct st_connection* conn = malloc(sizeof(struct st_connection)); - conn->settings = *connection_settings; - conn->protobuf_c_allocator.alloc = protobuf_c_alloc; - conn->protobuf_c_allocator.free = protobuf_c_free; - conn->protobuf_c_allocator.allocator_data = 0; - if (connection_name == 0 || strcmp(connection_name, "") == 0) { - esc_uuid_t uuid = esc_uuid_create(); - char buf[40]; - esc_uuid_format(uuid, buf, 40); - conn->name = string_copy(buf); - esc_uuid_destroy(uuid); - } else { - conn->name = connection_name; - } - conn->send_queue = queue_create(); - conn->recv_peek_lock = mutex_create(); - conn->recv_queue = queue_create(); - conn->stop = 0; - conn->has_error = 0; - memset(&conn->last_error, 0, sizeof(struct st_error)); - - if (strncmp(addr, "tcp://", 6) != 0) { - conn->has_error = 1; - conn->last_error = error_create(1, "invalid schema for address: %s", addr); - return conn; - } - char* pos = strrchr(addr, ':'); - if (pos == 0) { - conn->has_error = 1; - conn->last_error = error_create(2, "missing port in address: %s", addr); - return conn; - } - char* host = malloc(pos - addr - 5); - strncpy(host, addr+6, pos-addr-6); - host[pos-addr-6] = 0; - unsigned short port = (unsigned short)atoi(pos+1); - static_endpoint_discoverer_t discover_data = { - {host, port}, connection_settings->use_ssl_connection - }; - - conn->discoverer_data = malloc(sizeof(struct st_static_endpoint_discoverer)); - memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_static_endpoint_discoverer)); - conn->discover = (endpoint_discoverer_t)static_discover; - - - return conn; -} - -void* connection_thread(void* arg) { - esc_connection_t conn = arg; - - while(conn->stop == BOOL_FALSE) { - if (socket_writable(conn->tcp_conn)) { - tcp_package_t send_pkg = queue_dequeue(conn->send_queue); - if (send_pkg != 0) { - ssize_t rc = connection_send_tcp_package(conn, send_pkg); - if (rc == 0) { - tcp_package_destroy(send_pkg); - } else { - fprintf(stderr, "failed to send pkg."); - } - } - } - - if (socket_readable(conn->tcp_conn)) { - tcp_package_t recv_pkg = connection_recv_tcp_package(conn); - if (recv_pkg == 0) { - //TODO Handle connection lost - conn->stop = 1; - } else if (recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { - tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); - queue_enqueue(conn->send_queue, heartbeat_pkg); - } else { - queue_enqueue(conn->recv_queue, recv_pkg); - } - } - - usleep(1); - } - - printf("manager thread stopping\n"); -} - -// return 0 on success -// return non-zero on failure and sets last_error on connection -int esc_connection_connect(esc_connection_t conn) { - // Discover endpoint - const node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0); - if (endpoints == 0) { - return -1; - } - // Establish Tcp Connection - conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); - tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint; - if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { - conn->last_error = error_create(socket_error(conn->tcp_conn), "can't connect to %s:%d", endpoint.host, endpoint.port); - return -1; - } - // Start manager thread - conn->manager_thread = thread_create(connection_thread, conn); - if (thread_start(conn->manager_thread)) { - conn->last_error = error_create(thread_error(conn->manager_thread), "can't start manager thread.", ""); - return -1; - } - // Identify - // build message - EventStore__Client__Messages__IdentifyClient identify_client; - event_store__client__messages__identify_client__init(&identify_client); - identify_client.connection_name = (char*)conn->name; - identify_client.version = 1; - size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client); - uint8_t buffer[s]; - event_store__client__messages__identify_client__pack(&identify_client, buffer); - // build tcp_package - esc_uuid_t correlation_id = esc_uuid_create(); - tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, correlation_id, buffer_from(buffer, s)); - queue_enqueue(conn->send_queue, send_pkg); - - tcp_package_t recv_pkg = connection_wait_for(conn, correlation_id); - if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { - conn->last_error = error_create(0, "server error: %d.", recv_pkg->command); - tcp_package_destroy(recv_pkg); - return -1; - } - - tcp_package_destroy(recv_pkg); - return 0; -} - -esc_credentials_t esc_credentials_create(const char* username, const char* password) { - esc_credentials_t creds = malloc(sizeof(struct st_credentials)); - creds->username = string_copy(username); - creds->password = string_copy(password); - return creds; -} - -const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) { - if (msg == 0) return 0; - esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event)); - ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len); - ev->event_type = string_copy(msg->event_type); - ev->event_number = msg->event_number; - ev->event_stream_id = string_copy(msg->event_stream_id); - ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0; - ev->data = buffer_from(msg->data.data, msg->data.len); - ev->metadata = buffer_from(msg->metadata.data, msg->metadata.len); - - return ev; -} - -esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__ResolvedEvent* msg) { - esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event)); - ev->original_position.prepare_position = msg->prepare_position; - ev->original_position.commit_position = msg->commit_position; - ev->event = recorded_event_create(msg->event); - ev->link = recorded_event_create(msg->link); - return ev; -} - -esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials) { - //TODO function for packing from client struct direct to protobuf data - EventStore__Client__Messages__ReadAllEvents send_msg; - event_store__client__messages__read_all_events__init(&send_msg); - send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; - send_msg.commit_position = last_checkpoint ? last_checkpoint->commit_position : 0; - send_msg.max_count = count; - send_msg.require_master = 0; - send_msg.resolve_link_tos = 0; - size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); - uint8_t buffer[s]; - event_store__client__messages__read_all_events__pack(&send_msg, buffer); - esc_uuid_t correlation_id = esc_uuid_create(); - tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, correlation_id, buffer_from(buffer, s), credentials->username, credentials->password); - queue_enqueue(conn->send_queue, send_pkg); - - tcp_package_t recv_pkg = connection_wait_for(conn, correlation_id); - if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) { - conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command)); - return 0; - } - - //TODO function for unpacking from protobuf data to client struct - size_t data_size = buffer_size(recv_pkg->data); - uint8_t* data = buffer_data(recv_pkg->data); - - EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = - event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, data_size, data); - - esc_all_events_slice_t result = malloc(sizeof(struct st_all_events_slice)); - result->read_direction = "forward"; - result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0; - result->from_position.prepare_position = recv_msg->prepare_position; - result->from_position.commit_position = recv_msg->commit_position; - result->next_position.prepare_position = recv_msg->next_prepare_position; - result->next_position.commit_position = recv_msg->next_commit_position; - result->n_events = recv_msg->n_events; - result->events = malloc(recv_msg->n_events * sizeof(void*)); - for (size_t i = 0; i < recv_msg->n_events; i++) { - result->events[i] = resolved_event_create(recv_msg->events[i]); - } - event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); - - return result; -} - -void esc_connection_close(esc_connection_t conn) { - conn->stop = 1; - socket_close(conn->tcp_conn); -} - -inline esc_error_t esc_connection_last_error(esc_connection_t conn) { - return conn->last_error; -} - -const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { -#ifdef _WIN32 - snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); -#else - snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); -#endif - return buffer; -} diff --git a/src/esc.h b/src/esc.h deleted file mode 100644 index 7b4156c..0000000 --- a/src/esc.h +++ /dev/null @@ -1,80 +0,0 @@ -// -// Created by nicol on 2018-03-18. -// - -#ifndef ESC_ESC_H -#define ESC_ESC_H - -#include "utils/uuid.h" -#include "utils/buffer.h" - -#define ERROR_MSG_SIZE 1024 -struct st_error { - const char* file; - int line; - int code; - char message[ERROR_MSG_SIZE]; -}; -typedef struct st_error esc_error_t; - -struct st_connection_settings; -typedef struct st_connection_settings esc_connection_settings_t; - -struct st_connection; -typedef struct st_connection* esc_connection_t; - -struct st_credentials { - const char* username; - const char* password; -}; -typedef struct st_credentials* esc_credentials_t; - -struct st_esc_position { - int64_t prepare_position; - int64_t commit_position; -}; -typedef struct st_esc_position esc_position_t; - -struct st_recorded_event { - esc_uuid_t event_id; - const char* event_type; - int64_t event_number; - const char *event_stream_id; - int64_t created_epoch; - buffer_t data; - buffer_t metadata; -}; -typedef struct st_recorded_event esc_recorded_event_t; - -struct st_resolved_event { - const esc_recorded_event_t* event; - const esc_recorded_event_t* link; - esc_position_t original_position; -}; -typedef struct st_resolved_event esc_resolved_event_t; - -struct st_all_events_slice { - char* read_direction; - esc_position_t from_position; - esc_position_t next_position; - size_t n_events; - esc_resolved_event_t** events; - int is_end_of_stream; -}; -typedef struct st_all_events_slice* esc_all_events_slice_t; - -const esc_connection_settings_t* esc_default_connection_settings; - -// Connection -esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); -int esc_connection_connect(esc_connection_t conn); -esc_credentials_t esc_credentials_create(const char* username, const char* password); -esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials); -void esc_connection_close(esc_connection_t conn); - -esc_error_t esc_connection_last_error(esc_connection_t conn); - -// Utils -const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size); - -#endif //ESC_ESC_H diff --git a/src/event_data.c b/src/event_data.c new file mode 100644 index 0000000..7cf647d --- /dev/null +++ b/src/event_data.c @@ -0,0 +1,25 @@ +// +// Created by nicolas on 23/03/18. +// + +#include "event_data.h" +#include "utils/debug.h" +#include "utils/string.h" + +esc_event_data_t* esc_event_data_create(uuid_t* event_id, const char* event_type, bool_t is_json, buffer_t* data, buffer_t* metadata) { + esc_event_data_t* event_data = malloc(sizeof(esc_event_data_t)); + event_data->event_id = uuid_copy(event_id); + event_data->event_type = string_copy(event_type); + event_data->is_json = is_json; + event_data->data = data ? buffer_copy(data) : 0; + event_data->metadata = metadata ? buffer_copy(metadata) : 0; + return event_data; +} + +void esc_event_data_destroy(esc_event_data_t* event_data) { + uuid_destroy(event_data->event_id); + free((void*)event_data->event_type); + if (event_data->data) buffer_destroy(event_data->data); + if (event_data->metadata) buffer_destroy(event_data->metadata); + free(event_data); +} \ No newline at end of file diff --git a/src/event_data.h b/src/event_data.h new file mode 100644 index 0000000..a0e0224 --- /dev/null +++ b/src/event_data.h @@ -0,0 +1,23 @@ +// +// Created by nicolas on 23/03/18. +// + +#ifndef ESC_EVENT_DATA_H +#define ESC_EVENT_DATA_H + +#include "utils/uuid.h" +#include "utils/bool.h" +#include "utils/buffer.h" + +typedef struct st_esc_event_data { + uuid_t* event_id; + const char* event_type; + bool_t is_json; + buffer_t* data; + buffer_t* metadata; +} esc_event_data_t; + +esc_event_data_t* esc_event_data_create(uuid_t* event_id, const char* event_type, bool_t is_json, buffer_t* data, buffer_t* metadata); +void esc_event_data_destroy(esc_event_data_t*); + +#endif //ESC_EVENT_DATA_H diff --git a/src/main.c b/src/main.c deleted file mode 100644 index 5ada39d..0000000 --- a/src/main.c +++ /dev/null @@ -1,68 +0,0 @@ -#include -#include "utils/debug.h" -#include "esc.h" - -#ifdef _WIN32 -#include -#include -#define usleep Sleep -#define sleep(x) Sleep(x*1000) -#endif -#ifdef __linux__ -#include -#endif - -int main() { -#ifdef _WIN32 - WSADATA wsaData; - WSAStartup(MAKEWORD(2,0), &wsaData); -#endif - - esc_connection_t conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); - if (esc_connection_connect(conn) != 0) { - esc_error_t err = esc_connection_last_error(conn); - fprintf(stderr, "Error: %s code=%d file=%s line=%d", err.message, err.code, err.file, err.line); - return -2; - } - esc_credentials_t credentials = esc_credentials_create("admin", "changeit"); - - esc_all_events_slice_t result = 0; - do { - result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 100, credentials); - if (result == 0) { - esc_error_t err = esc_connection_last_error(conn); - fprintf(stderr, "Error: %s code=%d file=%s line=%d", err.message, err.code, err.file, err.line); - return -3; - } - char posbuf1[44]; - char posbuf2[44]; - printf("%s %s %s %u\n", result->read_direction, - esc_position_format(&result->from_position, posbuf1, 44), - esc_position_format(&result->next_position, posbuf2, 44), - result->is_end_of_stream); - char uuid_buf[37]; - for (size_t i = 0; i < result->n_events; i++) { -#ifdef _WIN32 - printf("%s %s %lld@%s %llu %llu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), -#else - printf("%s %s %ld@%s %lu %lu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), -#endif - result->events[i]->event->event_type, - result->events[i]->event->event_number, - result->events[i]->event->event_stream_id, - buffer_size(result->events[i]->event->data), - buffer_size(result->events[i]->event->metadata)); - } - } while(result->is_end_of_stream == 0); - - sleep(10); - - esc_connection_close(conn); - - dbg_list_allocs(); - -#ifdef _WIN32 - WSACleanup(); -#endif - return 0; -} diff --git a/src/position.c b/src/position.c new file mode 100644 index 0000000..4a0abcd --- /dev/null +++ b/src/position.c @@ -0,0 +1,37 @@ +// +// Created by nicolas on 22/03/18. +// + +#include +#include +#include +#include "position.h" +#include "utils/debug.h" + +esc_position_t* esc_position_create(int64_t prepare_position, int64_t commit_position) { + esc_position_t* res = malloc(sizeof(struct st_esc_position)); + res->prepare_position = prepare_position; + res->commit_position = commit_position; + return res; +} + +inline void esc_position_destroy(esc_position_t* pos) { + free(pos); +} + +int esc_position_compare(esc_position_t* left, esc_position_t* right) { + if ((left->commit_position < right->commit_position) || ((left->commit_position == right->commit_position) && (left->prepare_position < right->prepare_position))) + return -1; + if ((left->commit_position > right->commit_position) || ((left->commit_position == right->commit_position) && (left->prepare_position > right->prepare_position))) + return 1; + return 0; +} + +const char* esc_position_format(esc_position_t* position, char* buffer, size_t buf_size) { +#ifdef _WIN32 + snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); +#else + snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); +#endif + return buffer; +} diff --git a/src/position.h b/src/position.h new file mode 100644 index 0000000..4ce1c9f --- /dev/null +++ b/src/position.h @@ -0,0 +1,20 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_ESC_POSITION_H +#define ESC_ESC_POSITION_H + +#include + +typedef struct st_esc_position { + int64_t prepare_position; + int64_t commit_position; +} esc_position_t; + +esc_position_t* esc_position_create(int64_t prepare_position, int64_t commit_position); +void esc_position_destroy(esc_position_t*); +int esc_position_compare(esc_position_t* left, esc_position_t* right); +const char* esc_position_format(esc_position_t* position, char* buffer, size_t buf_len); + +#endif //ESC_ESC_POSITION_H diff --git a/src/proto_helper.c b/src/proto_helper.c new file mode 100644 index 0000000..790d03b --- /dev/null +++ b/src/proto_helper.c @@ -0,0 +1,213 @@ +// +// Created by nicolas on 22/03/18. +// + +#include "utils/debug.h" +#include "utils/string.h" +#include "results.h" +#include "proto_helper.h" +#include "proto.h" +#include "utils/bool.h" +#include "event_data.h" + +void* protobuf_c_alloc(void *alloc_data, size_t size) { + return malloc(size); +} + +void protobuf_c_free(void *alloc_data, void* p) { + free(p); +} + +ProtobufCAllocator protobuf_allocator = { + protobuf_c_alloc, protobuf_c_free, 0 +}; + +inspection_result_t* inspection_result_create(operation_decision_t decision, const char* description, error_t* error) { + inspection_result_t* inspection_result = malloc(sizeof(inspection_result_t)); + inspection_result->decision = decision; + inspection_result->description = description; + inspection_result->error = error; + return inspection_result; +} + +void inspection_result_destroy(inspection_result_t* inspection_result) { + free(inspection_result->error); + free(inspection_result); +} + +buffer_t* esc_identify_client_pack(const char* connection_name) { + EventStore__Client__Messages__IdentifyClient identify_client; + event_store__client__messages__identify_client__init(&identify_client); + identify_client.connection_name = (char*)connection_name; + identify_client.version = 1; + size_t msg_size = event_store__client__messages__identify_client__get_packed_size(&identify_client); + uint8_t msg_buf[msg_size]; + event_store__client__messages__identify_client__pack(&identify_client, msg_buf); + return buffer_copyfrom(msg_buf, msg_size); +} + +esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__EventRecord* msg) { + if (msg == 0) return 0; + esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event)); + ev->event_id = uuid_from(msg->event_id.data, msg->event_id.len); + ev->event_type = string_copy(msg->event_type); + ev->event_number = msg->event_number; + ev->event_stream_id = string_copy(msg->event_stream_id); + ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0; + ev->data = buffer_copyfrom(msg->data.data, msg->data.len); + ev->metadata = buffer_copyfrom(msg->metadata.data, msg->metadata.len); + //event_store__client__messages__event_record__free_unpacked(msg, &protobuf_allocator); + return ev; +} + +esc_resolved_event_t* esc_resolved_event_unpack(EventStore__Client__Messages__ResolvedEvent* msg) { + esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event)); + ev->original_position = esc_position_create(msg->prepare_position, msg->commit_position); + ev->event = esc_recorded_event_unpack(msg->event); + ev->link = esc_recorded_event_unpack(msg->link); + //event_store__client__messages__resolved_event__free_unpacked(msg, &protobuf_allocator); + return ev; +} + +const char* get_string_for_all_events_result(int result) { + switch(result) { + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: return "Success"; + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__NotModified: return "Not Modified"; + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__AccessDenied: return "Access Denied"; + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Error: return "Error"; + default: return "Unknown"; + } +} + +inspection_result_t* esc_all_events_slice_unpack(buffer_t* buffer, esc_all_events_slice_t** all_events_slice_p) { + EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = + event_store__client__messages__read_all_events_completed__unpack( + &protobuf_allocator, + buffer_size(buffer), + buffer_data(buffer)); + + inspection_result_t* inspection_result; + switch (recv_msg->result) { + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: { + esc_all_events_slice_t *all_events_slice = malloc(sizeof(struct st_all_events_slice)); + all_events_slice->read_direction = "forward"; + all_events_slice->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0; + all_events_slice->from_position = esc_position_create(recv_msg->prepare_position, + recv_msg->commit_position); + all_events_slice->next_position = esc_position_create(recv_msg->next_prepare_position, + recv_msg->next_commit_position); + all_events_slice->n_events = recv_msg->n_events; + all_events_slice->events = malloc(recv_msg->n_events * sizeof(void *)); + for (size_t i = 0; i < recv_msg->n_events; i++) { + all_events_slice->events[i] = esc_resolved_event_unpack(recv_msg->events[i]); + } + *all_events_slice_p = all_events_slice; + inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Success", 0); + break; + } + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Error: { + error_t* error = error_create(recv_msg->result, "Server error: %s", recv_msg->error); + inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Error", error); + break; + } + case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__AccessDenied: { + error_t* error = error_create(recv_msg->result, "Access denied: %s %s", "Read", "All"); + inspection_result = inspection_result_create(Operation_Decision_EndOperation, "AccessDenied", error); + break; + } + default: { + error_t* error = error_create(recv_msg->result, "Unexpected ReadStreamResult %s", get_string_for_all_events_result(recv_msg->result)); + inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Unexpected", error); + break; + } + } + event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &protobuf_allocator); + return inspection_result; +} + +buffer_t* esc_read_all_forward_pack(esc_position_t* last_checkpoint, int32_t count, bool_t resolve_link_tos) { + EventStore__Client__Messages__ReadAllEvents send_msg; + event_store__client__messages__read_all_events__init(&send_msg); + send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; + send_msg.commit_position = last_checkpoint ? last_checkpoint->commit_position : 0; + send_msg.max_count = count; + send_msg.require_master = 0; + send_msg.resolve_link_tos = resolve_link_tos; + size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); + uint8_t buffer[s]; + event_store__client__messages__read_all_events__pack(&send_msg, buffer); + return buffer_copyfrom(buffer, s); +} + +const char* get_string_for_write_result(int write_result) { + switch(write_result) { + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__Success: return "Success"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__PrepareTimeout: return "PrepareTimeout"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__CommitTimeout: return "CommitTimeout"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__ForwardTimeout: return "ForwardTimeout"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__WrongExpectedVersion: return "WrongExpectedVersion"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__StreamDeleted: return "StreamDeleted"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__InvalidTransaction: return "InvalidTransaction"; + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__AccessDenied: return "AccessDenied"; + default: return "Unknown"; + } +} + +inspection_result_t* esc_write_result_unpack(buffer_t* buffer, esc_write_result_t** write_result_p) { + EventStore__Client__Messages__WriteEventsCompleted* recv_msg = + event_store__client__messages__write_events_completed__unpack(&protobuf_allocator, buffer_size(buffer), buffer_data(buffer)); + inspection_result_t* inspection_result; + switch(recv_msg->result) { + case EVENT_STORE__CLIENT__MESSAGES__OPERATION_RESULT__Success: { + esc_write_result_t *write_result = malloc(sizeof(esc_write_result_t)); + write_result->next_expected_version = recv_msg->last_event_number; + write_result->log_position = esc_position_create( + recv_msg->prepare_position ? recv_msg->prepare_position : -1, + recv_msg->commit_position ? recv_msg->commit_position : -1); + *write_result_p = write_result; + inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Success", 0); + break; + } + default: { + error_t* error = error_create(recv_msg->result, "Unexpected OperationResult %s", get_string_for_write_result(recv_msg->result)); + inspection_result = inspection_result_create(Operation_Decision_EndOperation, "Unexpected", error); + break; + } + } + event_store__client__messages__write_events_completed__free_unpacked(recv_msg, &protobuf_allocator); + return inspection_result; +} + +buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version, array_t* events) { + EventStore__Client__Messages__WriteEvents send_msg; + event_store__client__messages__write_events__init(&send_msg); + send_msg.event_stream_id = (char*)stream; + send_msg.expected_version = expected_version; + send_msg.require_master = 0; + send_msg.n_events = events->size; + send_msg.events = malloc(events->size * sizeof(EventStore__Client__Messages__NewEvent*)); + for (size_t i = 0; i < events->size; i++) { + esc_event_data_t* event_data = events->data[i]; + EventStore__Client__Messages__NewEvent* new_event = malloc(sizeof(EventStore__Client__Messages__NewEvent)); + event_store__client__messages__new_event__init(new_event); + new_event->event_id.data = (uint8_t *)event_data->event_id; + new_event->event_id.len = 16; + new_event->metadata.data = event_data->metadata ? buffer_data(event_data->metadata) : 0; + new_event->metadata.len = event_data->metadata ? buffer_size(event_data->metadata) : 0; + new_event->data.data = event_data->data ? buffer_data(event_data->data) : 0; + new_event->data.len = event_data->data ? buffer_size(event_data->data) : 0; + new_event->event_type = (char*)event_data->event_type; + new_event->data_content_type = event_data->is_json ? 1 : 0; + new_event->metadata_content_type = event_data->is_json ? 1 : 0; + new_event->has_metadata = event_data->metadata ? 1 : 0; + send_msg.events[i] = new_event; + } + size_t size = event_store__client__messages__write_events__get_packed_size(&send_msg); + uint8_t buffer[size]; + event_store__client__messages__write_events__pack(&send_msg, buffer); + for (size_t i = 0; i < events->size; i++) { + free(send_msg.events[i]); + } + free(send_msg.events); + return buffer_copyfrom(buffer, size); +} diff --git a/src/proto_helper.h b/src/proto_helper.h new file mode 100644 index 0000000..00a8abc --- /dev/null +++ b/src/proto_helper.h @@ -0,0 +1,39 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_PROTO_HELPER_H +#define ESC_PROTO_HELPER_H + +#include "utils/array.h" +#include "utils/buffer.h" +#include "utils/bool.h" +#include "utils/error.h" +#include "results.h" + +typedef enum { + Operation_Decision_DoNothing, + Operation_Decision_EndOperation, + Operation_Decision_Retry, + Operation_Decision_Reconnect, + Operation_Decision_Subscribed +} operation_decision_t; + +typedef struct { + operation_decision_t decision; + const char* description; + error_t* error; +} inspection_result_t; +void inspection_result_destroy(inspection_result_t*); + +buffer_t* esc_identify_client_pack(const char* connection_name); + +inspection_result_t* esc_all_events_slice_unpack(buffer_t* buffer, esc_all_events_slice_t** all_events_slice_p); +buffer_t* esc_read_all_forward_pack(esc_position_t* last_checkpoint, int32_t count, bool_t resolve_link_tos); +const char* get_string_for_all_events_result(int result); + +inspection_result_t* esc_write_result_unpack(buffer_t* buffer, esc_write_result_t** write_result_p); +buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version, array_t* events); +const char* get_string_for_write_result(int result); + +#endif //ESC_PROTO_HELPER_H diff --git a/src/results.c b/src/results.c new file mode 100644 index 0000000..8eec094 --- /dev/null +++ b/src/results.c @@ -0,0 +1,39 @@ +// +// Created by nicolas on 22/03/18. +// + +#include "utils/debug.h" +#include "position.h" +#include "results.h" + +void esc_recorded_event_destroy(esc_recorded_event_t* recorded_event) { + uuid_destroy(recorded_event->event_id); + free((void*)recorded_event->event_type); + free((void*)recorded_event->event_stream_id); + buffer_destroy(recorded_event->data); + buffer_destroy(recorded_event->metadata); + free(recorded_event); +} + +void esc_resolved_event_destroy(esc_resolved_event_t* resolved_event) { + if (resolved_event->original_position) esc_position_destroy(resolved_event->original_position); + if (resolved_event->event) esc_recorded_event_destroy(resolved_event->event); + if (resolved_event->link) esc_recorded_event_destroy(resolved_event->link); + free(resolved_event); +} + + +void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice) { + esc_position_destroy(all_events_slice->from_position); + esc_position_destroy(all_events_slice->next_position); + for (size_t i = 0; i < all_events_slice->n_events; i++) { + esc_resolved_event_destroy(all_events_slice->events[i]); + } + free(all_events_slice->events); + free(all_events_slice); +} + +void esc_write_result_destroy(esc_write_result_t* write_result) { + esc_position_destroy(write_result->log_position); + free(write_result); +} diff --git a/src/results.h b/src/results.h new file mode 100644 index 0000000..8bc2b33 --- /dev/null +++ b/src/results.h @@ -0,0 +1,50 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_RESULTS_H +#define ESC_RESULTS_H + +#include "utils/uuid.h" +#include "utils/buffer.h" +#include "position.h" + +struct st_recorded_event { + uuid_t* event_id; + const char* event_type; + int64_t event_number; + const char *event_stream_id; + int64_t created_epoch; + buffer_t* data; + buffer_t* metadata; +}; +typedef struct st_recorded_event esc_recorded_event_t; + +struct st_resolved_event { + esc_recorded_event_t* event; + esc_recorded_event_t* link; + esc_position_t* original_position; +}; +typedef struct st_resolved_event esc_resolved_event_t; + +struct st_all_events_slice { + char* read_direction; + esc_position_t* from_position; + esc_position_t* next_position; + size_t n_events; + esc_resolved_event_t** events; + int is_end_of_stream; +}; +typedef struct st_all_events_slice esc_all_events_slice_t; + +typedef struct st_esc_write_result_t { + int64_t next_expected_version; + esc_position_t* log_position; +} esc_write_result_t; + +void esc_recorded_event_destroy(esc_recorded_event_t* recorded_event); +void esc_resolved_event_destroy(esc_resolved_event_t* resolved_event); +void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice); +void esc_write_result_destroy(esc_write_result_t* write_result); + +#endif //ESC_RESULTS_H diff --git a/src/tcp_package.c b/src/tcp_package.c index 08125e4..e249e6c 100644 --- a/src/tcp_package.c +++ b/src/tcp_package.c @@ -16,37 +16,37 @@ const uint32_t CorrelationOffset = 2; const uint32_t AuthOffset = 18; const uint32_t MandatorySize = 18; -tcp_package_t tcp_package_create(uint8_t command, esc_uuid_t correlation_id, buffer_t data) { - tcp_package_t pkg = malloc(sizeof(struct st_tcp_package)); +tcp_package_t* tcp_package_create(uint8_t command, uuid_t* correlation_id, buffer_t* data) { + tcp_package_t* pkg = malloc(sizeof(struct st_tcp_package)); pkg->command = command; pkg->flags = 0; - pkg->correlation_id = esc_uuid_copy(correlation_id); + pkg->correlation_id = uuid_copy(correlation_id); pkg->login = 0; pkg->password = 0; pkg->data = data; return pkg; } -tcp_package_t tcp_package_create_authenticated(uint8_t command, esc_uuid_t correlation_id, buffer_t data, const char* username, const char* password) { - tcp_package_t pkg = malloc(sizeof(struct st_tcp_package)); +tcp_package_t* tcp_package_create_authenticated(uint8_t command, uuid_t* correlation_id, buffer_t* data, const char* username, const char* password) { + tcp_package_t* pkg = malloc(sizeof(struct st_tcp_package)); pkg->command = command; pkg->flags = 1; - pkg->correlation_id = esc_uuid_copy(correlation_id); + pkg->correlation_id = uuid_copy(correlation_id); pkg->login = string_copy(username); pkg->password = string_copy(password); pkg->data = data; return pkg; } -void tcp_package_destroy(tcp_package_t pkg) { +void tcp_package_destroy(tcp_package_t* pkg) { free((void*)pkg->login); free((void*)pkg->password); - esc_uuid_destroy(pkg->correlation_id); + uuid_destroy(pkg->correlation_id); buffer_destroy(pkg->data); free(pkg); } -buffer_t tcp_package_to_buffer(tcp_package_t pkg) { +buffer_t* tcp_package_to_buffer(tcp_package_t* pkg) { size_t data_size = buffer_size(pkg->data); uint8_t* buf = malloc(MandatorySize + data_size + (pkg->flags ? 257*2 : 0)); buf[CommandOffset] = pkg->command; @@ -73,10 +73,12 @@ buffer_t tcp_package_to_buffer(tcp_package_t pkg) { } fprintf(stderr, "\n"); */ - return buffer_from(buf, size); + buffer_t* res = buffer_copyfrom(buf, size); + free(buf); + return res; } -tcp_package_t tcp_package_from_buffer(buffer_t buffer) { +tcp_package_t* tcp_package_from_buffer(buffer_t* buffer) { size_t buf_size = buffer_size(buffer); uint8_t* buf_data = buffer_data(buffer); /* @@ -85,10 +87,10 @@ tcp_package_t tcp_package_from_buffer(buffer_t buffer) { } fprintf(stderr, "\n"); */ - tcp_package_t pkg = malloc(sizeof(struct st_tcp_package)); + tcp_package_t* pkg = malloc(sizeof(struct st_tcp_package)); pkg->command = buf_data[CommandOffset]; pkg->flags = buf_data[FlagsOffset]; - pkg->correlation_id = esc_uuid_from(&buf_data[CorrelationOffset], 16); + pkg->correlation_id = uuid_from(&buf_data[CorrelationOffset], 16); if (pkg->flags) { size_t l_len = buf_data[AuthOffset]; size_t p_len = buf_data[AuthOffset+1+l_len]; diff --git a/src/tcp_package.h b/src/tcp_package.h index eaf645b..77e950e 100644 --- a/src/tcp_package.h +++ b/src/tcp_package.h @@ -11,17 +11,17 @@ struct st_tcp_package { uint8_t command; uint8_t flags; - esc_uuid_t correlation_id; + uuid_t* correlation_id; const char* login; const char* password; - buffer_t data; + buffer_t* data; }; -typedef struct st_tcp_package* tcp_package_t; +typedef struct st_tcp_package tcp_package_t; -tcp_package_t tcp_package_create(uint8_t command, esc_uuid_t correlation_id, buffer_t data); -tcp_package_t tcp_package_create_authenticated(uint8_t command, esc_uuid_t correlation_id, buffer_t data, const char* username, const char* password); -void tcp_package_destroy(tcp_package_t pkg); -buffer_t tcp_package_to_buffer(tcp_package_t pkg); -tcp_package_t tcp_package_from_buffer(buffer_t buffer); +tcp_package_t* tcp_package_create(uint8_t command, uuid_t* correlation_id, buffer_t* data); +tcp_package_t* tcp_package_create_authenticated(uint8_t command, uuid_t* correlation_id, buffer_t* data, const char* username, const char* password); +void tcp_package_destroy(tcp_package_t* pkg); +buffer_t* tcp_package_to_buffer(tcp_package_t* pkg); +tcp_package_t* tcp_package_from_buffer(buffer_t* buffer); #endif //ESC_TCP_PACKAGE_H diff --git a/src/utils/array.c b/src/utils/array.c new file mode 100644 index 0000000..006ab89 --- /dev/null +++ b/src/utils/array.c @@ -0,0 +1,33 @@ +// +// Created by nicolas on 23/03/18. +// + +#include +#include "array.h" +#include "debug.h" + +array_t* array_create(size_t n, ...) { + va_list vl; + va_start(vl, n); + + array_t* arr = malloc(sizeof(array_t)); + arr->size = n; + arr->data = malloc(n * sizeof(void*)); + + for (size_t i = 0; i < n; i++) { + void* item = va_arg(vl, void*); + arr->data[i] = item; + } + + return arr; +} + +void array_destroy(array_t* array, array_deallocator destroyer) { + if (destroyer) { + for (size_t i=0;isize;i++) { + destroyer(array->data[i]); + } + } + free(array->data); + free(array); +} diff --git a/src/utils/array.h b/src/utils/array.h new file mode 100644 index 0000000..4a65462 --- /dev/null +++ b/src/utils/array.h @@ -0,0 +1,20 @@ +// +// Created by nicolas on 23/03/18. +// + +#ifndef ESC_ARRAY_H +#define ESC_ARRAY_H + +#include + +typedef struct st_array { + size_t size; + void** data; +} array_t; + +typedef void (*array_deallocator)(void*); + +array_t* array_create(size_t n, ...); +void array_destroy(array_t* array, array_deallocator destroyer); + +#endif //ESC_ARRAY_H diff --git a/src/utils/bool.h b/src/utils/bool.h new file mode 100644 index 0000000..37d1ca2 --- /dev/null +++ b/src/utils/bool.h @@ -0,0 +1,13 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_BOOL_H +#define ESC_BOOL_H + +typedef int bool_t; + +#define BOOL_TRUE 1 +#define BOOL_FALSE 0 + +#endif //ESC_BOOL_H diff --git a/src/utils/buffer.c b/src/utils/buffer.c index effd155..07bc925 100644 --- a/src/utils/buffer.c +++ b/src/utils/buffer.c @@ -13,31 +13,31 @@ struct st_buffer { uint8_t own_data; }; -buffer_t buffer_create(size_t size) { - buffer_t buf = malloc(sizeof(struct st_buffer)); +buffer_t* buffer_create(size_t size) { + buffer_t* buf = malloc(sizeof(struct st_buffer)); buf->size = size; buf->data = malloc(size); buf->own_data = 1; return buf; } -inline void buffer_destroy(buffer_t buffer) { +inline void buffer_destroy(buffer_t* buffer) { if (buffer->own_data) { free(buffer->data); } free(buffer); } -buffer_t buffer_from(const uint8_t* data, size_t size) { - buffer_t buf = malloc(sizeof(struct st_buffer)); +buffer_t* buffer_from(const uint8_t* data, size_t size) { + buffer_t* buf = malloc(sizeof(struct st_buffer)); buf->size = size; buf->data = (uint8_t*)data; buf->own_data = 0; return buf; } -buffer_t buffer_copyfrom(const uint8_t* data, size_t size) { - buffer_t buf = malloc(sizeof(struct st_buffer)); +buffer_t* buffer_copyfrom(const uint8_t* data, size_t size) { + buffer_t* buf = malloc(sizeof(struct st_buffer)); buf->size = size; buf->data = malloc(size); memcpy(buf->data, data, size); @@ -45,10 +45,18 @@ buffer_t buffer_copyfrom(const uint8_t* data, size_t size) { return buf; } -inline size_t buffer_size(buffer_t buffer) { +inline size_t buffer_size(buffer_t* buffer) { return buffer->size; } -inline uint8_t* buffer_data(buffer_t buffer) { +inline uint8_t* buffer_data(buffer_t* buffer) { return buffer->data; -} \ No newline at end of file +} + +buffer_t* buffer_from_string(const char* str) { + return buffer_copyfrom((uint8_t*)str, strlen(str)); +} + +buffer_t* buffer_copy(buffer_t* buffer) { + return buffer_copyfrom(buffer->data, buffer->size); +} diff --git a/src/utils/buffer.h b/src/utils/buffer.h index 2f3c3e8..fc960c3 100644 --- a/src/utils/buffer.h +++ b/src/utils/buffer.h @@ -8,13 +8,15 @@ #include #include -typedef struct st_buffer* buffer_t; +typedef struct st_buffer buffer_t; -buffer_t buffer_create(size_t size); -void buffer_destroy(buffer_t buffer); -buffer_t buffer_from(const uint8_t* data, size_t size); -buffer_t buffer_copyfrom(const uint8_t* data, size_t size); -size_t buffer_size(buffer_t buffer); -uint8_t* buffer_data(buffer_t buffer); +buffer_t* buffer_create(size_t size); +void buffer_destroy(buffer_t* buffer); +buffer_t* buffer_from(const uint8_t* data, size_t size); +buffer_t* buffer_from_string(const char* str); +buffer_t* buffer_copyfrom(const uint8_t* data, size_t size); +buffer_t* buffer_copy(buffer_t* other); +size_t buffer_size(buffer_t* buffer); +uint8_t* buffer_data(buffer_t* buffer); #endif //ESC_BUFFER_H diff --git a/src/utils/debug.c b/src/utils/debug.c index d6b7348..b97084f 100644 --- a/src/utils/debug.c +++ b/src/utils/debug.c @@ -4,10 +4,14 @@ #include #include +#include + void* (*_malloc)(size_t) = malloc; void (*_free)(void*) = free; #include "debug.h" +#define DBG_ALLOC_INC_SIZE 1024 + struct st_alloc { const char* file; int line; @@ -20,8 +24,9 @@ struct st_alloc* _allocations = 0; void* dbg_malloc(const char* file, int line, size_t size) { if (_allocations_count == _allocations_size) { - _allocations = _allocations_size == 0 ? _malloc(1024*sizeof(struct st_alloc)) : realloc(_allocations, (_allocations_size + 1024)*sizeof(struct st_alloc)); - _allocations_size += 1024; + _allocations = _allocations_size == 0 ? _malloc(DBG_ALLOC_INC_SIZE*sizeof(struct st_alloc)) : realloc(_allocations, (_allocations_size + DBG_ALLOC_INC_SIZE)*sizeof(struct st_alloc)); + memset(&_allocations[_allocations_size], 0, DBG_ALLOC_INC_SIZE * sizeof(struct st_alloc)); + _allocations_size += DBG_ALLOC_INC_SIZE; } void* ptr = _malloc(size); struct st_alloc* alloc = &_allocations[_allocations_count++]; @@ -33,20 +38,24 @@ void* dbg_malloc(const char* file, int line, size_t size) { } void dbg_free(void* ptr) { + struct st_alloc* found = 0; for (size_t i = 0; i < _allocations_count; i++) { - if (_allocations[i].ptr == ptr) _allocations[i].ptr = 0; + if (_allocations[i].ptr == ptr) found = &_allocations[i]; } + if (found) found->ptr = 0; + _free(ptr); } - void dbg_list_allocs() { size_t total = 0; + size_t count = 0; printf("Memory leaks:\n"); for (size_t i = 0; i < _allocations_count; i++) { if (_allocations[i].ptr == 0) continue; printf("%lu %p (%lu) @ %s:%d\n", i, _allocations[i].ptr, _allocations[i].size, _allocations[i].file, _allocations[i].line); total += _allocations[i].size; + count++; } - printf("Total memory leaked: %lu\n", total); -} \ No newline at end of file + printf("Totals: nb=%lu memory=%lu\n", count, total); +} diff --git a/src/utils/error.c b/src/utils/error.c new file mode 100644 index 0000000..641c23e --- /dev/null +++ b/src/utils/error.c @@ -0,0 +1,29 @@ +// +// Created by nicolas on 22/03/18. +// + +#include +#include +#include +#include +#include "error.h" +#include "debug.h" + +error_t* _error_create(const char* file, int line, int code, char* format, ...) { + va_list vl; + va_start(vl, format); + error_t* err = malloc(sizeof(struct st_error)); + assert(err != 0); + err->file = file; + err->line = line; + err->code = code; + vsnprintf(err->message, ERROR_MSG_SIZE, format, vl); + return err; +} +#define error_create(code, fmt, args...) _error_create(__FILE__, __LINE__, code, fmt, args) + +error_t* error_copy(error_t* err) { + error_t* copy = malloc(sizeof(error_t)); + memcpy(copy, err, sizeof(error_t)); + return copy; +} \ No newline at end of file diff --git a/src/utils/error.h b/src/utils/error.h new file mode 100644 index 0000000..41ff98f --- /dev/null +++ b/src/utils/error.h @@ -0,0 +1,21 @@ +// +// Created by nicolas on 22/03/18. +// + +#ifndef ESC_ERROR_H +#define ESC_ERROR_H + +#define ERROR_MSG_SIZE 1024 +struct st_error { + const char* file; + int line; + int code; + char message[ERROR_MSG_SIZE]; +}; +typedef struct st_error error_t; + +error_t* _error_create(const char* file, int line, int code, char* format, ...); +#define error_create(code, fmt, args...) _error_create(__FILE__, __LINE__, code, fmt, args) +error_t* error_copy(error_t* error); + +#endif //ESC_ERROR_H diff --git a/src/utils/event.c b/src/utils/event.c new file mode 100644 index 0000000..333eeba --- /dev/null +++ b/src/utils/event.c @@ -0,0 +1,37 @@ +// +// Created by nicolas on 23/03/18. +// + +#include "event.h" +#include "debug.h" + +#ifdef __linux__ +#include + +struct st_event { + pthread_mutex_t mutex; + pthread_cond_t handle; +}; + +event_t* event_create() { + event_t* event = malloc(sizeof(event_t)); + pthread_mutex_init(&event->mutex, 0); + pthread_cond_init(&event->handle, 0); + return event; +} + +void event_destroy(event_t* event) { + pthread_mutex_destroy(&event->mutex); + pthread_cond_destroy(&event->handle); + free(event); +} + +void event_wait(event_t* event) { + pthread_cond_wait(&event->handle, &event->mutex); +} + +void event_set(event_t* event) { + pthread_cond_broadcast(&event->handle); +} + +#endif \ No newline at end of file diff --git a/src/utils/event.h b/src/utils/event.h new file mode 100644 index 0000000..de84e3b --- /dev/null +++ b/src/utils/event.h @@ -0,0 +1,15 @@ +// +// Created by nicolas on 23/03/18. +// + +#ifndef ESC_EVENT_H +#define ESC_EVENT_H + +typedef struct st_event event_t; + +event_t* event_create(); +void event_destroy(event_t* event); +void event_wait(event_t* event); +void event_set(event_t* event); + +#endif //ESC_EVENT_H diff --git a/src/utils/mutex.c b/src/utils/mutex.c index fcd7146..a15ffb0 100644 --- a/src/utils/mutex.c +++ b/src/utils/mutex.c @@ -16,21 +16,21 @@ struct st_mutex { CRITICAL_SECTION handle; }; -mutex_t mutex_create() { - mutex_t mutex = malloc(sizeof(struct st_mutex)); +mutex_t* mutex_create() { + mutex_t* mutex = malloc(sizeof(struct st_mutex)); InitializeCriticalSection(&mutex->handle); return mutex; } -void mutex_lock(mutex_t mutex) { +void mutex_lock(mutex_t* mutex) { EnterCriticalSection(&mutex->handle); } -void mutex_unlock(mutex_t mutex) { +void mutex_unlock(mutex_t* mutex) { LeaveCriticalSection(&mutex->handle); } -void mutex_destroy(mutex_t mutex) { +void mutex_destroy(mutex_t* mutex) { DeleteCriticalSection(&mutex->handle); } #endif @@ -41,22 +41,23 @@ struct st_mutex { pthread_mutex_t handle; }; -mutex_t mutex_create() { - mutex_t mutex = malloc(sizeof(struct st_mutex)); +mutex_t* mutex_create() { + mutex_t* mutex = malloc(sizeof(struct st_mutex)); pthread_mutex_init(&mutex->handle, 0); return mutex; } -void mutex_lock(mutex_t mutex) { +void mutex_lock(mutex_t* mutex) { pthread_mutex_lock(&mutex->handle); } -void mutex_unlock(mutex_t mutex) { +void mutex_unlock(mutex_t* mutex) { pthread_mutex_unlock(&mutex->handle); } -void mutex_destroy(mutex_t mutex) { +void mutex_destroy(mutex_t* mutex) { pthread_mutex_destroy(&mutex->handle); free(mutex); } + #endif \ No newline at end of file diff --git a/src/utils/mutex.h b/src/utils/mutex.h index 70642d7..152f4ca 100644 --- a/src/utils/mutex.h +++ b/src/utils/mutex.h @@ -5,11 +5,11 @@ #ifndef ESC_MUTEX_H #define ESC_MUTEX_H -typedef struct st_mutex* mutex_t; +typedef struct st_mutex mutex_t; -mutex_t mutex_create(); -void mutex_lock(mutex_t mutex); -void mutex_unlock(mutex_t mutex); -void mutex_destroy(mutex_t mutex); +mutex_t* mutex_create(); +void mutex_lock(mutex_t* mutex); +void mutex_unlock(mutex_t* mutex); +void mutex_destroy(mutex_t* mutex); #endif //ESC_MUTEX_H diff --git a/src/utils/queue.c b/src/utils/queue.c index b4fa43e..e2ecafa 100644 --- a/src/utils/queue.c +++ b/src/utils/queue.c @@ -14,14 +14,14 @@ struct st_node { struct st_node* next; }; struct st_queue { - mutex_t lock; + mutex_t* lock; size_t size; struct st_node* start; struct st_node* end; }; -queue_t queue_create() { - queue_t q = malloc(sizeof(struct st_queue)); +queue_t* queue_create() { + queue_t* q = malloc(sizeof(struct st_queue)); assert(q != 0); q->lock = mutex_create(); q->size = 0; @@ -30,7 +30,7 @@ queue_t queue_create() { return q; } -void queue_enqueue(queue_t q, void* item) { +void queue_enqueue(queue_t* q, void* item) { struct st_node* node = malloc(sizeof(struct st_node)); assert(node != 0); node->item = item; @@ -48,7 +48,7 @@ void queue_enqueue(queue_t q, void* item) { mutex_unlock(q->lock); } -void* queue_dequeue(queue_t q) { +void* queue_dequeue(queue_t* q) { mutex_lock(q->lock); if (q->start == 0) { mutex_unlock(q->lock); @@ -66,21 +66,21 @@ void* queue_dequeue(queue_t q) { return item; } -void* queue_peek(queue_t q) { +void* queue_peek(queue_t* q) { mutex_lock(q->lock); void* item = q->start ? q->start->item : 0; mutex_unlock(q->lock); return item; } -inline size_t queue_size(queue_t q) { +inline size_t queue_size(queue_t* q) { mutex_lock(q->lock); size_t size = q->size; mutex_unlock(q->lock); return size; } -void queue_destroy(queue_t q) { +void queue_destroy(queue_t* q) { mutex_lock(q->lock); struct st_node* p = q->start; while(p) { @@ -89,5 +89,32 @@ void queue_destroy(queue_t q) { p = next; } mutex_unlock(q->lock); + mutex_destroy(q->lock); free(q); } + +void* queue_remove(queue_t* q, find_predicate predicate, void* arg) { + void* found = 0; + mutex_lock(q->lock); + struct st_node* p = q->start; + struct st_node* prev = 0; + while(p) { + if (predicate(p->item, arg) == BOOL_TRUE) { + found = p->item; + if (prev) { + prev->next = p->next; + } else { + q->start = p->next; + } + if (q->end == p) { + q->end = prev; + } + free(p); + break; + } + prev = p; + p = p->next; + } + mutex_unlock(q->lock); + return found; +} \ No newline at end of file diff --git a/src/utils/queue.h b/src/utils/queue.h index 30413bf..b525ef3 100644 --- a/src/utils/queue.h +++ b/src/utils/queue.h @@ -5,13 +5,18 @@ #ifndef ESC_QUEUE_H #define ESC_QUEUE_H -typedef struct st_queue* queue_t; +#include "bool.h" -queue_t queue_create(); -void queue_enqueue(queue_t q, void* item); -void* queue_dequeue(queue_t q); -void* queue_peek(queue_t q); -size_t queue_size(queue_t q); -void queue_destroy(queue_t q); +typedef struct st_queue queue_t; + +queue_t* queue_create(); +void queue_destroy(queue_t* q); +void queue_enqueue(queue_t* q, void* item); +void* queue_dequeue(queue_t* q); +void* queue_peek(queue_t* q); +size_t queue_size(queue_t* q); + +typedef bool_t (*find_predicate)(void* item, void* arg); +void* queue_remove(queue_t* q, find_predicate, void* arg); #endif //ESC_QUEUE_H diff --git a/src/utils/socket.c b/src/utils/socket.c index fa708f0..9526321 100644 --- a/src/utils/socket.c +++ b/src/utils/socket.c @@ -17,9 +17,9 @@ struct st_socket { int last_error; }; -socket_t socket_create(int type) { +socket_t* socket_create(int type) { if (type == SOCKET_TYPE_TCP) { - socket_t s = malloc(sizeof(struct st_socket)); + socket_t* s = malloc(sizeof(struct st_socket)); s->af = AF_INET; s->type = SOCK_STREAM; s->proto = IPPROTO_TCP; @@ -32,7 +32,7 @@ socket_t socket_create(int type) { return NULL; } -int socket_close(socket_t s) { +int socket_close(socket_t* s) { int rc = closesocket(s->s); if (rc != 0) { s->last_error = WSAGetLastError(); @@ -40,7 +40,7 @@ int socket_close(socket_t s) { return rc; } -int socket_connect(socket_t s, char* addr, unsigned short port) { +int socket_connect(socket_t* s, char* addr, unsigned short port) { char port_s[6]; ADDRINFO hints; memset(&hints, 0, sizeof(ADDRINFO)); @@ -60,7 +60,7 @@ int socket_connect(socket_t s, char* addr, unsigned short port) { return 0; } -ssize_t socket_send(socket_t s, char* data, size_t len) { +ssize_t socket_send(socket_t* s, char* data, size_t len) { ssize_t rc = send(s->s, data, (int)len, 0); if (rc < 0) { s->last_error = WSAGetLastError(); @@ -68,7 +68,7 @@ ssize_t socket_send(socket_t s, char* data, size_t len) { return rc; } -ssize_t socket_recv(socket_t s, char* buf, size_t len) { +ssize_t socket_recv(socket_t* s, char* buf, size_t len) { ssize_t rc = recv(s->s, buf, (int)len, 0); if (rc < 0) { s->last_error = WSAGetLastError(); @@ -76,7 +76,7 @@ ssize_t socket_recv(socket_t s, char* buf, size_t len) { return rc; } -int socket_readable(socket_t s) { +int socket_readable(socket_t* s) { fd_set readable; FD_ZERO(&readable); FD_SET(s->s, &readable); @@ -89,7 +89,7 @@ int socket_readable(socket_t s) { return rc; } -int socket_writable(socket_t s) { +int socket_writable(socket_t* s) { fd_set writable; FD_ZERO(&writable); FD_SET(s->s, &writable); @@ -102,7 +102,7 @@ int socket_writable(socket_t s) { return rc; } -int socket_error(socket_t s) { +int socket_error(socket_t* s) { return s->last_error; } #endif @@ -122,9 +122,9 @@ struct st_socket { int last_error; }; -socket_t socket_create(int type) { +socket_t* socket_create(int type) { if (type == SOCKET_TYPE_TCP) { - socket_t s = malloc(sizeof(struct st_socket)); + socket_t* s = malloc(sizeof(struct st_socket)); s->af = AF_INET; s->type = SOCK_STREAM; s->proto = IPPROTO_TCP; @@ -137,12 +137,12 @@ socket_t socket_create(int type) { return NULL; } -inline void socket_destroy(socket_t s) { +inline void socket_destroy(socket_t* s) { socket_close(s); free(s); } -int socket_close(socket_t s) { +int socket_close(socket_t* s) { if (close(s->s) == -1) { s->last_error = errno; return -1; @@ -150,7 +150,7 @@ int socket_close(socket_t s) { return 0; } -int socket_connect(socket_t s, char* addr, unsigned short port) { +int socket_connect(socket_t* s, char* addr, unsigned short port) { struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = s->af; @@ -161,17 +161,20 @@ int socket_connect(socket_t s, char* addr, unsigned short port) { snprintf(port_s, 6, "%d", port); int rc; if ((rc = getaddrinfo(addr, port_s, &hints, &result)) != 0) { + if (result) freeaddrinfo(result); s->last_error = rc; return -1; } if (connect(s->s, result->ai_addr, (int)result->ai_addrlen) != 0) { + freeaddrinfo(result); s->last_error = errno; return -1; } + freeaddrinfo(result); return 0; } -ssize_t socket_send(socket_t s, char* data, size_t len) { +ssize_t socket_send(socket_t* s, char* data, size_t len) { ssize_t rc = send(s->s, data, len, 0); if (rc < 0) { s->last_error = errno; @@ -179,7 +182,7 @@ ssize_t socket_send(socket_t s, char* data, size_t len) { return rc; } -ssize_t socket_recv(socket_t s, char* buf, size_t len) { +ssize_t socket_recv(socket_t* s, char* buf, size_t len) { ssize_t rc = recv(s->s, buf, len, 0); if (rc < 0) { s->last_error = errno; @@ -187,7 +190,7 @@ ssize_t socket_recv(socket_t s, char* buf, size_t len) { return rc; } -int socket_writable(socket_t s) { +int socket_writable(socket_t* s) { fd_set writable; FD_ZERO(&writable); FD_SET(s->s, &writable); @@ -200,7 +203,7 @@ int socket_writable(socket_t s) { return rc; } -int socket_readable(socket_t s) { +int socket_readable(socket_t* s) { fd_set readable; FD_ZERO(&readable); FD_SET(s->s, &readable); @@ -213,7 +216,7 @@ int socket_readable(socket_t s) { return rc; } -inline int socket_error(socket_t s) { +inline int socket_error(socket_t* s) { return s->last_error; } #endif \ No newline at end of file diff --git a/src/utils/socket.h b/src/utils/socket.h index 55aa59e..7957c7c 100644 --- a/src/utils/socket.h +++ b/src/utils/socket.h @@ -6,21 +6,21 @@ #define ESC_SOCKET_H struct st_socket; -typedef struct st_socket* socket_t; +typedef struct st_socket socket_t; enum { SOCKET_TYPE_TCP = 1, SOCKET_TYPE_UDP = 2 }; -socket_t socket_create(int type); -void socket_destroy(socket_t s); -int socket_close(socket_t s); -int socket_connect(socket_t s, char* addr, unsigned short port); -ssize_t socket_send(socket_t s, char* data, size_t len); -ssize_t socket_recv(socket_t s, char* buf, size_t len); -int socket_writable(socket_t s); -int socket_readable(socket_t s); -int socket_error(socket_t s); +socket_t* socket_create(int type); +void socket_destroy(socket_t* s); +int socket_close(socket_t* s); +int socket_connect(socket_t* s, char* addr, unsigned short port); +ssize_t socket_send(socket_t* s, char* data, size_t len); +ssize_t socket_recv(socket_t* s, char* buf, size_t len); +int socket_writable(socket_t* s); +int socket_readable(socket_t* s); +int socket_error(socket_t* s); #endif //ESC_SOCKET_H diff --git a/src/utils/thread.c b/src/utils/thread.c index 642d70b..20a557f 100644 --- a/src/utils/thread.c +++ b/src/utils/thread.c @@ -17,8 +17,8 @@ struct st_thread { int last_error; }; -thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { - thread_t thread = malloc(sizeof(struct st_thread)); +thread_t* thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { + thread_t* thread = malloc(sizeof(struct st_thread)); thread->handle = 0; thread->start = thread_start; thread->arg = thread_arg; @@ -26,7 +26,7 @@ thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { return thread; } -int thread_start(thread_t thread) { +int thread_start(thread_t* thread) { thread->handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)thread->start, thread->arg, 0, &thread->id); if (thread->handle == NULL) { DWORD rc = GetLastError(); @@ -36,7 +36,7 @@ int thread_start(thread_t thread) { return 0; } -int thread_wait(thread_t thread) { +int thread_wait(thread_t* thread) { if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED) { DWORD rc = GetLastError(); thread->last_error = (int)rc; @@ -45,13 +45,13 @@ int thread_wait(thread_t thread) { return 0; } -int thread_destroy(thread_t thread) { +int thread_destroy(thread_t* thread) { if (thread_wait(thread) != 0) SuspendThread(thread->handle); CloseHandle(thread->handle); free(thread); } -int thread_error(thread_t thread) { +int thread_error(thread_t* thread) { return thread->last_error; } @@ -68,8 +68,8 @@ struct st_thread { int joined; }; -thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { - thread_t thread = malloc(sizeof(struct st_thread)); +thread_t* thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { + thread_t* thread = malloc(sizeof(struct st_thread)); thread->handle = 0; thread->start = thread_start; thread->arg = thread_arg; @@ -79,7 +79,7 @@ thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { return thread; } -int thread_start(thread_t thread) { +int thread_start(thread_t* thread) { int rc = pthread_create(&thread->handle, NULL, thread->start, thread->arg); if (rc != 0) { thread->last_error = rc; @@ -88,7 +88,7 @@ int thread_start(thread_t thread) { return 0; } -int thread_wait(thread_t thread) { +int thread_wait(thread_t* thread) { if (thread->joined) { return 0; } @@ -102,7 +102,7 @@ int thread_wait(thread_t thread) { return 0; } -int thread_destroy(thread_t thread) { +int thread_destroy(thread_t* thread) { if (thread_wait(thread)) { int rc = pthread_cancel(thread->handle); if (rc != 0) { @@ -114,11 +114,11 @@ int thread_destroy(thread_t thread) { return 0; } -inline int thread_error(thread_t thread) { +inline int thread_error(thread_t* thread) { return thread->last_error; } -inline int thread_result(thread_t thread, void** result) { +inline int thread_result(thread_t* thread, void** result) { if (thread_wait(thread)) { return -1; } diff --git a/src/utils/thread.h b/src/utils/thread.h index d07e9ed..3d82772 100644 --- a/src/utils/thread.h +++ b/src/utils/thread.h @@ -7,13 +7,13 @@ typedef void* (*thread_start_t)(void*); typedef void* thread_arg_t; -typedef struct st_thread* thread_t; +typedef struct st_thread thread_t; -thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg); -int thread_start(thread_t thread); -int thread_wait(thread_t thread); -int thread_destroy(thread_t thread); -int thread_error(thread_t thread); -int thread_result(thread_t thread, void** result); +thread_t* thread_create(thread_start_t thread_start, thread_arg_t thread_arg); +int thread_start(thread_t* thread); +int thread_wait(thread_t* thread); +int thread_destroy(thread_t* thread); +int thread_error(thread_t* thread); +int thread_result(thread_t* thread, void** result); #endif //ESC_THREAD_H diff --git a/src/utils/uuid.c b/src/utils/uuid.c index 0dd0e19..e8d7c05 100644 --- a/src/utils/uuid.c +++ b/src/utils/uuid.c @@ -18,39 +18,39 @@ struct st_uuid { }; //TODO: fixme - this is not secure, use crypto random bytes instead of rand -esc_uuid_t esc_uuid_create() { +uuid_t* uuid_create() { if (srand_called == 0) { srand(time(NULL)); srand_called = 1; } - esc_uuid_t result = malloc(sizeof(struct st_uuid)); + uuid_t* result = malloc(sizeof(struct st_uuid)); for(int i=0;idata[i] = (uint8_t)(rand() & 0xff); return result; } -inline void esc_uuid_destroy(esc_uuid_t uuid) { +inline void uuid_destroy(uuid_t* uuid) { free(uuid); } -esc_uuid_t esc_uuid_from(const uint8_t* src, size_t size) { +uuid_t* uuid_from(const uint8_t *src, size_t size) { if (size < UUID_SIZE) { return 0; } - esc_uuid_t uuid = malloc(sizeof(struct st_uuid)); + uuid_t* uuid = malloc(sizeof(struct st_uuid)); memcpy(uuid->data, src, UUID_SIZE); return uuid; } -esc_uuid_t esc_uuid_copy(esc_uuid_t other) { - return esc_uuid_from((uint8_t*)other->data, UUID_SIZE); +uuid_t* uuid_copy(uuid_t* other) { + return uuid_from((uint8_t *) other->data, UUID_SIZE); } -int esc_uuid_compare(esc_uuid_t left, esc_uuid_t right) { +int uuid_compare(uuid_t* left, uuid_t* right) { return memcmp(left->data, right->data, UUID_SIZE); } -const char* esc_uuid_format(esc_uuid_t uuid, char* buffer, size_t buf_size) { +const char* uuid_format(uuid_t* uuid, char *buffer, size_t buf_size) { #ifdef _WIN32 snprintf(buffer, buf_size, "%08lx-%04hx-%04hx-%04hx-%08lx%04hx", *(long*)&uuid->data[0], diff --git a/src/utils/uuid.h b/src/utils/uuid.h index 8fcaf87..be10b60 100644 --- a/src/utils/uuid.h +++ b/src/utils/uuid.h @@ -5,15 +5,16 @@ #ifndef ESC_UUID_H #define ESC_UUID_H -#include +#include //uint8_t +#include //size_t -typedef struct st_uuid* esc_uuid_t; +typedef struct st_uuid uuid_t; -esc_uuid_t esc_uuid_create(); -void esc_uuid_destroy(esc_uuid_t uuid); -esc_uuid_t esc_uuid_from(const uint8_t* src, size_t size); -esc_uuid_t esc_uuid_copy(esc_uuid_t other); -int esc_uuid_compare(esc_uuid_t left, esc_uuid_t right); -const char* esc_uuid_format(esc_uuid_t uuid, char* buffer, size_t len); +uuid_t* uuid_create(); +void uuid_destroy(uuid_t* uuid); +uuid_t* uuid_from(const uint8_t *src, size_t size); +uuid_t* uuid_copy(uuid_t* other); +int uuid_compare(uuid_t* left, uuid_t* right); +const char* uuid_format(uuid_t* uuid, char *buffer, size_t len); #endif //ESC_UUID_H diff --git a/test/main.c b/test/main.c new file mode 100644 index 0000000..b143182 --- /dev/null +++ b/test/main.c @@ -0,0 +1,89 @@ +#include +#include "../src/utils/debug.h" +#include "../include/esc.h" + +#ifdef _WIN32 +#include +#include +#define usleep Sleep +#define sleep(x) Sleep(x*1000) +#endif +#ifdef __linux__ +#include +#endif + +int main() { +#ifdef _WIN32 + WSADATA wsaData; + WSAStartup(MAKEWORD(2,0), &wsaData); +#endif + + esc_connection_t* conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); + if (esc_connection_connect(conn) != 0) { + error_t* err = esc_connection_last_error(conn); + fprintf(stderr, "Error: %s code=%d file=%s line=%d\n", err->message, err->code, err->file, err->line); + return -2; + } + + uuid_t *event_id = uuid_create(); + buffer_t *data = buffer_from_string("{\"a\":\"1\"}"); + esc_event_data_t* event_data = esc_event_data_create(event_id, "test_event", BOOL_TRUE, data, 0); + array_t* events = array_create(1, event_data); + esc_write_result_t* write_result = esc_append_to_stream(conn, "test-123", ESC_VERSION_ANY, events); + if (write_result == 0) { + error_t* err = esc_connection_last_error(conn); + fprintf(stderr, "Error: %s code=%d file=%s line=%d\n", err->message, err->code, err->file, err->line); + return -3; + } + esc_write_result_destroy(write_result); + array_destroy(events, (array_deallocator)esc_event_data_destroy); + uuid_destroy(event_id); + buffer_destroy(data); + + esc_credentials_t* credentials = esc_credentials_create("admin", "changeit"); + + esc_all_events_slice_t* result = 0; + do { + esc_all_events_slice_t* old_result = result; + result = esc_connection_read_all_forward(conn, old_result ? old_result->next_position : NULL, 100, credentials); + if (old_result) esc_all_events_slice_destroy(old_result); + if (result == 0) { + error_t* err = esc_connection_last_error(conn); + fprintf(stderr, "Error: %s code=%d file=%s line=%d\n", err->message, err->code, err->file, err->line); + break; + } + char posbuf1[44]; + char posbuf2[44]; + printf("%s %s %s %u\n", result->read_direction, + esc_position_format(result->from_position, posbuf1, 44), + esc_position_format(result->next_position, posbuf2, 44), + result->is_end_of_stream); + char uuid_buf[37]; + for (size_t i = 0; i < result->n_events; i++) { +#ifdef _WIN32 + printf("%s %s %lld@%s %llu %llu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), +#else + printf("%s %s %ld@%s %lu %lu\n", uuid_format(result->events[i]->event->event_id, uuid_buf, 37), +#endif + result->events[i]->event->event_type, + result->events[i]->event->event_number, + result->events[i]->event->event_stream_id, + buffer_size(result->events[i]->event->data), + buffer_size(result->events[i]->event->metadata)); + } + } while(result->is_end_of_stream == 0); + + if (result) esc_all_events_slice_destroy(result); + + sleep(5); + + esc_connection_destroy(conn); + esc_credentials_destroy(credentials); + + dbg_list_allocs(); + +#ifdef _WIN32 + WSACleanup(); +#endif + return 0; +}