From 1f2958f0adbb1e5c75471c830e81dc105741fa1c Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Mon, 19 Mar 2018 22:48:55 -0700 Subject: [PATCH] async send/recv (wip) --- CMakeLists.txt | 4 +- src/esc.c | 124 ++++++++++++++++++++++++++++++--------------- src/main.c | 8 ++- src/utils/buffer.c | 6 +-- src/utils/buffer.h | 2 +- src/utils/socket.c | 29 +++++++---- src/utils/socket.h | 1 + src/utils/string.c | 13 +++++ src/utils/string.h | 10 ++++ src/utils/thread.c | 38 +++++++++++--- src/utils/thread.h | 10 ---- src/utils/uuid.c | 19 ++++++- src/utils/uuid.h | 3 ++ 13 files changed, 190 insertions(+), 77 deletions(-) create mode 100644 src/utils/string.c create mode 100644 src/utils/string.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 7adbda0..ab97920 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.10) +cmake_minimum_required(VERSION 3.9) project(esc C) set(CMAKE_C_STANDARD 99) @@ -8,7 +8,7 @@ if(WIN32) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) endif() -add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h) +add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h) if(WIN32) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) diff --git a/src/esc.c b/src/esc.c index f91fbbf..cb54fb7 100644 --- a/src/esc.c +++ b/src/esc.c @@ -14,12 +14,11 @@ #include "tcp_package.h" #include "utils/thread.h" #include "utils/mutex.h" +#include "utils/string.h" -const char* string_copy(const char *src) { - char* dst = malloc(strlen(src)+1); - strcpy(dst, src); - return dst; -} +#ifdef _WIN32 +#define sleep Sleep +#endif typedef int bool_t; const bool_t BOOL_TRUE = 1; @@ -68,7 +67,11 @@ struct st_connection { socket_t tcp_conn; ProtobufCAllocator protobuf_c_allocator; thread_t manager_thread; - mutex_t mutex_lock; + mutex_t sending_lock; + mutex_t send_lock; + tcp_package_t* send_pkg; + mutex_t recv_lock; + tcp_package_t* recv_pkg; }; struct st_static_endpoint_discoverer { @@ -89,7 +92,7 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) { char uuid_buf[37]; - printf("connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); + fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); buffer_t send_buffer = tcp_package_to_buffer(pkg); uint32_t size = (uint32_t)send_buffer.size; if (socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t)) <= 0) { @@ -108,7 +111,7 @@ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { uint32_t recv_size; ssize_t rc; if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { - printf("%ld %d", rc, socket_error()); + fprintf(stderr, "%ld %d", rc, socket_error()); return 0; } buffer_t recv_buffer = buffer_create(recv_size); @@ -120,7 +123,7 @@ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer); buffer_free(recv_buffer); char uuid_buf[37]; - printf("connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); + fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); //for (int32_t i=0;idata.size;i++) { // printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]); //} @@ -128,6 +131,27 @@ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { return recv_pkg; } +void connection_enqueue_send(esc_connection_t* conn, tcp_package_t* pkg) { + mutex_lock(conn->sending_lock); + mutex_lock(conn->send_lock); + conn->send_pkg = pkg; + mutex_unlock(conn->send_lock); +} + +tcp_package_t* connection_wait_for(esc_connection_t* conn, esc_uuid_t* correlation_id) { + tcp_package_t* found = 0; + while(found == 0) { + mutex_lock(conn->recv_lock); + if (conn->recv_pkg && esc_uuid_compare(&conn->recv_pkg->correlation_id, correlation_id) == 0) { + found = conn->recv_pkg; + conn->recv_pkg = 0; + } + mutex_unlock(conn->recv_lock); + sleep(10); + } + return found; +} + void* protobuf_c_alloc(void *alloc_data, size_t size) { return malloc(size); } @@ -164,24 +188,53 @@ const esc_connection_t* esc_connection_create(const esc_connection_settings_t* c char buf[40]; esc_uuid_format(uuid, buf, 40); conn->name = string_copy(buf); - free(uuid); + esc_uuid_free(uuid); } else { conn->name = connection_name; } + conn->recv_pkg = 0; + conn->send_pkg = 0; + conn->send_lock = mutex_create(); + conn->sending_lock = mutex_create(); + conn->recv_lock = mutex_create(); return conn; } void* connection_thread(void* arg) { - const esc_connection_t* conn = arg; + esc_connection_t* conn = arg; while(1) { - //mutex_lock(conn->mutex_lock); - //if (socket_readable(conn->tcp_conn)) { - - //} + mutex_lock(conn->send_lock); + //printf("%p ", conn->send_pkg); + int rc; + if (conn->send_pkg) { + if ((rc = connection_send_tcp_package(conn, conn->send_pkg)) == 0) { + // free send pkg + conn->send_pkg = 0; + mutex_unlock(conn->sending_lock); + } + } + mutex_unlock(conn->send_lock); + + if ((rc = socket_readable(conn->tcp_conn)) > 0) { + mutex_lock(conn->recv_lock); + if (conn->recv_pkg == 0) { + conn->recv_pkg = connection_recv_tcp_package(conn); + } + mutex_unlock(conn->recv_lock); + } + //printf("%d ", rc); + + mutex_lock(conn->recv_lock); + if (conn->recv_pkg && conn->recv_pkg->command == 0x01) { + tcp_package_t* hb = tcp_package_create(0x02, &conn->recv_pkg->correlation_id, buffer_create(0)); + conn->recv_pkg = 0; + connection_send_tcp_package(conn, hb); + } + mutex_unlock(conn->recv_lock); //mutex_unlock(conn->mutex_lock); - sleep(1); + sleep(10); } } @@ -200,7 +253,12 @@ int esc_connection_connect(const esc_connection_t* conn) { if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { return -2; } - // Identify + // Start manager thread + _conn->manager_thread = thread_create(connection_thread, _conn); + if (thread_start(_conn->manager_thread)) { + return -3; + } + // Identify // build message EventStore__Client__Messages__IdentifyClient identify_client; event_store__client__messages__identify_client__init(&identify_client); @@ -211,24 +269,13 @@ int esc_connection_connect(const esc_connection_t* conn) { event_store__client__messages__identify_client__pack(&identify_client, buffer); // build tcp_package const tcp_package_t* send_pkg = tcp_package_create(0xF5, esc_uuid_create(), buffer_from(buffer, s)); - if (connection_send_tcp_package(conn, send_pkg) != 0) { - return -3; - } - const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); - if (recv_pkg == 0) { - return -4; - } + connection_enqueue_send(_conn, send_pkg); + const tcp_package_t* recv_pkg = connection_wait_for(_conn, &send_pkg->correlation_id); if (recv_pkg->command != 0xF6) { return -5; } - - _conn->mutex_lock = mutex_create(); - _conn->manager_thread = thread_create(connection_thread, _conn); - if (thread_start(_conn->manager_thread)) { - return -6; - } - - return 0; + + return 0; } const esc_credentials_t* esc_credentials_create(const char* username, const char* password) { @@ -246,8 +293,8 @@ const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__ ev->event_number = msg->event_number; ev->event_stream_id = string_copy(msg->event_stream_id); ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0; - ev->data = buffer_copyfrom(msg->data.data, msg->data.len); - ev->metadata = buffer_copyfrom(msg->metadata.data, msg->metadata.len); + ev->data = buffer_from(msg->data.data, msg->data.len); + ev->metadata = buffer_from(msg->metadata.data, msg->metadata.len); return ev; } @@ -273,14 +320,9 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti uint8_t buffer[s]; event_store__client__messages__read_all_events__pack(&send_msg, buffer); const tcp_package_t* send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); - if (connection_send_tcp_package(conn, send_pkg) != 0) { - return 0; - } + connection_enqueue_send(conn, send_pkg); - const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); - if (recv_pkg == 0) { - return 0; - } + const tcp_package_t* recv_pkg = connection_wait_for(conn, &send_pkg->correlation_id); if (recv_pkg->command != 0xB7) { return 0; } diff --git a/src/main.c b/src/main.c index 664bfb7..af1eeb6 100644 --- a/src/main.c +++ b/src/main.c @@ -9,7 +9,7 @@ int main() { WSAStartup(MAKEWORD(2,0), &wsaData); #endif - const esc_connection_t* conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); + const esc_connection_t* conn = esc_connection_create(esc_default_connection_settings, "tcp://127.0.0.1:1113", NULL); if (conn == 0) { return -1; } @@ -40,6 +40,12 @@ int main() { result->events[i]->event->metadata.size); } } while(result->is_end_of_stream == 0); + +#ifdef _WIN32 + Sleep(30000); +#else + sleep(30000); +#endif esc_connection_close(conn); diff --git a/src/utils/buffer.c b/src/utils/buffer.c index 1d4a831..a920e2e 100644 --- a/src/utils/buffer.c +++ b/src/utils/buffer.c @@ -16,9 +16,9 @@ const buffer_t buffer_from(uint8_t* data, size_t size) { return buf; } -const buffer_t buffer_copyfrom(uint8_t* data, size_t size) { - buffer_t buf = {size, malloc(size)}; - memcpy(buf.data, data, size); +const buffer_t buffer_copy(const buffer_t* other) { + buffer_t buf = {other->size, malloc(other->size)}; + memcpy(buf.data, other->data, other->size); return buf; } diff --git a/src/utils/buffer.h b/src/utils/buffer.h index 3f0b044..df0b4af 100644 --- a/src/utils/buffer.h +++ b/src/utils/buffer.h @@ -16,7 +16,7 @@ typedef struct st_buffer buffer_t; const buffer_t buffer_create(size_t size); const buffer_t buffer_from(uint8_t* data, size_t size); -const buffer_t buffer_copyfrom(uint8_t* data, size_t size); +const buffer_t buffer_copy(const buffer_t* other); void buffer_free(buffer_t buffer); #endif //ESC_BUFFER_H diff --git a/src/utils/socket.c b/src/utils/socket.c index a72f213..5e0982d 100644 --- a/src/utils/socket.c +++ b/src/utils/socket.c @@ -3,13 +3,7 @@ // #include -#include -#include #include -#include -#include -#include -#include #include "socket.h" #ifdef _WIN32 @@ -55,12 +49,20 @@ int socket_connect(socket_t s, char* addr, unsigned short port) { return connect(s->s, result->ai_addr, (int)result->ai_addrlen); } -int socket_send(socket_t s, char* data, int len) { - return send(s->s, data, len, 0); +ssize_t socket_send(socket_t s, char* data, size_t len) { + return send(s->s, data, (int)len, 0); } -int socket_recv(socket_t s, char* buf, int len) { - return recv(s->s, buf, len, 0); +ssize_t socket_recv(socket_t s, char* buf, size_t len) { + return recv(s->s, buf, (int)len, 0); +} + +int socket_readable(socket_t s) { + fd_set readable; + FD_SET(s->s, &readable); + struct timeval timeout = {0, 0}; + + return select(0, &readable, 0, 0, &timeout); } int socket_error() { @@ -68,6 +70,13 @@ int socket_error() { } #endif #ifdef __linux__ +#include +#include +#include +#include +#include +#include + struct st_socket { int s; int af; diff --git a/src/utils/socket.h b/src/utils/socket.h index 7f0c9f0..36e52aa 100644 --- a/src/utils/socket.h +++ b/src/utils/socket.h @@ -18,6 +18,7 @@ void socket_close(socket_t s); int socket_connect(socket_t s, char* addr, unsigned short port); ssize_t socket_send(socket_t s, char* data, size_t len); ssize_t socket_recv(socket_t s, char* buf, size_t len); +int socket_readable(socket_t s); int socket_error(); #endif //ESC_SOCKET_H diff --git a/src/utils/string.c b/src/utils/string.c new file mode 100644 index 0000000..7652a50 --- /dev/null +++ b/src/utils/string.c @@ -0,0 +1,13 @@ +// +// Created by nicol on 2018-03-19. +// + +#include +#include +#include "string.h" + +const char* string_copy(const char *src) { + char* dst = malloc(strlen(src)+1); + strcpy(dst, src); + return dst; +} diff --git a/src/utils/string.h b/src/utils/string.h new file mode 100644 index 0000000..141e61a --- /dev/null +++ b/src/utils/string.h @@ -0,0 +1,10 @@ +// +// Created by nicol on 2018-03-19. +// + +#ifndef ESC_STRING_H +#define ESC_STRING_H + +const char* string_copy(const char *src); + +#endif //ESC_STRING_H diff --git a/src/utils/thread.c b/src/utils/thread.c index 8f799f5..935acdc 100644 --- a/src/utils/thread.c +++ b/src/utils/thread.c @@ -6,30 +6,54 @@ #include "thread.h" #ifdef _WIN32 +#include + +struct st_thread { + DWORD id; + HANDLE handle; + thread_start_t start; + thread_arg_t arg; + int last_error; +}; + thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) { thread_t thread = malloc(sizeof(struct st_thread)); - thread->handle = CreateThread(NULL, 0, thread_start, thread_arg, CREATE_SUSPENDED, &thread->id); - if (thread->handle == NULL) { - free(thread); - return NULL; - } + thread->handle = 0; + thread->start = thread_start; + thread->arg = thread_arg; + thread->last_error = 0; return thread; } int thread_start(thread_t thread) { - if (ResumeThread(thread->handle) == -1) return -1; + thread->handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)thread->start, thread->arg, 0, &thread->id); + if (thread->handle == NULL) { + DWORD rc = GetLastError(); + thread->last_error = (int)rc; + return (int)rc; + } return 0; } int thread_wait(thread_t thread) { - if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) return -1; + if (WaitForSingleObject(thread->handle, INFINITE) == WAIT_FAILED) { + DWORD rc = GetLastError(); + thread->last_error = (int)rc; + return (int)rc; + } return 0; } int thread_destroy(thread_t thread) { if (thread_wait(thread) != 0) SuspendThread(thread->handle); CloseHandle(thread->handle); + free(thread); } + +int thread_error(thread_t thread) { + return thread->last_error; +} + #endif #ifdef __linux__ #include diff --git a/src/utils/thread.h b/src/utils/thread.h index bb92c0a..f115929 100644 --- a/src/utils/thread.h +++ b/src/utils/thread.h @@ -5,16 +5,6 @@ #ifndef ESC_THREAD_H #define ESC_THREAD_H -#ifdef _WIN32 -#include -struct st_thread { - DWORD id; - HANDLE handle; -}; -typedef struct st_thread* thread_t; -typedef LPTHREAD_START_ROUTINE thread_start_t; -typedef LPVOID thread_arg_t; -#endif typedef void* (*thread_start_t)(void*); typedef void* thread_arg_t; typedef struct st_thread* thread_t; diff --git a/src/utils/uuid.c b/src/utils/uuid.c index 4f33c35..dab003f 100644 --- a/src/utils/uuid.c +++ b/src/utils/uuid.c @@ -5,19 +5,26 @@ #include #include #include +#include #include "uuid.h" +#define UUID_SIZE 16 + //TODO: fixme - this is not secure, use crypto random bytes instead of rand const esc_uuid_t* esc_uuid_create() { esc_uuid_t* result = malloc(sizeof(esc_uuid_t)); srand(time(NULL)); - for(int i=0;i<16;i++) + for(int i=0;idata[i] = (uint8_t)(rand() & 0xff); return result; } +void esc_uuid_free(const esc_uuid_t* uuid) { + free((void*)uuid); +} + const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size) { - if (size != 16) { + if (size != UUID_SIZE) { return 0; } esc_uuid_t* uuid = malloc(sizeof(esc_uuid_t)); @@ -26,6 +33,14 @@ const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size) { return uuid; } +const esc_uuid_t* esc_uuid_copy(const esc_uuid_t* other) { + return esc_uuid_from((uint8_t*)&other->data, UUID_SIZE); +} + +int esc_uuid_compare(const esc_uuid_t* left, const esc_uuid_t* right) { + return memcmp(left->data, right->data, UUID_SIZE); +} + const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size) { snprintf(buffer, buf_size, "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x", uuid->data[0], diff --git a/src/utils/uuid.h b/src/utils/uuid.h index b54c9e3..9c737ac 100644 --- a/src/utils/uuid.h +++ b/src/utils/uuid.h @@ -13,7 +13,10 @@ struct st_uuid { typedef struct st_uuid esc_uuid_t; const esc_uuid_t* esc_uuid_create(); +void esc_uuid_free(const esc_uuid_t* uuid); const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size); +const esc_uuid_t* esc_uuid_copy(const esc_uuid_t* other); +int esc_uuid_compare(const esc_uuid_t* left, const esc_uuid_t* right); const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size); #endif //ESC_UUID_H