Fixed dequeue bug

Added memory leak tracking in debug
This commit is contained in:
Nicolas Dextraze 2018-03-21 23:01:34 -07:00
parent ec79155abf
commit 4eef29c0da
13 changed files with 115 additions and 28 deletions

View File

@ -3,12 +3,14 @@ project(esc C)
set(CMAKE_C_STANDARD 99) set(CMAKE_C_STANDARD 99)
add_compile_options(-DDEBUG)
if(WIN32) if(WIN32)
include_directories(c:/Users/nicol/dev/thirdparty/protobuf-c) include_directories(c:/Users/nicol/dev/thirdparty/protobuf-c)
link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs)
endif() endif()
add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h) add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c)
if(WIN32) if(WIN32)
target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a)

View File

@ -7,6 +7,7 @@
#include <string.h> #include <string.h>
#include <stdarg.h> #include <stdarg.h>
#include "esc.h" #include "esc.h"
#include "utils/debug.h"
#include "utils/socket.h" #include "utils/socket.h"
#include "proto.h" #include "proto.h"
#include "tcp_package.h" #include "tcp_package.h"
@ -238,32 +239,34 @@ void* connection_thread(void* arg) {
while(conn->stop == BOOL_FALSE) { while(conn->stop == BOOL_FALSE) {
if (socket_writable(conn->tcp_conn)) { if (socket_writable(conn->tcp_conn)) {
printf("T1");
tcp_package_t send_pkg = queue_dequeue(conn->send_queue); tcp_package_t send_pkg = queue_dequeue(conn->send_queue);
printf("T1(%p)", send_pkg); if (send_pkg != 0) {
if (send_pkg && connection_send_tcp_package(conn, send_pkg)) { ssize_t rc = connection_send_tcp_package(conn, send_pkg);
if (rc == 0) {
tcp_package_destroy(send_pkg);
} else {
fprintf(stderr, "failed to send pkg."); fprintf(stderr, "failed to send pkg.");
} }
printf("T2"); }
} }
if (socket_readable(conn->tcp_conn)) { if (socket_readable(conn->tcp_conn)) {
printf("T3");
tcp_package_t recv_pkg = connection_recv_tcp_package(conn); tcp_package_t recv_pkg = connection_recv_tcp_package(conn);
printf("T4"); if (recv_pkg == 0) {
if (recv_pkg && recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { //TODO Handle connection lost
conn->stop = 1;
} else if (recv_pkg->command == MESSAGE_HEARTBEATREQUEST) {
tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0));
printf("T5");
queue_enqueue(conn->send_queue, heartbeat_pkg); queue_enqueue(conn->send_queue, heartbeat_pkg);
} else if (recv_pkg) { } else {
printf("T6");
queue_enqueue(conn->recv_queue, recv_pkg); queue_enqueue(conn->recv_queue, recv_pkg);
} }
printf("T7");
} }
usleep(1); usleep(1);
} }
printf("manager thread stopping\n");
} }
// return 0 on success // return 0 on success
@ -297,15 +300,18 @@ int esc_connection_connect(esc_connection_t conn) {
uint8_t buffer[s]; uint8_t buffer[s];
event_store__client__messages__identify_client__pack(&identify_client, buffer); event_store__client__messages__identify_client__pack(&identify_client, buffer);
// build tcp_package // build tcp_package
tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, esc_uuid_create(), buffer_from(buffer, s)); esc_uuid_t correlation_id = esc_uuid_create();
tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, correlation_id, buffer_from(buffer, s));
queue_enqueue(conn->send_queue, send_pkg); queue_enqueue(conn->send_queue, send_pkg);
tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); tcp_package_t recv_pkg = connection_wait_for(conn, correlation_id);
if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) {
conn->last_error = error_create(0, "server error: %d.", recv_pkg->command); conn->last_error = error_create(0, "server error: %d.", recv_pkg->command);
tcp_package_destroy(recv_pkg);
return -1; return -1;
} }
tcp_package_destroy(recv_pkg);
return 0; return 0;
} }
@ -351,22 +357,16 @@ esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, co
size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg);
uint8_t buffer[s]; uint8_t buffer[s];
event_store__client__messages__read_all_events__pack(&send_msg, buffer); event_store__client__messages__read_all_events__pack(&send_msg, buffer);
printf("A %s\n", credentials->password); esc_uuid_t correlation_id = esc_uuid_create();
tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, correlation_id, buffer_from(buffer, s), credentials->username, credentials->password);
printf("A1 %s %s\n", credentials->password, send_pkg->password);
queue_enqueue(conn->send_queue, send_pkg); queue_enqueue(conn->send_queue, send_pkg);
printf("A2 %s %s\n", credentials->password, send_pkg->password);
printf("B %s\n", credentials->password); tcp_package_t recv_pkg = connection_wait_for(conn, correlation_id);
tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id);
if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) { if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) {
conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command)); conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command));
return 0; return 0;
} }
printf("C %s\n", credentials->password);
//TODO function for unpacking from protobuf data to client struct //TODO function for unpacking from protobuf data to client struct
size_t data_size = buffer_size(recv_pkg->data); size_t data_size = buffer_size(recv_pkg->data);
uint8_t* data = buffer_data(recv_pkg->data); uint8_t* data = buffer_data(recv_pkg->data);
@ -388,12 +388,11 @@ esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, co
} }
event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator);
printf("D %s\n", credentials->password);
return result; return result;
} }
void esc_connection_close(esc_connection_t conn) { void esc_connection_close(esc_connection_t conn) {
conn->stop = 1;
socket_close(conn->tcp_conn); socket_close(conn->tcp_conn);
} }

View File

@ -1,10 +1,12 @@
#include <stdio.h> #include <stdio.h>
#include "utils/debug.h"
#include "esc.h" #include "esc.h"
#ifdef _WIN32 #ifdef _WIN32
#include <winsock.h> #include <winsock.h>
#include <windows.h> #include <windows.h>
#define usleep Sleep #define usleep Sleep
#define sleep(x) Sleep(x*1000)
#endif #endif
#ifdef __linux__ #ifdef __linux__
#include <unistd.h> #include <unistd.h>
@ -53,10 +55,12 @@ int main() {
} }
} while(result->is_end_of_stream == 0); } while(result->is_end_of_stream == 0);
usleep(30000); sleep(10);
esc_connection_close(conn); esc_connection_close(conn);
dbg_list_allocs();
#ifdef _WIN32 #ifdef _WIN32
WSACleanup(); WSACleanup();
#endif #endif

View File

@ -7,6 +7,7 @@
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
#include "tcp_package.h" #include "tcp_package.h"
#include "utils/debug.h"
#include "utils/string.h" #include "utils/string.h"
const uint32_t CommandOffset = 0; const uint32_t CommandOffset = 0;

View File

@ -4,6 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "debug.h"
#include "buffer.h" #include "buffer.h"
struct st_buffer { struct st_buffer {

52
src/utils/debug.c Normal file
View File

@ -0,0 +1,52 @@
//
// Created by nicolas on 21/03/18.
//
#include <stdlib.h>
#include <stdio.h>
void* (*_malloc)(size_t) = malloc;
void (*_free)(void*) = free;
#include "debug.h"
struct st_alloc {
const char* file;
int line;
size_t size;
void *ptr;
};
size_t _allocations_count = 0;
size_t _allocations_size = 0;
struct st_alloc* _allocations = 0;
void* dbg_malloc(const char* file, int line, size_t size) {
if (_allocations_count == _allocations_size) {
_allocations = _allocations_size == 0 ? _malloc(1024*sizeof(struct st_alloc)) : realloc(_allocations, (_allocations_size + 1024)*sizeof(struct st_alloc));
_allocations_size += 1024;
}
void* ptr = _malloc(size);
struct st_alloc* alloc = &_allocations[_allocations_count++];
alloc->file = file;
alloc->line = line;
alloc->size = size;
alloc->ptr = ptr;
return ptr;
}
void dbg_free(void* ptr) {
for (size_t i = 0; i < _allocations_count; i++) {
if (_allocations[i].ptr == ptr) _allocations[i].ptr = 0;
}
_free(ptr);
}
void dbg_list_allocs() {
size_t total = 0;
printf("Memory leaks:\n");
for (size_t i = 0; i < _allocations_count; i++) {
if (_allocations[i].ptr == 0) continue;
printf("%lu %p (%lu) @ %s:%d\n", i, _allocations[i].ptr, _allocations[i].size, _allocations[i].file, _allocations[i].line);
total += _allocations[i].size;
}
printf("Total memory leaked: %lu\n", total);
}

19
src/utils/debug.h Normal file
View File

@ -0,0 +1,19 @@
//
// Created by nicolas on 21/03/18.
//
#ifndef ESC_DEBUG_H
#define ESC_DEBUG_H
#include <stddef.h>
#include <malloc.h>
#ifdef DEBUG
void* dbg_malloc(const char* file, int line, size_t size);
void dbg_free(void* ptr);
void dbg_list_allocs();
#define malloc(s) dbg_malloc(__FILE__, __LINE__, s)
#define free(p) dbg_free(p)
#endif
#endif //ESC_DEBUG_H

View File

@ -7,6 +7,7 @@
#endif #endif
#include <stdlib.h> #include <stdlib.h>
#include "debug.h"
#include "mutex.h" #include "mutex.h"
#ifdef _WIN32 #ifdef _WIN32

View File

@ -5,6 +5,7 @@
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include "debug.h"
#include "queue.h" #include "queue.h"
#include "mutex.h" #include "mutex.h"
@ -56,6 +57,9 @@ void* queue_dequeue(queue_t q) {
struct st_node* node = q->start; struct st_node* node = q->start;
void* item = node->item; void* item = node->item;
q->start = node->next; q->start = node->next;
if (q->end == node) {
q->end = 0;
}
q->size--; q->size--;
mutex_unlock(q->lock); mutex_unlock(q->lock);
free(node); free(node);

View File

@ -4,6 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include "debug.h"
#include "socket.h" #include "socket.h"
#ifdef _WIN32 #ifdef _WIN32

View File

@ -4,6 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "debug.h"
#include "string.h" #include "string.h"
const char* string_copy(const char *src) { const char* string_copy(const char *src) {

View File

@ -3,6 +3,7 @@
// //
#include <stdlib.h> #include <stdlib.h>
#include "debug.h"
#include "thread.h" #include "thread.h"
#ifdef _WIN32 #ifdef _WIN32

View File

@ -6,6 +6,7 @@
#include <time.h> #include <time.h>
#include <stdio.h> #include <stdio.h>
#include <memory.h> #include <memory.h>
#include "debug.h"
#include "uuid.h" #include "uuid.h"
#define UUID_SIZE 16 #define UUID_SIZE 16