From 2088fd40826ae67c9242db439b8792c6dd928ff5 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Tue, 20 Mar 2018 17:21:34 -0700 Subject: [PATCH] adding tcp_messages, concurrent queue, types refactoring --- CMakeLists.txt | 2 +- src/esc.c | 201 ++++++++++++++++++++++----------------------- src/esc.h | 12 +-- src/main.c | 21 +++-- src/tcp_messages.c | 72 ++++++++++++++++ src/tcp_messages.h | 89 ++++++++++++++++++++ src/tcp_package.c | 85 +++++++++++-------- src/tcp_package.h | 12 +-- src/utils/buffer.c | 44 ++++++++-- src/utils/buffer.h | 16 ++-- src/utils/mutex.c | 16 ++-- src/utils/mutex.h | 9 +- src/utils/queue.c | 89 ++++++++++++++++++++ src/utils/queue.h | 17 ++++ src/utils/socket.c | 71 +++++++++++++--- src/utils/socket.h | 6 +- src/utils/thread.c | 42 ++++++++-- src/utils/thread.h | 2 +- src/utils/uuid.c | 31 +++---- src/utils/uuid.h | 17 ++-- 20 files changed, 621 insertions(+), 233 deletions(-) create mode 100644 src/tcp_messages.c create mode 100644 src/tcp_messages.h create mode 100644 src/utils/queue.c create mode 100644 src/utils/queue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index ab97920..586814c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ if(WIN32) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) endif() -add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h) +add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h) if(WIN32) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) diff --git a/src/esc.c b/src/esc.c index cb54fb7..3253236 100644 --- a/src/esc.c +++ b/src/esc.c @@ -6,7 +6,6 @@ #include #include #include -#include #include "esc.h" #include "utils/socket.h" #include "utils/buffer.h" @@ -15,9 +14,14 @@ #include "utils/thread.h" #include "utils/mutex.h" #include "utils/string.h" +#include "utils/queue.h" +#include "tcp_messages.h" #ifdef _WIN32 -#define sleep Sleep +#define usleep Sleep +#endif +#ifdef __linux__ +#include #endif typedef int bool_t; @@ -67,11 +71,10 @@ struct st_connection { socket_t tcp_conn; ProtobufCAllocator protobuf_c_allocator; thread_t manager_thread; - mutex_t sending_lock; - mutex_t send_lock; - tcp_package_t* send_pkg; - mutex_t recv_lock; - tcp_package_t* recv_pkg; + bool_t stop; + mutex_t recv_peek_lock; + queue_t send_queue; + queue_t recv_queue; }; struct st_static_endpoint_discoverer { @@ -81,7 +84,7 @@ struct st_static_endpoint_discoverer { typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t; const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* discover_data, const tcp_endpoint_t* failed_tcp_endpoint) { - node_endpoints_t* result = malloc(sizeof(node_endpoints_t)); + node_endpoints_t* result = malloc(sizeof(struct st_node_endpoints)); if (discover_data->use_ssl_connection) { result->secure_tcp_endpoint = discover_data->tcp_endpoint; } else { @@ -90,64 +93,71 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc return result; } -int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) { +//TODO partial transfer +ssize_t connection_send_tcp_package(esc_connection_t conn, tcp_package_t pkg) { char uuid_buf[37]; - fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); + fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu ", pkg->command, pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); buffer_t send_buffer = tcp_package_to_buffer(pkg); - uint32_t size = (uint32_t)send_buffer.size; - if (socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t)) <= 0) { - buffer_free(send_buffer); + + size_t send_buffer_size = buffer_size(send_buffer); + uint32_t size = (uint32_t)send_buffer_size; + ssize_t rc; + if ((rc = socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t))) <= 0) { + buffer_destroy(send_buffer); + fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); return -1; } - if (socket_send(conn->tcp_conn, (char *) send_buffer.data, send_buffer.size) <= 0) { - buffer_free(send_buffer); + + uint8_t* send_buffer_data = buffer_data(send_buffer); + if ((rc = socket_send(conn->tcp_conn, (char *) send_buffer_data, send_buffer_size)) <= 0) { + buffer_destroy(send_buffer); + fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); return -2; } - buffer_free(send_buffer); + + buffer_destroy(send_buffer); + fprintf(stderr, "0\n"); return 0; } -const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { +//TODO partial transfer +tcp_package_t connection_recv_tcp_package(esc_connection_t conn) { uint32_t recv_size; ssize_t rc; if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { - fprintf(stderr, "%ld %d", rc, socket_error()); + fprintf(stderr, "connection_recv_tcp_package: %ld %d\n", rc, socket_error(conn->tcp_conn)); return 0; } buffer_t recv_buffer = buffer_create(recv_size); + size_t recv_buffer_size = recv_size; + uint8_t* recv_buffer_data = buffer_data(recv_buffer); while(recv_size > 0) { - size_t pos = recv_buffer.size - recv_size; - rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size); + size_t pos = recv_buffer_size - recv_size; + rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer_data[pos], recv_size); recv_size -= rc; } - const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer); - buffer_free(recv_buffer); + tcp_package_t recv_pkg = tcp_package_from_buffer(recv_buffer); + buffer_destroy(recv_buffer); char uuid_buf[37]; - fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); - //for (int32_t i=0;idata.size;i++) { - // printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]); - //} - //printf("\n"); + fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), recv_buffer_size); return recv_pkg; } -void connection_enqueue_send(esc_connection_t* conn, tcp_package_t* pkg) { - mutex_lock(conn->sending_lock); - mutex_lock(conn->send_lock); - conn->send_pkg = pkg; - mutex_unlock(conn->send_lock); +inline void connection_enqueue_send(esc_connection_t conn, tcp_package_t pkg) { + queue_enqueue(conn->send_queue, pkg); } -tcp_package_t* connection_wait_for(esc_connection_t* conn, esc_uuid_t* correlation_id) { - tcp_package_t* found = 0; +tcp_package_t connection_wait_for(esc_connection_t conn, esc_uuid_t correlation_id) { + tcp_package_t found = 0; while(found == 0) { - mutex_lock(conn->recv_lock); - if (conn->recv_pkg && esc_uuid_compare(&conn->recv_pkg->correlation_id, correlation_id) == 0) { - found = conn->recv_pkg; - conn->recv_pkg = 0; + mutex_lock(conn->recv_peek_lock); + tcp_package_t peek = queue_peek(conn->recv_queue); + if (peek && esc_uuid_compare(peek->correlation_id, correlation_id) == 0) { + queue_dequeue(conn->recv_queue); + found = peek; } - mutex_unlock(conn->recv_lock); - sleep(10); + mutex_unlock(conn->recv_peek_lock); + usleep(1); } return found; } @@ -160,7 +170,7 @@ void protobuf_c_free(void *alloc_data, void* p) { free(p); } -const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { +esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { if (strncmp(addr, "tcp://", 6) != 0) { return 0; } @@ -177,85 +187,70 @@ const esc_connection_t* esc_connection_create(const esc_connection_settings_t* c }; struct st_connection* conn = malloc(sizeof(struct st_connection)); conn->settings = *connection_settings; - conn->discoverer_data = malloc(sizeof(static_endpoint_discoverer_t)); - memcpy(conn->discoverer_data, &discover_data, sizeof(static_endpoint_discoverer_t)); + conn->discoverer_data = malloc(sizeof(struct st_static_endpoint_discoverer)); + memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_static_endpoint_discoverer)); conn->discover = (endpoint_discoverer_t)static_discover; conn->protobuf_c_allocator.alloc = protobuf_c_alloc; conn->protobuf_c_allocator.free = protobuf_c_free; conn->protobuf_c_allocator.allocator_data = 0; if (connection_name == 0 || strcmp(connection_name, "") == 0) { - const esc_uuid_t* uuid = esc_uuid_create(); + esc_uuid_t uuid = esc_uuid_create(); char buf[40]; esc_uuid_format(uuid, buf, 40); conn->name = string_copy(buf); - esc_uuid_free(uuid); + esc_uuid_destroy(uuid); } else { conn->name = connection_name; } - conn->recv_pkg = 0; - conn->send_pkg = 0; - conn->send_lock = mutex_create(); - conn->sending_lock = mutex_create(); - conn->recv_lock = mutex_create(); + conn->send_queue = queue_create(); + conn->recv_peek_lock = mutex_create(); + conn->recv_queue = queue_create(); + conn->stop = 0; return conn; } void* connection_thread(void* arg) { - esc_connection_t* conn = arg; + esc_connection_t conn = arg; - while(1) { - mutex_lock(conn->send_lock); - //printf("%p ", conn->send_pkg); - int rc; - if (conn->send_pkg) { - if ((rc = connection_send_tcp_package(conn, conn->send_pkg)) == 0) { - // free send pkg - conn->send_pkg = 0; - mutex_unlock(conn->sending_lock); - } + while(conn->stop == BOOL_FALSE) { + if (socket_writable(conn->tcp_conn)) { + tcp_package_t send_pkg = queue_dequeue(conn->send_queue); + if (send_pkg && connection_send_tcp_package(conn, send_pkg)) { + fprintf(stderr, "failed to send pkg."); + } } - mutex_unlock(conn->send_lock); - - if ((rc = socket_readable(conn->tcp_conn)) > 0) { - mutex_lock(conn->recv_lock); - if (conn->recv_pkg == 0) { - conn->recv_pkg = connection_recv_tcp_package(conn); - } - mutex_unlock(conn->recv_lock); - } - //printf("%d ", rc); - - mutex_lock(conn->recv_lock); - if (conn->recv_pkg && conn->recv_pkg->command == 0x01) { - tcp_package_t* hb = tcp_package_create(0x02, &conn->recv_pkg->correlation_id, buffer_create(0)); - conn->recv_pkg = 0; - connection_send_tcp_package(conn, hb); - } - mutex_unlock(conn->recv_lock); - //mutex_unlock(conn->mutex_lock); - sleep(10); + if (socket_readable(conn->tcp_conn)) { + tcp_package_t recv_pkg = connection_recv_tcp_package(conn); + if (recv_pkg && recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { + tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); + queue_enqueue(conn->send_queue, heartbeat_pkg); + } else if (recv_pkg) { + queue_enqueue(conn->recv_queue, recv_pkg); + } + } + + usleep(1); } } // return 0 on success // return non-zero on failure and sets last_error on connection -int esc_connection_connect(const esc_connection_t* conn) { +int esc_connection_connect(esc_connection_t conn) { // Discover endpoint const node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0); if (endpoints == 0) { return -1; } // Establish Tcp Connection - esc_connection_t* _conn = (esc_connection_t*)conn; - _conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); + conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint; if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { return -2; } // Start manager thread - _conn->manager_thread = thread_create(connection_thread, _conn); - if (thread_start(_conn->manager_thread)) { + conn->manager_thread = thread_create(connection_thread, conn); + if (thread_start(conn->manager_thread)) { return -3; } // Identify @@ -268,10 +263,11 @@ int esc_connection_connect(const esc_connection_t* conn) { uint8_t buffer[s]; event_store__client__messages__identify_client__pack(&identify_client, buffer); // build tcp_package - const tcp_package_t* send_pkg = tcp_package_create(0xF5, esc_uuid_create(), buffer_from(buffer, s)); - connection_enqueue_send(_conn, send_pkg); - const tcp_package_t* recv_pkg = connection_wait_for(_conn, &send_pkg->correlation_id); - if (recv_pkg->command != 0xF6) { + tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, esc_uuid_create(), buffer_from(buffer, s)); + queue_enqueue(conn->send_queue, send_pkg); + + tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); + if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { return -5; } @@ -287,7 +283,7 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) { if (msg == 0) return 0; - esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t)); + esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event)); ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len); ev->event_type = string_copy(msg->event_type); ev->event_number = msg->event_number; @@ -300,7 +296,7 @@ const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__ } esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__ResolvedEvent* msg) { - esc_resolved_event_t* ev = malloc(sizeof(esc_resolved_event_t)); + esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event)); ev->original_position.prepare_position = msg->prepare_position; ev->original_position.commit_position = msg->commit_position; ev->event = recorded_event_create(msg->event); @@ -308,7 +304,7 @@ esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__Resolv return ev; } -const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) { +const esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) { EventStore__Client__Messages__ReadAllEvents send_msg; event_store__client__messages__read_all_events__init(&send_msg); send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; @@ -319,20 +315,21 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); uint8_t buffer[s]; event_store__client__messages__read_all_events__pack(&send_msg, buffer); - const tcp_package_t* send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); - connection_enqueue_send(conn, send_pkg); + tcp_package_t send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); + queue_enqueue(conn->send_queue, send_pkg); - const tcp_package_t* recv_pkg = connection_wait_for(conn, &send_pkg->correlation_id); + tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); if (recv_pkg->command != 0xB7) { return 0; } - esc_connection_t* _conn = (esc_connection_t*)conn; + size_t data_size = buffer_size(recv_pkg->data); + uint8_t* data = buffer_data(recv_pkg->data); EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = - event_store__client__messages__read_all_events_completed__unpack(&_conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data); + event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, data_size, data); - esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t)); + esc_all_events_slice_t* result = malloc(sizeof(struct st_all_events_slice)); result->read_direction = "forward"; result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0; result->from_position.prepare_position = recv_msg->prepare_position; @@ -340,16 +337,16 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti result->next_position.prepare_position = recv_msg->next_prepare_position; result->next_position.commit_position = recv_msg->next_commit_position; result->n_events = recv_msg->n_events; - result->events = malloc(recv_msg->n_events * sizeof(esc_resolved_event_t*)); + result->events = malloc(recv_msg->n_events * sizeof(void*)); for (size_t i = 0; i < recv_msg->n_events; i++) { result->events[i] = resolved_event_create(recv_msg->events[i]); } - event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &_conn->protobuf_c_allocator); + event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); return result; } -void esc_connection_close(const esc_connection_t* conn) { +void esc_connection_close(esc_connection_t conn) { socket_close(conn->tcp_conn); } diff --git a/src/esc.h b/src/esc.h index f1383d7..83d80ce 100644 --- a/src/esc.h +++ b/src/esc.h @@ -18,7 +18,7 @@ struct st_connection_settings; typedef struct st_connection_settings esc_connection_settings_t; struct st_connection; -typedef struct st_connection esc_connection_t; +typedef struct st_connection* esc_connection_t; struct st_credentials { const char* username; @@ -33,7 +33,7 @@ struct st_esc_position { typedef struct st_esc_position esc_position_t; struct st_recorded_event { - const esc_uuid_t* event_id; + esc_uuid_t event_id; const char* event_type; int64_t event_number; const char *event_stream_id; @@ -63,11 +63,11 @@ typedef struct st_all_events_slice esc_all_events_slice_t; const esc_connection_settings_t* esc_default_connection_settings; // Connection -const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); -int esc_connection_connect(const esc_connection_t* conn); +esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); +int esc_connection_connect(esc_connection_t conn); const esc_credentials_t* esc_credentials_create(const char* username, const char* password); -const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials); -void esc_connection_close(const esc_connection_t* conn); +const esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials); +void esc_connection_close(esc_connection_t conn); // Utils const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size); diff --git a/src/main.c b/src/main.c index af1eeb6..0163daa 100644 --- a/src/main.c +++ b/src/main.c @@ -1,7 +1,16 @@ #include +#ifdef __linux__ +#include +#include +#endif #include "utils/mutex.h" #include "esc.h" +#include "utils/queue.h" + +#ifdef _WIN32 +#define usleep Sleep +#endif int main() { #ifdef _WIN32 @@ -9,7 +18,7 @@ int main() { WSAStartup(MAKEWORD(2,0), &wsaData); #endif - const esc_connection_t* conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); + esc_connection_t conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); if (conn == 0) { return -1; } @@ -36,16 +45,12 @@ int main() { result->events[i]->event->event_type, result->events[i]->event->event_number, result->events[i]->event->event_stream_id, - result->events[i]->event->data.size, - result->events[i]->event->metadata.size); + buffer_size(result->events[i]->event->data), + buffer_size(result->events[i]->event->metadata)); } } while(result->is_end_of_stream == 0); -#ifdef _WIN32 - Sleep(30000); -#else - sleep(30000); -#endif + usleep(30000); esc_connection_close(conn); diff --git a/src/tcp_messages.c b/src/tcp_messages.c new file mode 100644 index 0000000..49bf26c --- /dev/null +++ b/src/tcp_messages.c @@ -0,0 +1,72 @@ +// +// Created by nicolas on 20/03/18. +// + +#include "tcp_messages.h" + +char * get_string_for_tcp_message(uint8_t message) { + switch(message) + { + case MESSAGE_HEARTBEATREQUEST : return "Heartbeat Request"; + case MESSAGE_HEARTBEATRESPONSE : return "Heartbeat Response"; + case MESSAGE_PING : return "Ping"; + case MESSAGE_PONG : return "Pong"; + case MESSAGE_PREPAREACK : return "Prepare Acknowledgment"; + case MESSAGE_COMMITACK : return "Commit Acknowledgment"; + case MESSAGE_SLAVEASSIGNMENT : return "Slave Assignment"; + case MESSAGE_CLONEASSIGNMENT : return "Clone Assignment"; + case MESSAGE_SUBSCRIBEREPLICA : return "Subscribe Replica"; + case MESSAGE_REPLICALOGPOSITIONACK : return "Replica Log Position Acknowledgment"; + case MESSAGE_CREATECHUNK : return "Create Chunk"; + case MESSAGE_RAWCHUNKBULK : return "Raw Chunk Bulk"; + case MESSAGE_DATACHUNKBULK : return "Data Chunk Bulk"; + case MESSAGE_REPLICASUBSCRIPTIONRETRY : return "Replica Subscription Retry"; + case MESSAGE_REPLICASUBSCRIBED : return "Replica Subscriber"; + case MESSAGE_WRITEEVENTS : return "Write Events"; + case MESSAGE_WRITEEVENTSCOMPLETED : return "Write Events Completed"; + case MESSAGE_TRANSACTIONSTART : return "Transaction Start"; + case MESSAGE_TRANSACTIONSTARTCOMPLETED : return "Transaction Start Completed"; + case MESSAGE_TRANSACTIONWRITE : return "Transaction Write"; + case MESSAGE_TRANSACTIONWRITECOMPLETED : return "Transaction Write Completed"; + case MESSAGE_TRANSACTIONCOMMIT : return "Transaction Commit"; + case MESSAGE_TRANSACTIONCOMMITCOMPLETED : return "Transaction Commit Completed"; + case MESSAGE_DELETESTREAM : return "Delete Stream"; + case MESSAGE_DELETESTREAMCOMPLETED : return "Delete Stream Completed"; + case MESSAGE_READEVENT : return "Read Event"; + case MESSAGE_READEVENTCOMPLETED : return "Read Event Completed"; + case MESSAGE_READSTREAMEVENTSFORWARD : return "Read Stream Events Forward"; + case MESSAGE_READSTREAMEVENTSFORWARDCOMPLETED : return "Read Stream Events Forward Completed"; + case MESSAGE_READSTREAMEVENTSBACKWARD : return "Read Stream Events Backward"; + case MESSAGE_READSTREAMEVENTSBACKWARDCOMPLETED : return "Read Stream Events Backward Completed"; + case MESSAGE_READALLEVENTSFORWARD : return "Read All Events Forward"; + case MESSAGE_READALLEVENTSFORWARDCOMPLETED : return "Read All Events Forward Completed"; + case MESSAGE_READALLEVENTSBACKWARD : return "Read All Events Backward"; + case MESSAGE_READALLEVENTSBACKWARDCOMPLETED : return "Read All Events Backward Completed"; + case MESSAGE_SUBSCRIBETOSTREAM : return "Subscribe To Stream"; + case MESSAGE_SUBSCRIPTIONCONFIRMATION : return "Subscription Confirmation"; + case MESSAGE_STREAMEVENTAPPEARED : return "Stream Event Appeared"; + case MESSAGE_UNSUBSCRIBEFROMSTREAM : return "Unsubscribe from stream"; + case MESSAGE_SUBSCRIPTIONDROPPED : return "Subscription Dropped"; + case MESSAGE_CONNECTTOPERSISTENTSUBSCRIPTION : return "Connect to Persistent Subscription"; + case MESSAGE_PERSISTENTSUBSCRIPTIONCONFIRMATION : return "Persistent Subscription Confirmation"; + case MESSAGE_PERSISTENTSUBSCRIPTIONSTREAMEVENTAPPEARED : return "Persistent Subscription Stream Event Appeared"; + case MESSAGE_CREATEPERSISTENTSUBSCRIPTION : return "Create Persistent Subscription"; + case MESSAGE_CREATEPERSISTENTSUBSCRIPTIONCOMPLETED : return "Create Persistent Subscription Completed"; + case MESSAGE_DELETEPERSISTENTSUBSCRIPTION : return "Delete Persistent Subscription"; + case MESSAGE_DELETEPERSISTENTSUBSCRIPTIONCOMPLETED : return "Delete Persistent Subscription Completed"; + case MESSAGE_PERSISTENTSUBSCRIPTIONACKEVENTS : return "Persistent Subscription Ack Events"; + case MESSAGE_PERSISTENTSUBSCRIPTIONNAKEVENTS : return "Persistent Subscription Nak Events"; + case MESSAGE_UPDATEPERSISTENTSUBSCRIPTION : return "Update Persistent Subscription"; + case MESSAGE_UPDATEPERSISTENTSUBSCRIPTIONCOMPLETED : return "Update Persistent Subscription Completed"; + case MESSAGE_SCAVENGEDATABASE : return "Scavenge Database"; + case MESSAGE_SCAVENGEDATABASECOMPLETED : return "Scavenge Database Completed"; + case MESSAGE_BADREQUEST : return "Bad Request"; + case MESSAGE_NOTHANDLED : return "Not Handled"; + case MESSAGE_AUTHENTICATE : return "Authenticate"; + case MESSAGE_AUTHENTICATED : return "Authenticated"; + case MESSAGE_NOTAUTHENTICATED : return "Not Authenticated"; + case MESSAGE_IDENTIFYCLIENT : return "Identify Client"; + case MESSAGE_CLIENTIDENTIFIED : return "Client Identified"; + default: return "Unknown Message Code"; + } +} diff --git a/src/tcp_messages.h b/src/tcp_messages.h new file mode 100644 index 0000000..cfe5852 --- /dev/null +++ b/src/tcp_messages.h @@ -0,0 +1,89 @@ +// +// Created by nicolas on 20/03/18. +// + +#ifndef ESC_TCP_MESSAGES_H +#define ESC_TCP_MESSAGES_H + +#include + +#define MESSAGE_HEARTBEATREQUEST 0x01 +#define MESSAGE_HEARTBEATRESPONSE 0x02 + +#define MESSAGE_PING 0x03 +#define MESSAGE_PONG 0x04 + +#define MESSAGE_PREPAREACK 0x05 +#define MESSAGE_COMMITACK 0x06 + +#define MESSAGE_SLAVEASSIGNMENT 0x07 +#define MESSAGE_CLONEASSIGNMENT 0x08 + +#define MESSAGE_SUBSCRIBEREPLICA 0x10 +#define MESSAGE_REPLICALOGPOSITIONACK 0x11 +#define MESSAGE_CREATECHUNK 0x12 +#define MESSAGE_RAWCHUNKBULK 0x13 +#define MESSAGE_DATACHUNKBULK 0x14 +#define MESSAGE_REPLICASUBSCRIPTIONRETRY 0x15 +#define MESSAGE_REPLICASUBSCRIBED 0x16 + + +//Client Messages + +#define MESSAGE_WRITEEVENTS 0x82 +#define MESSAGE_WRITEEVENTSCOMPLETED 0x83 + +#define MESSAGE_TRANSACTIONSTART 0x84 +#define MESSAGE_TRANSACTIONSTARTCOMPLETED 0x85 +#define MESSAGE_TRANSACTIONWRITE 0x86 +#define MESSAGE_TRANSACTIONWRITECOMPLETED 0x87 +#define MESSAGE_TRANSACTIONCOMMIT 0x88 +#define MESSAGE_TRANSACTIONCOMMITCOMPLETED 0x89 + +#define MESSAGE_DELETESTREAM 0x8A +#define MESSAGE_DELETESTREAMCOMPLETED 0x8B + +#define MESSAGE_READEVENT 0xB0 +#define MESSAGE_READEVENTCOMPLETED 0xB1 +#define MESSAGE_READSTREAMEVENTSFORWARD 0xB2 +#define MESSAGE_READSTREAMEVENTSFORWARDCOMPLETED 0xB3 +#define MESSAGE_READSTREAMEVENTSBACKWARD 0xB4 +#define MESSAGE_READSTREAMEVENTSBACKWARDCOMPLETED 0xB5 +#define MESSAGE_READALLEVENTSFORWARD 0xB6 +#define MESSAGE_READALLEVENTSFORWARDCOMPLETED 0xB7 +#define MESSAGE_READALLEVENTSBACKWARD 0xB8 +#define MESSAGE_READALLEVENTSBACKWARDCOMPLETED 0xB9 + +#define MESSAGE_SUBSCRIBETOSTREAM 0xC0 +#define MESSAGE_SUBSCRIPTIONCONFIRMATION 0xC1 +#define MESSAGE_STREAMEVENTAPPEARED 0xC2 +#define MESSAGE_UNSUBSCRIBEFROMSTREAM 0xC3 +#define MESSAGE_SUBSCRIPTIONDROPPED 0xC4 +#define MESSAGE_CONNECTTOPERSISTENTSUBSCRIPTION 0xC5 +#define MESSAGE_PERSISTENTSUBSCRIPTIONCONFIRMATION 0xC6 +#define MESSAGE_PERSISTENTSUBSCRIPTIONSTREAMEVENTAPPEARED 0xC7 +#define MESSAGE_CREATEPERSISTENTSUBSCRIPTION 0xC8 +#define MESSAGE_CREATEPERSISTENTSUBSCRIPTIONCOMPLETED 0xC9 +#define MESSAGE_DELETEPERSISTENTSUBSCRIPTION 0xCA +#define MESSAGE_DELETEPERSISTENTSUBSCRIPTIONCOMPLETED 0xCB +#define MESSAGE_PERSISTENTSUBSCRIPTIONACKEVENTS 0xCC +#define MESSAGE_PERSISTENTSUBSCRIPTIONNAKEVENTS 0xCD +#define MESSAGE_UPDATEPERSISTENTSUBSCRIPTION 0xCE +#define MESSAGE_UPDATEPERSISTENTSUBSCRIPTIONCOMPLETED 0xCF + +#define MESSAGE_SCAVENGEDATABASE 0xD0 +#define MESSAGE_SCAVENGEDATABASECOMPLETED 0xD1 + + + +#define MESSAGE_BADREQUEST 0xF0 +#define MESSAGE_NOTHANDLED 0xF1 +#define MESSAGE_AUTHENTICATE 0xF2 +#define MESSAGE_AUTHENTICATED 0xF3 +#define MESSAGE_NOTAUTHENTICATED 0xF4 +#define MESSAGE_IDENTIFYCLIENT 0xF5 +#define MESSAGE_CLIENTIDENTIFIED 0xF6 + +char * get_string_for_tcp_message(uint8_t message); + +#endif //ESC_TCP_MESSAGES_H diff --git a/src/tcp_package.c b/src/tcp_package.c index c637289..6ca516c 100644 --- a/src/tcp_package.c +++ b/src/tcp_package.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "tcp_package.h" const uint32_t CommandOffset = 0; @@ -13,70 +14,86 @@ const uint32_t CorrelationOffset = 2; const uint32_t AuthOffset = 18; const uint32_t MandatorySize = 18; -const tcp_package_t* tcp_package_create(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data) { - tcp_package_t* pkg = malloc(sizeof(tcp_package_t)); +tcp_package_t tcp_package_create(uint8_t command, esc_uuid_t correlation_id, buffer_t data) { + tcp_package_t pkg = malloc(sizeof(struct st_tcp_package)); pkg->command = command; pkg->flags = 0; - pkg->correlation_id = *correlation_id; + pkg->correlation_id = esc_uuid_copy(correlation_id); pkg->login = 0; pkg->password = 0; pkg->data = data; return pkg; } -const tcp_package_t* tcp_package_create_authenticated(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data, const char* username, const char* password) { - tcp_package_t* pkg = malloc(sizeof(tcp_package_t)); +tcp_package_t tcp_package_create_authenticated(uint8_t command, esc_uuid_t correlation_id, buffer_t data, const char* username, const char* password) { + tcp_package_t pkg = malloc(sizeof(struct st_tcp_package)); pkg->command = command; pkg->flags = 1; - pkg->correlation_id = *correlation_id; + pkg->correlation_id = esc_uuid_copy(correlation_id); pkg->login = username; pkg->password = password; pkg->data = data; return pkg; } -void tcp_package_free(const tcp_package_t* pkg) { - buffer_free(pkg->data); - free((void*)pkg); +void tcp_package_destroy(tcp_package_t pkg) { + esc_uuid_destroy(pkg->correlation_id); + buffer_destroy(pkg->data); + free(pkg); } -buffer_t tcp_package_to_buffer(const tcp_package_t* pkg) { - uint8_t* _dst = malloc(MandatorySize + pkg->data.size + (pkg->flags ? 257*2 : 0)); - _dst[CommandOffset] = pkg->command; - _dst[FlagsOffset] = pkg->flags; - memcpy(&_dst[CorrelationOffset], &pkg->correlation_id, sizeof(esc_uuid_t)); +buffer_t tcp_package_to_buffer(tcp_package_t pkg) { + size_t data_size = buffer_size(pkg->data); + uint8_t* buf = malloc(MandatorySize + data_size + (pkg->flags ? 257*2 : 0)); + buf[CommandOffset] = pkg->command; + buf[FlagsOffset] = pkg->flags; + memcpy(&buf[CorrelationOffset], pkg->correlation_id, 16); size_t size = MandatorySize; + uint8_t* data = buffer_data(pkg->data); if (pkg->flags) { size_t l_len = strlen(pkg->login); - _dst[AuthOffset] = l_len; - strcpy(&_dst[AuthOffset+1], pkg->login); + buf[AuthOffset] = (uint8_t)l_len; + strcpy((char*)&buf[AuthOffset+1], pkg->login); size_t p_len = strlen(pkg->password); - _dst[AuthOffset+1+l_len] = p_len; - strcpy(&_dst[AuthOffset+2+l_len], pkg->password); - memcpy(&_dst[AuthOffset+2+l_len+p_len], pkg->data.data, pkg->data.size); - size += 2 + l_len + p_len + pkg->data.size; + buf[AuthOffset+1+l_len] = (uint8_t)p_len; + strcpy((char*)&buf[AuthOffset+2+l_len], pkg->password); + memcpy(&buf[AuthOffset+2+l_len+p_len], data, data_size); + size += 2 + l_len + p_len + data_size; } else { - memcpy(&_dst[AuthOffset], pkg->data.data, pkg->data.size); - size += pkg->data.size; + memcpy(&buf[AuthOffset], data, data_size); + size += data_size; } - return buffer_from(_dst, size); + /* + for (size_t i = 0; i < size; i++) { + fprintf(stderr, "%02hhx ", buf[i]); + } + fprintf(stderr, "\n"); + */ + return buffer_from(buf, size); } -const tcp_package_t* tcp_package_from_buffer(buffer_t buffer) { - tcp_package_t* pkg = malloc(sizeof(tcp_package_t)); - pkg->command = buffer.data[CommandOffset]; - pkg->flags = buffer.data[FlagsOffset]; - memcpy(&pkg->correlation_id, &buffer.data[CorrelationOffset], sizeof(esc_uuid_t)); +tcp_package_t tcp_package_from_buffer(buffer_t buffer) { + size_t buf_size = buffer_size(buffer); + uint8_t* buf_data = buffer_data(buffer); + /* + for (size_t i = 0; i < buffer_size(buffer); i++) { + fprintf(stderr, "%02hhx ", buf_data[i]); + } + fprintf(stderr, "\n"); + */ + tcp_package_t pkg = malloc(sizeof(struct st_tcp_package)); + pkg->command = buf_data[CommandOffset]; + pkg->flags = buf_data[FlagsOffset]; + pkg->correlation_id = esc_uuid_from(&buf_data[CorrelationOffset], 16); if (pkg->flags) { - size_t l_len = buffer.data[AuthOffset]; - size_t p_len = buffer.data[AuthOffset+1+l_len]; - pkg->data = buffer_create(buffer.size - MandatorySize - 2 - l_len - p_len); - memcpy(pkg->data.data, &buffer.data[MandatorySize + 2 + l_len + p_len], buffer.size - MandatorySize - 2 - l_len - p_len); + size_t l_len = buf_data[AuthOffset]; + size_t p_len = buf_data[AuthOffset+1+l_len]; + size_t pos = MandatorySize - 2 - l_len - p_len; + pkg->data = buffer_copyfrom(&buf_data[pos], buf_size - pos); } else { pkg->login = 0; pkg->password = 0; - pkg->data = buffer_create(buffer.size - MandatorySize); - memcpy(pkg->data.data, &buffer.data[MandatorySize], buffer.size - MandatorySize); + pkg->data = buffer_copyfrom(&buf_data[MandatorySize], buf_size - MandatorySize); } return pkg; } diff --git a/src/tcp_package.h b/src/tcp_package.h index 1f8b977..eaf645b 100644 --- a/src/tcp_package.h +++ b/src/tcp_package.h @@ -16,12 +16,12 @@ struct st_tcp_package { const char* password; buffer_t data; }; -typedef struct st_tcp_package tcp_package_t; +typedef struct st_tcp_package* tcp_package_t; -const tcp_package_t* tcp_package_create(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data); -const tcp_package_t* tcp_package_create_authenticated(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data, const char* username, const char* password); -void tcp_package_free(const tcp_package_t* pkg); -buffer_t tcp_package_to_buffer(const tcp_package_t* pkg); -const tcp_package_t* tcp_package_from_buffer(buffer_t buffer); +tcp_package_t tcp_package_create(uint8_t command, esc_uuid_t correlation_id, buffer_t data); +tcp_package_t tcp_package_create_authenticated(uint8_t command, esc_uuid_t correlation_id, buffer_t data, const char* username, const char* password); +void tcp_package_destroy(tcp_package_t pkg); +buffer_t tcp_package_to_buffer(tcp_package_t pkg); +tcp_package_t tcp_package_from_buffer(buffer_t buffer); #endif //ESC_TCP_PACKAGE_H diff --git a/src/utils/buffer.c b/src/utils/buffer.c index a920e2e..388588c 100644 --- a/src/utils/buffer.c +++ b/src/utils/buffer.c @@ -6,22 +6,48 @@ #include #include "buffer.h" -const buffer_t buffer_create(size_t size) { - buffer_t buf = {size, malloc(size)}; +struct st_buffer { + size_t size; + uint8_t* data; + uint8_t own_data; +}; + +buffer_t buffer_create(size_t size) { + buffer_t buf = malloc(sizeof(struct st_buffer)); + buf->size = size; + buf->data = malloc(size); + buf->own_data = 1; return buf; } -const buffer_t buffer_from(uint8_t* data, size_t size) { - buffer_t buf = {size, data}; +inline void buffer_destroy(buffer_t buffer) { + if (buffer->own_data) { + free(buffer->data); + } + free(buffer); +} + +buffer_t buffer_from(const uint8_t* data, size_t size) { + buffer_t buf = malloc(sizeof(struct st_buffer)); + buf->size = size; + buf->data = (uint8_t*)data; + buf->own_data = 0; return buf; } -const buffer_t buffer_copy(const buffer_t* other) { - buffer_t buf = {other->size, malloc(other->size)}; - memcpy(buf.data, other->data, other->size); +buffer_t buffer_copyfrom(const uint8_t* data, size_t size) { + buffer_t buf = malloc(sizeof(struct st_buffer)); + buf->size = size; + buf->data = malloc(size); + memcpy(buf->data, data, size); + buf->own_data = 1; return buf; } -void buffer_free(buffer_t buffer) { - free(buffer.data); +inline size_t buffer_size(buffer_t buffer) { + return buffer->size; } + +inline uint8_t* buffer_data(buffer_t buffer) { + return buffer->data; +} \ No newline at end of file diff --git a/src/utils/buffer.h b/src/utils/buffer.h index df0b4af..2f3c3e8 100644 --- a/src/utils/buffer.h +++ b/src/utils/buffer.h @@ -8,15 +8,13 @@ #include #include -struct st_buffer { - size_t size; - uint8_t* data; -}; -typedef struct st_buffer buffer_t; +typedef struct st_buffer* buffer_t; -const buffer_t buffer_create(size_t size); -const buffer_t buffer_from(uint8_t* data, size_t size); -const buffer_t buffer_copy(const buffer_t* other); -void buffer_free(buffer_t buffer); +buffer_t buffer_create(size_t size); +void buffer_destroy(buffer_t buffer); +buffer_t buffer_from(const uint8_t* data, size_t size); +buffer_t buffer_copyfrom(const uint8_t* data, size_t size); +size_t buffer_size(buffer_t buffer); +uint8_t* buffer_data(buffer_t buffer); #endif //ESC_BUFFER_H diff --git a/src/utils/mutex.c b/src/utils/mutex.c index 20409e5..7801dcc 100644 --- a/src/utils/mutex.c +++ b/src/utils/mutex.c @@ -29,22 +29,28 @@ void mutex_destroy(mutex_t mutex) { } #endif #ifdef __linux__ +#include + +struct st_mutex { + pthread_mutex_t handle; +}; + mutex_t mutex_create() { - mutex_t mutex = malloc(sizeof(mutex_t)); - pthread_mutex_init(mutex, 0); + mutex_t mutex = malloc(sizeof(struct st_mutex)); + pthread_mutex_init(&mutex->handle, 0); return mutex; } void mutex_lock(mutex_t mutex) { - pthread_mutex_lock(mutex); + pthread_mutex_lock(&mutex->handle); } void mutex_unlock(mutex_t mutex) { - pthread_mutex_unlock(mutex); + pthread_mutex_unlock(&mutex->handle); } void mutex_destroy(mutex_t mutex) { - pthread_mutex_destroy(mutex); + pthread_mutex_destroy(&mutex->handle); free(mutex); } #endif \ No newline at end of file diff --git a/src/utils/mutex.h b/src/utils/mutex.h index e177658..70642d7 100644 --- a/src/utils/mutex.h +++ b/src/utils/mutex.h @@ -5,14 +5,7 @@ #ifndef ESC_MUTEX_H #define ESC_MUTEX_H -#ifdef _WIN32 -#include -typedef LPCRITICAL_SECTION mutex_t; -#endif -#ifdef __linux__ -#include -typedef pthread_mutex_t* mutex_t; -#endif +typedef struct st_mutex* mutex_t; mutex_t mutex_create(); void mutex_lock(mutex_t mutex); diff --git a/src/utils/queue.c b/src/utils/queue.c new file mode 100644 index 0000000..2943ed7 --- /dev/null +++ b/src/utils/queue.c @@ -0,0 +1,89 @@ +// +// Created by nicolas on 20/03/18. +// + +#include +#include +#include +#include "queue.h" +#include "mutex.h" + +struct st_node { + void* item; + struct st_node* next; +}; +struct st_queue { + mutex_t lock; + size_t size; + struct st_node* start; + struct st_node* end; +}; + +queue_t queue_create() { + queue_t q = malloc(sizeof(struct st_queue)); + assert(q != 0); + q->lock = mutex_create(); + q->size = 0; + q->start = 0; + q->end = 0; + return q; +} + +void queue_enqueue(queue_t q, void* item) { + struct st_node* node = malloc(sizeof(struct st_node)); + assert(node != 0); + node->item = item; + node->next = 0; + + mutex_lock(q->lock); + if (q->end) { + q->end->next = node; + } + if (q->start == 0) { + q->start = node; + } + q->end = node; + q->size++; + mutex_unlock(q->lock); +} + +void* queue_dequeue(queue_t q) { + mutex_lock(q->lock); + if (q->start == 0) { + mutex_unlock(q->lock); + return 0; + } + struct st_node* node = q->start; + void* item = node->item; + q->start = node->next; + q->size--; + mutex_unlock(q->lock); + free(node); + return item; +} + +void* queue_peek(queue_t q) { + mutex_lock(q->lock); + void* item = q->start ? q->start->item : 0; + mutex_unlock(q->lock); + return item; +} + +inline size_t queue_size(queue_t q) { + mutex_lock(q->lock); + size_t size = q->size; + mutex_unlock(q->lock); + return size; +} + +void queue_destroy(queue_t q) { + mutex_lock(q->lock); + struct st_node* p = q->start; + while(p) { + struct st_node* next = p->next; + free(p); + p = next; + } + mutex_unlock(q->lock); + free(q); +} diff --git a/src/utils/queue.h b/src/utils/queue.h new file mode 100644 index 0000000..30413bf --- /dev/null +++ b/src/utils/queue.h @@ -0,0 +1,17 @@ +// +// Created by nicolas on 20/03/18. +// + +#ifndef ESC_QUEUE_H +#define ESC_QUEUE_H + +typedef struct st_queue* queue_t; + +queue_t queue_create(); +void queue_enqueue(queue_t q, void* item); +void* queue_dequeue(queue_t q); +void* queue_peek(queue_t q); +size_t queue_size(queue_t q); +void queue_destroy(queue_t q); + +#endif //ESC_QUEUE_H diff --git a/src/utils/socket.c b/src/utils/socket.c index 5e0982d..9fc2af1 100644 --- a/src/utils/socket.c +++ b/src/utils/socket.c @@ -78,10 +78,11 @@ int socket_error() { #include struct st_socket { - int s; int af; int type; int proto; + int s; + int last_error; }; socket_t socket_create(int type) { @@ -92,16 +93,24 @@ socket_t socket_create(int type) { s->proto = IPPROTO_TCP; s->s = socket(s->af, s->type, s->proto); if (s->s == -1) { - free(s); - return NULL; + s->last_error = errno; } return s; } return NULL; } -void socket_close(socket_t s) { - close(s->s); +inline void socket_destroy(socket_t s) { + socket_close(s); + free(s); +} + +int socket_close(socket_t s) { + if (close(s->s) == -1) { + s->last_error = errno; + return -1; + } + return 0; } int socket_connect(socket_t s, char* addr, unsigned short port) { @@ -113,21 +122,61 @@ int socket_connect(socket_t s, char* addr, unsigned short port) { struct addrinfo* result; char port_s[6]; snprintf(port_s, 6, "%d", port); - if (getaddrinfo(addr, port_s, &hints, &result) != 0) { + int rc; + if ((rc = getaddrinfo(addr, port_s, &hints, &result)) != 0) { + s->last_error = rc; return -1; } - return connect(s->s, result->ai_addr, (int)result->ai_addrlen); + if (connect(s->s, result->ai_addr, (int)result->ai_addrlen) != 0) { + s->last_error = errno; + return -1; + } + return 0; } ssize_t socket_send(socket_t s, char* data, size_t len) { - return send(s->s, data, len, 0); + ssize_t rc = send(s->s, data, len, 0); + if (rc < 0) { + s->last_error = errno; + } + return rc; } ssize_t socket_recv(socket_t s, char* buf, size_t len) { - return recv(s->s, buf, len, 0); + ssize_t rc = recv(s->s, buf, len, 0); + if (rc < 0) { + s->last_error = errno; + } + return rc; } -int socket_error() { - return errno; +int socket_writable(socket_t s) { + fd_set writable; + FD_ZERO(&writable); + FD_SET(s->s, &writable); + struct timeval timeout = {0,0}; + + int rc = select(s->s + 1, 0, &writable, 0, &timeout); + if (rc < 0) { + s->last_error = errno; + } + return rc; +} + +int socket_readable(socket_t s) { + fd_set readable; + FD_ZERO(&readable); + FD_SET(s->s, &readable); + struct timeval timeout = {0,0}; + + int rc = select(s->s + 1, &readable, 0, 0, &timeout); + if (rc < 0) { + s->last_error = errno; + } + return rc; +} + +inline int socket_error(socket_t s) { + return s->last_error; } #endif \ No newline at end of file diff --git a/src/utils/socket.h b/src/utils/socket.h index 36e52aa..55aa59e 100644 --- a/src/utils/socket.h +++ b/src/utils/socket.h @@ -14,11 +14,13 @@ enum { }; socket_t socket_create(int type); -void socket_close(socket_t s); +void socket_destroy(socket_t s); +int socket_close(socket_t s); int socket_connect(socket_t s, char* addr, unsigned short port); ssize_t socket_send(socket_t s, char* data, size_t len); ssize_t socket_recv(socket_t s, char* buf, size_t len); +int socket_writable(socket_t s); int socket_readable(socket_t s); -int socket_error(); +int socket_error(socket_t s); #endif //ESC_SOCKET_H diff --git a/src/utils/thread.c b/src/utils/thread.c index 935acdc..61d59dc 100644 --- a/src/utils/thread.c +++ b/src/utils/thread.c @@ -63,6 +63,8 @@ struct st_thread { thread_start_t start; thread_arg_t arg; int last_error; + void* result; + int joined; }; thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { @@ -71,29 +73,55 @@ thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { thread->start = thread_start; thread->arg = thread_arg; thread->last_error = 0; + thread->result = 0; + thread->joined = 0; return thread; } int thread_start(thread_t thread) { int rc = pthread_create(&thread->handle, NULL, thread->start, thread->arg); - if (rc != 0) thread->last_error = rc; - return rc; + if (rc != 0) { + thread->last_error = rc; + return -1; + } + return 0; } int thread_wait(thread_t thread) { + if (thread->joined) { + return 0; + } void* result; int rc = pthread_join(thread->handle, &result); - if (rc != 0) thread->last_error = rc; - return rc; + if (rc != 0) { + thread->last_error = rc; + return -1; + } + thread->joined = 1; + return 0; } int thread_destroy(thread_t thread) { - if (thread_wait(thread)) return pthread_cancel(thread->handle); + if (thread_wait(thread)) { + int rc = pthread_cancel(thread->handle); + if (rc != 0) { + thread->last_error = rc; + return -1; + } + } free(thread); return 0; } -int thread_error(thread_t thread) { +inline int thread_error(thread_t thread) { return thread->last_error; } -#endif \ No newline at end of file + +inline int thread_result(thread_t thread, void** result) { + if (thread_wait(thread)) { + return -1; + } + *result = thread->result; + return 0; +} +#endif diff --git a/src/utils/thread.h b/src/utils/thread.h index f115929..d07e9ed 100644 --- a/src/utils/thread.h +++ b/src/utils/thread.h @@ -14,6 +14,6 @@ int thread_start(thread_t thread); int thread_wait(thread_t thread); int thread_destroy(thread_t thread); int thread_error(thread_t thread); -void* thread_result(thread_t thread); +int thread_result(thread_t thread, void** result); #endif //ESC_THREAD_H diff --git a/src/utils/uuid.c b/src/utils/uuid.c index dab003f..8aad106 100644 --- a/src/utils/uuid.c +++ b/src/utils/uuid.c @@ -10,39 +10,42 @@ #define UUID_SIZE 16 +struct st_uuid { + char data[UUID_SIZE]; +}; + //TODO: fixme - this is not secure, use crypto random bytes instead of rand -const esc_uuid_t* esc_uuid_create() { - esc_uuid_t* result = malloc(sizeof(esc_uuid_t)); +esc_uuid_t esc_uuid_create() { + esc_uuid_t result = malloc(sizeof(struct st_uuid)); srand(time(NULL)); for(int i=0;idata[i] = (uint8_t)(rand() & 0xff); return result; } -void esc_uuid_free(const esc_uuid_t* uuid) { - free((void*)uuid); +inline void esc_uuid_destroy(esc_uuid_t uuid) { + free(uuid); } -const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size) { - if (size != UUID_SIZE) { +esc_uuid_t esc_uuid_from(const uint8_t* src, size_t size) { + if (size < UUID_SIZE) { return 0; } - esc_uuid_t* uuid = malloc(sizeof(esc_uuid_t)); - for (size_t i=0; idata[i] = src[i]; + esc_uuid_t uuid = malloc(sizeof(struct st_uuid)); + memcpy(uuid->data, src, UUID_SIZE); return uuid; } -const esc_uuid_t* esc_uuid_copy(const esc_uuid_t* other) { - return esc_uuid_from((uint8_t*)&other->data, UUID_SIZE); +esc_uuid_t esc_uuid_copy(esc_uuid_t other) { + return esc_uuid_from((uint8_t*)other->data, UUID_SIZE); } -int esc_uuid_compare(const esc_uuid_t* left, const esc_uuid_t* right) { +int esc_uuid_compare(esc_uuid_t left, esc_uuid_t right) { return memcmp(left->data, right->data, UUID_SIZE); } -const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size) { - snprintf(buffer, buf_size, "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x", +const char* esc_uuid_format(esc_uuid_t uuid, char* buffer, size_t buf_size) { + snprintf(buffer, buf_size, "%02hhx%02hhx%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx", uuid->data[0], uuid->data[1], uuid->data[2], diff --git a/src/utils/uuid.h b/src/utils/uuid.h index 9c737ac..8fcaf87 100644 --- a/src/utils/uuid.h +++ b/src/utils/uuid.h @@ -7,16 +7,13 @@ #include -struct st_uuid { - uint8_t data[16]; -}; -typedef struct st_uuid esc_uuid_t; +typedef struct st_uuid* esc_uuid_t; -const esc_uuid_t* esc_uuid_create(); -void esc_uuid_free(const esc_uuid_t* uuid); -const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size); -const esc_uuid_t* esc_uuid_copy(const esc_uuid_t* other); -int esc_uuid_compare(const esc_uuid_t* left, const esc_uuid_t* right); -const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size); +esc_uuid_t esc_uuid_create(); +void esc_uuid_destroy(esc_uuid_t uuid); +esc_uuid_t esc_uuid_from(const uint8_t* src, size_t size); +esc_uuid_t esc_uuid_copy(esc_uuid_t other); +int esc_uuid_compare(esc_uuid_t left, esc_uuid_t right); +const char* esc_uuid_format(esc_uuid_t uuid, char* buffer, size_t len); #endif //ESC_UUID_H