diff --git a/src/esc.c b/src/esc.c index 4c7a5a8..2309d81 100644 --- a/src/esc.c +++ b/src/esc.c @@ -28,17 +28,17 @@ typedef int bool_t; const bool_t BOOL_TRUE = 1; const bool_t BOOL_FALSE = 0; -#define ERROR_BUFFER_SIZE 4096 -esc_error_t error_create(int code, char* format, ...) { +const esc_error_t _error_create(const char* file, int line, int code, char* format, ...) { va_list vl; va_start(vl, format); - char buf[ERROR_BUFFER_SIZE]; - size_t size = (size_t)snprintf(buf, ERROR_BUFFER_SIZE, format, vl); - char* msg = malloc(size+1); - strcpy(msg, buf); - esc_error_t res = {code, msg}; - return res; + esc_error_t err; + err.file = file; + err.line = line; + err.code = code; + vsnprintf(err.message, ERROR_MSG_SIZE, format, vl); + return err; } +#define error_create(code, fmt, args...) _error_create(__FILE__, __LINE__, code, fmt, args) struct st_connection_settings { bool_t use_ssl_connection; @@ -75,6 +75,8 @@ struct st_connection { mutex_t recv_peek_lock; queue_t send_queue; queue_t recv_queue; + bool_t has_error; + esc_error_t last_error; }; struct st_static_endpoint_discoverer { @@ -183,25 +185,8 @@ void protobuf_c_free(void *alloc_data, void* p) { } esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { - if (strncmp(addr, "tcp://", 6) != 0) { - return 0; - } - char* pos = strrchr(addr, ':'); - if (pos == 0) { - return 0; - } - char* host = malloc(pos - addr - 5); - strncpy(host, addr+6, pos-addr-6); - host[pos-addr-6] = 0; - unsigned short port = (unsigned short)atoi(pos+1); - static_endpoint_discoverer_t discover_data = { - {host, port}, connection_settings->use_ssl_connection - }; struct st_connection* conn = malloc(sizeof(struct st_connection)); conn->settings = *connection_settings; - conn->discoverer_data = malloc(sizeof(struct st_static_endpoint_discoverer)); - memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_static_endpoint_discoverer)); - conn->discover = (endpoint_discoverer_t)static_discover; conn->protobuf_c_allocator.alloc = protobuf_c_alloc; conn->protobuf_c_allocator.free = protobuf_c_free; conn->protobuf_c_allocator.allocator_data = 0; @@ -216,8 +201,35 @@ esc_connection_t esc_connection_create(const esc_connection_settings_t* connecti } conn->send_queue = queue_create(); conn->recv_peek_lock = mutex_create(); - conn->recv_queue = queue_create(); - conn->stop = 0; + conn->recv_queue = queue_create(); + conn->stop = 0; + conn->has_error = 0; + memset(&conn->last_error, 0, sizeof(struct st_error)); + + if (strncmp(addr, "tcp://", 6) != 0) { + conn->has_error = 1; + conn->last_error = error_create(1, "invalid schema for address: %s", addr); + return conn; + } + char* pos = strrchr(addr, ':'); + if (pos == 0) { + conn->has_error = 1; + conn->last_error = error_create(2, "missing port in address: %s", addr); + return conn; + } + char* host = malloc(pos - addr - 5); + strncpy(host, addr+6, pos-addr-6); + host[pos-addr-6] = 0; + unsigned short port = (unsigned short)atoi(pos+1); + static_endpoint_discoverer_t discover_data = { + {host, port}, connection_settings->use_ssl_connection + }; + + conn->discoverer_data = malloc(sizeof(struct st_static_endpoint_discoverer)); + memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_static_endpoint_discoverer)); + conn->discover = (endpoint_discoverer_t)static_discover; + + return conn; } @@ -226,20 +238,28 @@ void* connection_thread(void* arg) { while(conn->stop == BOOL_FALSE) { if (socket_writable(conn->tcp_conn)) { + printf("T1"); tcp_package_t send_pkg = queue_dequeue(conn->send_queue); + printf("T1(%p)", send_pkg); if (send_pkg && connection_send_tcp_package(conn, send_pkg)) { fprintf(stderr, "failed to send pkg."); } + printf("T2"); } if (socket_readable(conn->tcp_conn)) { + printf("T3"); tcp_package_t recv_pkg = connection_recv_tcp_package(conn); + printf("T4"); if (recv_pkg && recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); + printf("T5"); queue_enqueue(conn->send_queue, heartbeat_pkg); } else if (recv_pkg) { + printf("T6"); queue_enqueue(conn->recv_queue, recv_pkg); } + printf("T7"); } usleep(1); @@ -258,12 +278,14 @@ int esc_connection_connect(esc_connection_t conn) { conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint; if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { - return -2; + conn->last_error = error_create(socket_error(conn->tcp_conn), "can't connect to %s:%d", endpoint.host, endpoint.port); + return -1; } // Start manager thread conn->manager_thread = thread_create(connection_thread, conn); if (thread_start(conn->manager_thread)) { - return -3; + conn->last_error = error_create(thread_error(conn->manager_thread), "can't start manager thread.", ""); + return -1; } // Identify // build message @@ -280,7 +302,8 @@ int esc_connection_connect(esc_connection_t conn) { tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { - return -5; + conn->last_error = error_create(0, "server error: %d.", recv_pkg->command); + return -1; } return 0; @@ -288,8 +311,8 @@ int esc_connection_connect(esc_connection_t conn) { esc_credentials_t esc_credentials_create(const char* username, const char* password) { esc_credentials_t creds = malloc(sizeof(struct st_credentials)); - creds->username = username; - creds->password = password; + creds->username = string_copy(username); + creds->password = string_copy(password); return creds; } @@ -317,6 +340,7 @@ esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__Resolv } esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials) { + //TODO function for packing from client struct direct to protobuf data EventStore__Client__Messages__ReadAllEvents send_msg; event_store__client__messages__read_all_events__init(&send_msg); send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; @@ -327,14 +351,23 @@ esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, co size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); uint8_t buffer[s]; event_store__client__messages__read_all_events__pack(&send_msg, buffer); - tcp_package_t send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); + printf("A %s\n", credentials->password); + tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); + printf("A1 %s %s\n", credentials->password, send_pkg->password); queue_enqueue(conn->send_queue, send_pkg); + printf("A2 %s %s\n", credentials->password, send_pkg->password); + + printf("B %s\n", credentials->password); tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); - if (recv_pkg->command != 0xB7) { + if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) { + conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command)); return 0; } + printf("C %s\n", credentials->password); + + //TODO function for unpacking from protobuf data to client struct size_t data_size = buffer_size(recv_pkg->data); uint8_t* data = buffer_data(recv_pkg->data); @@ -355,6 +388,8 @@ esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, co } event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); + printf("D %s\n", credentials->password); + return result; } @@ -362,6 +397,10 @@ void esc_connection_close(esc_connection_t conn) { socket_close(conn->tcp_conn); } +inline esc_error_t esc_connection_last_error(esc_connection_t conn) { + return conn->last_error; +} + const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { #ifdef _WIN32 snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); diff --git a/src/esc.h b/src/esc.h index 5e17cab..7b4156c 100644 --- a/src/esc.h +++ b/src/esc.h @@ -8,9 +8,12 @@ #include "utils/uuid.h" #include "utils/buffer.h" +#define ERROR_MSG_SIZE 1024 struct st_error { - const int code; - const char* message; + const char* file; + int line; + int code; + char message[ERROR_MSG_SIZE]; }; typedef struct st_error esc_error_t; @@ -69,6 +72,8 @@ esc_credentials_t esc_credentials_create(const char* username, const char* passw esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials); void esc_connection_close(esc_connection_t conn); +esc_error_t esc_connection_last_error(esc_connection_t conn); + // Utils const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size); diff --git a/src/main.c b/src/main.c index 30c95b1..c38cdd8 100644 --- a/src/main.c +++ b/src/main.c @@ -17,10 +17,9 @@ int main() { #endif esc_connection_t conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); - if (conn == 0) { - return -1; - } if (esc_connection_connect(conn) != 0) { + esc_error_t err = esc_connection_last_error(conn); + fprintf(stderr, "Error: %s code=%d file=%s line=%d", err.message, err.code, err.file, err.line); return -2; } esc_credentials_t credentials = esc_credentials_create("admin", "changeit"); @@ -29,6 +28,8 @@ int main() { do { result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 100, credentials); if (result == 0) { + esc_error_t err = esc_connection_last_error(conn); + fprintf(stderr, "Error: %s code=%d file=%s line=%d", err.message, err.code, err.file, err.line); return -3; } char posbuf1[44]; diff --git a/src/tcp_package.c b/src/tcp_package.c index 6ca516c..d7f8eb7 100644 --- a/src/tcp_package.c +++ b/src/tcp_package.c @@ -7,6 +7,7 @@ #include #include #include "tcp_package.h" +#include "utils/string.h" const uint32_t CommandOffset = 0; const uint32_t FlagsOffset = 1; @@ -30,13 +31,15 @@ tcp_package_t tcp_package_create_authenticated(uint8_t command, esc_uuid_t corre pkg->command = command; pkg->flags = 1; pkg->correlation_id = esc_uuid_copy(correlation_id); - pkg->login = username; - pkg->password = password; + pkg->login = string_copy(username); + pkg->password = string_copy(password); pkg->data = data; return pkg; } void tcp_package_destroy(tcp_package_t pkg) { + free((void*)pkg->login); + free((void*)pkg->password); esc_uuid_destroy(pkg->correlation_id); buffer_destroy(pkg->data); free(pkg); diff --git a/src/utils/mutex.c b/src/utils/mutex.c index 7801dcc..d2f9879 100644 --- a/src/utils/mutex.c +++ b/src/utils/mutex.c @@ -10,22 +10,27 @@ #include "mutex.h" #ifdef _WIN32 +#include +struct st_mutex { + CRITICAL_SECTION handle; +}; + mutex_t mutex_create() { - mutex_t mutex = malloc(sizeof(CRITICAL_SECTION)); - InitializeCriticalSection(mutex); + mutex_t mutex = malloc(sizeof(struct st_mutex)); + InitializeCriticalSection(&mutex->handle); return mutex; } void mutex_lock(mutex_t mutex) { - EnterCriticalSection(mutex); + EnterCriticalSection(&mutex->handle); } void mutex_unlock(mutex_t mutex) { - LeaveCriticalSection(mutex); + LeaveCriticalSection(&mutex->handle); } void mutex_destroy(mutex_t mutex) { - DeleteCriticalSection(mutex); + DeleteCriticalSection(&mutex->handle); } #endif #ifdef __linux__