diff --git a/CMakeLists.txt b/CMakeLists.txt index 77eec9e..7adbda0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,10 +8,10 @@ if(WIN32) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) endif() -add_executable(esc src/main.c src/mutex.c src/mutex.h src/thread.c src/thread.h src/socket.c src/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/uuid.c src/uuid.h src/buffer.c src/buffer.h src/tcp_package.c src/tcp_package.h) +add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h) if(WIN32) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) else() - target_link_libraries(esc protobuf-c) + target_link_libraries(esc pthread protobuf-c) endif() \ No newline at end of file diff --git a/src/esc.c b/src/esc.c index ae8c237..f91fbbf 100644 --- a/src/esc.c +++ b/src/esc.c @@ -5,16 +5,38 @@ #include #include #include +#include +#include #include "esc.h" -#include "socket.h" -#include "buffer.h" +#include "utils/socket.h" +#include "utils/buffer.h" #include "proto.h" #include "tcp_package.h" +#include "utils/thread.h" +#include "utils/mutex.h" + +const char* string_copy(const char *src) { + char* dst = malloc(strlen(src)+1); + strcpy(dst, src); + return dst; +} typedef int bool_t; const bool_t BOOL_TRUE = 1; const bool_t BOOL_FALSE = 0; +#define ERROR_BUFFER_SIZE 4096 +esc_error_t error_create(int code, char* format, ...) { + va_list vl; + va_start(vl, format); + char buf[ERROR_BUFFER_SIZE]; + size_t size = (size_t)snprintf(buf, ERROR_BUFFER_SIZE, format, vl); + char* msg = malloc(size+1); + strcpy(msg, buf); + esc_error_t res = {code, msg}; + return res; +} + struct st_connection_settings { bool_t use_ssl_connection; }; @@ -40,10 +62,13 @@ typedef const node_endpoints_t* (*endpoint_discoverer_t)(const void* discover_da struct st_connection { esc_connection_settings_t settings; + const char* name; void* discoverer_data; endpoint_discoverer_t discover; socket_t tcp_conn; ProtobufCAllocator protobuf_c_allocator; + thread_t manager_thread; + mutex_t mutex_lock; }; struct st_static_endpoint_discoverer { @@ -134,9 +159,32 @@ const esc_connection_t* esc_connection_create(const esc_connection_settings_t* c conn->protobuf_c_allocator.alloc = protobuf_c_alloc; conn->protobuf_c_allocator.free = protobuf_c_free; conn->protobuf_c_allocator.allocator_data = 0; + if (connection_name == 0 || strcmp(connection_name, "") == 0) { + const esc_uuid_t* uuid = esc_uuid_create(); + char buf[40]; + esc_uuid_format(uuid, buf, 40); + conn->name = string_copy(buf); + free(uuid); + } else { + conn->name = connection_name; + } return conn; } +void* connection_thread(void* arg) { + const esc_connection_t* conn = arg; + + while(1) { + //mutex_lock(conn->mutex_lock); + //if (socket_readable(conn->tcp_conn)) { + + //} + + //mutex_unlock(conn->mutex_lock); + sleep(1); + } +} + // return 0 on success // return non-zero on failure and sets last_error on connection int esc_connection_connect(const esc_connection_t* conn) { @@ -156,7 +204,7 @@ int esc_connection_connect(const esc_connection_t* conn) { // build message EventStore__Client__Messages__IdentifyClient identify_client; event_store__client__messages__identify_client__init(&identify_client); - identify_client.connection_name = "abc123"; + identify_client.connection_name = (char*)conn->name; identify_client.version = 1; size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client); uint8_t buffer[s]; @@ -174,6 +222,12 @@ int esc_connection_connect(const esc_connection_t* conn) { return -5; } + _conn->mutex_lock = mutex_create(); + _conn->manager_thread = thread_create(connection_thread, _conn); + if (thread_start(_conn->manager_thread)) { + return -6; + } + return 0; } @@ -184,12 +238,6 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char return creds; } -const char* string_copy(const char *src) { - char* dst = malloc(strlen(src)+1); - strcpy(dst, src); - return dst; -} - const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) { if (msg == 0) return 0; esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t)); @@ -259,6 +307,10 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti return result; } +void esc_connection_close(const esc_connection_t* conn) { + socket_close(conn->tcp_conn); +} + const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); return buffer; diff --git a/src/esc.h b/src/esc.h index e8174d7..f1383d7 100644 --- a/src/esc.h +++ b/src/esc.h @@ -5,8 +5,14 @@ #ifndef ESC_ESC_H #define ESC_ESC_H -#include "uuid.h" -#include "buffer.h" +#include "utils/uuid.h" +#include "utils/buffer.h" + +struct st_error { + const int code; + const char* message; +}; +typedef struct st_error esc_error_t; struct st_connection_settings; typedef struct st_connection_settings esc_connection_settings_t; @@ -56,10 +62,14 @@ typedef struct st_all_events_slice esc_all_events_slice_t; const esc_connection_settings_t* esc_default_connection_settings; +// Connection const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); int esc_connection_connect(const esc_connection_t* conn); const esc_credentials_t* esc_credentials_create(const char* username, const char* password); const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials); +void esc_connection_close(const esc_connection_t* conn); + +// Utils const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size); #endif //ESC_ESC_H diff --git a/src/main.c b/src/main.c index 6d15780..664bfb7 100644 --- a/src/main.c +++ b/src/main.c @@ -1,6 +1,6 @@ #include -#include "mutex.h" +#include "utils/mutex.h" #include "esc.h" int main() { @@ -20,7 +20,7 @@ int main() { const esc_all_events_slice_t *result = 0; do { - result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 1024, credentials); + result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 100, credentials); if (result == 0) { return -3; } @@ -41,6 +41,8 @@ int main() { } } while(result->is_end_of_stream == 0); + esc_connection_close(conn); + #ifdef _WIN32 WSACleanup(); #endif diff --git a/src/tcp_package.h b/src/tcp_package.h index e0dbcc7..1f8b977 100644 --- a/src/tcp_package.h +++ b/src/tcp_package.h @@ -5,8 +5,8 @@ #ifndef ESC_TCP_PACKAGE_H #define ESC_TCP_PACKAGE_H -#include "buffer.h" -#include "uuid.h" +#include "utils/buffer.h" +#include "utils/uuid.h" struct st_tcp_package { uint8_t command; diff --git a/src/thread.c b/src/thread.c deleted file mode 100644 index aed1bf3..0000000 --- a/src/thread.c +++ /dev/null @@ -1,32 +0,0 @@ -// -// Created by nicol on 2018-03-18. -// - -#include "thread.h" - -#ifdef _WIN32 -thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { - thread_t thread = malloc(sizeof(struct st_thread)); - thread->handle = CreateThread(NULL, 0, thread_start, thread_arg, CREATE_SUSPENDED, &thread->id); - if (thread->handle == NULL) { - free(thread); - return NULL; - } - return thread; -} - -int thread_start(thread_t thread) { - if (ResumeThread(thread->handle) == -1) return -1; - return 0; -} - -int thread_wait(thread_t thread) { - if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) return -1; - return 0; -} - -int thread_destroy(thread_t thread) { - if (thread_wait(thread) != 0) SuspendThread(thread->handle); - CloseHandle(thread->handle); -} -#endif \ No newline at end of file diff --git a/src/thread.h b/src/thread.h deleted file mode 100644 index d34c77d..0000000 --- a/src/thread.h +++ /dev/null @@ -1,19 +0,0 @@ -// -// Created by nicol on 2018-03-18. -// - -#ifndef ESC_THREAD_H -#define ESC_THREAD_H - -#ifdef _WIN32 -#include -struct st_thread { - DWORD id; - HANDLE handle; -}; -typedef struct st_thread* thread_t; -typedef LPTHREAD_START_ROUTINE thread_start_t; -typedef LPVOID thread_arg_t; -#endif - -#endif //ESC_THREAD_H diff --git a/src/buffer.c b/src/utils/buffer.c similarity index 100% rename from src/buffer.c rename to src/utils/buffer.c diff --git a/src/buffer.h b/src/utils/buffer.h similarity index 95% rename from src/buffer.h rename to src/utils/buffer.h index 02aef7f..3f0b044 100644 --- a/src/buffer.h +++ b/src/utils/buffer.h @@ -6,6 +6,7 @@ #define ESC_BUFFER_H #include +#include struct st_buffer { size_t size; diff --git a/src/mutex.c b/src/utils/mutex.c similarity index 96% rename from src/mutex.c rename to src/utils/mutex.c index 372ab0d..20409e5 100644 --- a/src/mutex.c +++ b/src/utils/mutex.c @@ -27,7 +27,8 @@ void mutex_unlock(mutex_t mutex) { void mutex_destroy(mutex_t mutex) { DeleteCriticalSection(mutex); } -#else +#endif +#ifdef __linux__ mutex_t mutex_create() { mutex_t mutex = malloc(sizeof(mutex_t)); pthread_mutex_init(mutex, 0); diff --git a/src/mutex.h b/src/utils/mutex.h similarity index 93% rename from src/mutex.h rename to src/utils/mutex.h index e8711e2..e177658 100644 --- a/src/mutex.h +++ b/src/utils/mutex.h @@ -8,7 +8,8 @@ #ifdef _WIN32 #include typedef LPCRITICAL_SECTION mutex_t; -#else +#endif +#ifdef __linux__ #include typedef pthread_mutex_t* mutex_t; #endif diff --git a/src/socket.c b/src/utils/socket.c similarity index 98% rename from src/socket.c rename to src/utils/socket.c index d459075..a72f213 100644 --- a/src/socket.c +++ b/src/utils/socket.c @@ -66,7 +66,8 @@ int socket_recv(socket_t s, char* buf, int len) { int socket_error() { return WSAGetLastError(); } -#else +#endif +#ifdef __linux__ struct st_socket { int s; int af; diff --git a/src/socket.h b/src/utils/socket.h similarity index 100% rename from src/socket.h rename to src/utils/socket.h diff --git a/src/utils/thread.c b/src/utils/thread.c new file mode 100644 index 0000000..8f799f5 --- /dev/null +++ b/src/utils/thread.c @@ -0,0 +1,75 @@ +// +// Created by nicol on 2018-03-18. +// + +#include +#include "thread.h" + +#ifdef _WIN32 +thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { + thread_t thread = malloc(sizeof(struct st_thread)); + thread->handle = CreateThread(NULL, 0, thread_start, thread_arg, CREATE_SUSPENDED, &thread->id); + if (thread->handle == NULL) { + free(thread); + return NULL; + } + return thread; +} + +int thread_start(thread_t thread) { + if (ResumeThread(thread->handle) == -1) return -1; + return 0; +} + +int thread_wait(thread_t thread) { + if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) return -1; + return 0; +} + +int thread_destroy(thread_t thread) { + if (thread_wait(thread) != 0) SuspendThread(thread->handle); + CloseHandle(thread->handle); +} +#endif +#ifdef __linux__ +#include + +struct st_thread { + pthread_t handle; + thread_start_t start; + thread_arg_t arg; + int last_error; +}; + +thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { + thread_t thread = malloc(sizeof(struct st_thread)); + thread->handle = 0; + thread->start = thread_start; + thread->arg = thread_arg; + thread->last_error = 0; + return thread; +} + +int thread_start(thread_t thread) { + int rc = pthread_create(&thread->handle, NULL, thread->start, thread->arg); + if (rc != 0) thread->last_error = rc; + return rc; +} + +int thread_wait(thread_t thread) { + void* result; + int rc = pthread_join(thread->handle, &result); + if (rc != 0) thread->last_error = rc; + return rc; +} + +int thread_destroy(thread_t thread) { + if (thread_wait(thread)) return pthread_cancel(thread->handle); + free(thread); + return 0; +} + +int thread_error(thread_t thread) { + return thread->last_error; +} +#endif \ No newline at end of file diff --git a/src/utils/thread.h b/src/utils/thread.h new file mode 100644 index 0000000..bb92c0a --- /dev/null +++ b/src/utils/thread.h @@ -0,0 +1,29 @@ +// +// Created by nicol on 2018-03-18. +// + +#ifndef ESC_THREAD_H +#define ESC_THREAD_H + +#ifdef _WIN32 +#include +struct st_thread { + DWORD id; + HANDLE handle; +}; +typedef struct st_thread* thread_t; +typedef LPTHREAD_START_ROUTINE thread_start_t; +typedef LPVOID thread_arg_t; +#endif +typedef void* (*thread_start_t)(void*); +typedef void* thread_arg_t; +typedef struct st_thread* thread_t; + +thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg); +int thread_start(thread_t thread); +int thread_wait(thread_t thread); +int thread_destroy(thread_t thread); +int thread_error(thread_t thread); +void* thread_result(thread_t thread); + +#endif //ESC_THREAD_H diff --git a/src/uuid.c b/src/utils/uuid.c similarity index 86% rename from src/uuid.c rename to src/utils/uuid.c index 198000c..4f33c35 100644 --- a/src/uuid.c +++ b/src/utils/uuid.c @@ -7,6 +7,7 @@ #include #include "uuid.h" +//TODO: fixme - this is not secure, use crypto random bytes instead of rand const esc_uuid_t* esc_uuid_create() { esc_uuid_t* result = malloc(sizeof(esc_uuid_t)); srand(time(NULL)); @@ -26,7 +27,7 @@ const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size) { } const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size) { - snprintf(buffer, buf_size, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", + snprintf(buffer, buf_size, "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x", uuid->data[0], uuid->data[1], uuid->data[2], diff --git a/src/uuid.h b/src/utils/uuid.h similarity index 100% rename from src/uuid.h rename to src/utils/uuid.h