diff --git a/CMakeLists.txt b/CMakeLists.txt index 586814c..566fa0f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,12 +3,14 @@ project(esc C) set(CMAKE_C_STANDARD 99) +add_compile_options(-DDEBUG) + if(WIN32) include_directories(c:/Users/nicol/dev/thirdparty/protobuf-c) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) endif() -add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h) +add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c) if(WIN32) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) diff --git a/src/esc.c b/src/esc.c index 2309d81..12f1516 100644 --- a/src/esc.c +++ b/src/esc.c @@ -7,6 +7,7 @@ #include #include #include "esc.h" +#include "utils/debug.h" #include "utils/socket.h" #include "proto.h" #include "tcp_package.h" @@ -238,32 +239,34 @@ void* connection_thread(void* arg) { while(conn->stop == BOOL_FALSE) { if (socket_writable(conn->tcp_conn)) { - printf("T1"); tcp_package_t send_pkg = queue_dequeue(conn->send_queue); - printf("T1(%p)", send_pkg); - if (send_pkg && connection_send_tcp_package(conn, send_pkg)) { - fprintf(stderr, "failed to send pkg."); + if (send_pkg != 0) { + ssize_t rc = connection_send_tcp_package(conn, send_pkg); + if (rc == 0) { + tcp_package_destroy(send_pkg); + } else { + fprintf(stderr, "failed to send pkg."); + } } - printf("T2"); } if (socket_readable(conn->tcp_conn)) { - printf("T3"); tcp_package_t recv_pkg = connection_recv_tcp_package(conn); - printf("T4"); - if (recv_pkg && recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { + if (recv_pkg == 0) { + //TODO Handle connection lost + conn->stop = 1; + } else if (recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); - printf("T5"); queue_enqueue(conn->send_queue, heartbeat_pkg); - } else if (recv_pkg) { - printf("T6"); + } else { queue_enqueue(conn->recv_queue, recv_pkg); } - printf("T7"); } usleep(1); } + + printf("manager thread stopping\n"); } // return 0 on success @@ -297,15 +300,18 @@ int esc_connection_connect(esc_connection_t conn) { uint8_t buffer[s]; event_store__client__messages__identify_client__pack(&identify_client, buffer); // build tcp_package - tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, esc_uuid_create(), buffer_from(buffer, s)); + esc_uuid_t correlation_id = esc_uuid_create(); + tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, correlation_id, buffer_from(buffer, s)); queue_enqueue(conn->send_queue, send_pkg); - tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); + tcp_package_t recv_pkg = connection_wait_for(conn, correlation_id); if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { conn->last_error = error_create(0, "server error: %d.", recv_pkg->command); + tcp_package_destroy(recv_pkg); return -1; } - + + tcp_package_destroy(recv_pkg); return 0; } @@ -351,22 +357,16 @@ esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, co size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); uint8_t buffer[s]; event_store__client__messages__read_all_events__pack(&send_msg, buffer); - printf("A %s\n", credentials->password); - tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); - printf("A1 %s %s\n", credentials->password, send_pkg->password); + esc_uuid_t correlation_id = esc_uuid_create(); + tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, correlation_id, buffer_from(buffer, s), credentials->username, credentials->password); queue_enqueue(conn->send_queue, send_pkg); - printf("A2 %s %s\n", credentials->password, send_pkg->password); - printf("B %s\n", credentials->password); - - tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); + tcp_package_t recv_pkg = connection_wait_for(conn, correlation_id); if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) { conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command)); return 0; } - printf("C %s\n", credentials->password); - //TODO function for unpacking from protobuf data to client struct size_t data_size = buffer_size(recv_pkg->data); uint8_t* data = buffer_data(recv_pkg->data); @@ -388,12 +388,11 @@ esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, co } event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); - printf("D %s\n", credentials->password); - return result; } void esc_connection_close(esc_connection_t conn) { + conn->stop = 1; socket_close(conn->tcp_conn); } diff --git a/src/main.c b/src/main.c index c38cdd8..5ada39d 100644 --- a/src/main.c +++ b/src/main.c @@ -1,10 +1,12 @@ #include +#include "utils/debug.h" #include "esc.h" #ifdef _WIN32 #include #include #define usleep Sleep +#define sleep(x) Sleep(x*1000) #endif #ifdef __linux__ #include @@ -53,10 +55,12 @@ int main() { } } while(result->is_end_of_stream == 0); - usleep(30000); + sleep(10); esc_connection_close(conn); + dbg_list_allocs(); + #ifdef _WIN32 WSACleanup(); #endif diff --git a/src/tcp_package.c b/src/tcp_package.c index d7f8eb7..08125e4 100644 --- a/src/tcp_package.c +++ b/src/tcp_package.c @@ -7,6 +7,7 @@ #include #include #include "tcp_package.h" +#include "utils/debug.h" #include "utils/string.h" const uint32_t CommandOffset = 0; diff --git a/src/utils/buffer.c b/src/utils/buffer.c index 388588c..effd155 100644 --- a/src/utils/buffer.c +++ b/src/utils/buffer.c @@ -4,6 +4,7 @@ #include #include +#include "debug.h" #include "buffer.h" struct st_buffer { diff --git a/src/utils/debug.c b/src/utils/debug.c new file mode 100644 index 0000000..d6b7348 --- /dev/null +++ b/src/utils/debug.c @@ -0,0 +1,52 @@ +// +// Created by nicolas on 21/03/18. +// + +#include +#include +void* (*_malloc)(size_t) = malloc; +void (*_free)(void*) = free; +#include "debug.h" + +struct st_alloc { + const char* file; + int line; + size_t size; + void *ptr; +}; +size_t _allocations_count = 0; +size_t _allocations_size = 0; +struct st_alloc* _allocations = 0; + +void* dbg_malloc(const char* file, int line, size_t size) { + if (_allocations_count == _allocations_size) { + _allocations = _allocations_size == 0 ? _malloc(1024*sizeof(struct st_alloc)) : realloc(_allocations, (_allocations_size + 1024)*sizeof(struct st_alloc)); + _allocations_size += 1024; + } + void* ptr = _malloc(size); + struct st_alloc* alloc = &_allocations[_allocations_count++]; + alloc->file = file; + alloc->line = line; + alloc->size = size; + alloc->ptr = ptr; + return ptr; +} + +void dbg_free(void* ptr) { + for (size_t i = 0; i < _allocations_count; i++) { + if (_allocations[i].ptr == ptr) _allocations[i].ptr = 0; + } + _free(ptr); +} + + +void dbg_list_allocs() { + size_t total = 0; + printf("Memory leaks:\n"); + for (size_t i = 0; i < _allocations_count; i++) { + if (_allocations[i].ptr == 0) continue; + printf("%lu %p (%lu) @ %s:%d\n", i, _allocations[i].ptr, _allocations[i].size, _allocations[i].file, _allocations[i].line); + total += _allocations[i].size; + } + printf("Total memory leaked: %lu\n", total); +} \ No newline at end of file diff --git a/src/utils/debug.h b/src/utils/debug.h new file mode 100644 index 0000000..a359388 --- /dev/null +++ b/src/utils/debug.h @@ -0,0 +1,19 @@ +// +// Created by nicolas on 21/03/18. +// + +#ifndef ESC_DEBUG_H +#define ESC_DEBUG_H + +#include +#include + +#ifdef DEBUG +void* dbg_malloc(const char* file, int line, size_t size); +void dbg_free(void* ptr); +void dbg_list_allocs(); +#define malloc(s) dbg_malloc(__FILE__, __LINE__, s) +#define free(p) dbg_free(p) +#endif + +#endif //ESC_DEBUG_H diff --git a/src/utils/mutex.c b/src/utils/mutex.c index d2f9879..fcd7146 100644 --- a/src/utils/mutex.c +++ b/src/utils/mutex.c @@ -7,6 +7,7 @@ #endif #include +#include "debug.h" #include "mutex.h" #ifdef _WIN32 diff --git a/src/utils/queue.c b/src/utils/queue.c index 2943ed7..b4fa43e 100644 --- a/src/utils/queue.c +++ b/src/utils/queue.c @@ -5,6 +5,7 @@ #include #include #include +#include "debug.h" #include "queue.h" #include "mutex.h" @@ -56,6 +57,9 @@ void* queue_dequeue(queue_t q) { struct st_node* node = q->start; void* item = node->item; q->start = node->next; + if (q->end == node) { + q->end = 0; + } q->size--; mutex_unlock(q->lock); free(node); diff --git a/src/utils/socket.c b/src/utils/socket.c index ecac208..fa708f0 100644 --- a/src/utils/socket.c +++ b/src/utils/socket.c @@ -4,6 +4,7 @@ #include #include +#include "debug.h" #include "socket.h" #ifdef _WIN32 diff --git a/src/utils/string.c b/src/utils/string.c index 7652a50..fe0aeb6 100644 --- a/src/utils/string.c +++ b/src/utils/string.c @@ -4,6 +4,7 @@ #include #include +#include "debug.h" #include "string.h" const char* string_copy(const char *src) { diff --git a/src/utils/thread.c b/src/utils/thread.c index 61d59dc..642d70b 100644 --- a/src/utils/thread.c +++ b/src/utils/thread.c @@ -3,6 +3,7 @@ // #include +#include "debug.h" #include "thread.h" #ifdef _WIN32 diff --git a/src/utils/uuid.c b/src/utils/uuid.c index 8d5842f..0dd0e19 100644 --- a/src/utils/uuid.c +++ b/src/utils/uuid.c @@ -6,6 +6,7 @@ #include #include #include +#include "debug.h" #include "uuid.h" #define UUID_SIZE 16