diff --git a/src/esc.c b/src/esc.c index 3253236..4c7a5a8 100644 --- a/src/esc.c +++ b/src/esc.c @@ -8,7 +8,6 @@ #include #include "esc.h" #include "utils/socket.h" -#include "utils/buffer.h" #include "proto.h" #include "tcp_package.h" #include "utils/thread.h" @@ -18,6 +17,7 @@ #include "tcp_messages.h" #ifdef _WIN32 +#include #define usleep Sleep #endif #ifdef __linux__ @@ -96,7 +96,11 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc //TODO partial transfer ssize_t connection_send_tcp_package(esc_connection_t conn, tcp_package_t pkg) { char uuid_buf[37]; - fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu ", pkg->command, pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); +#ifdef _WIN32 + fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); +#else + fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); +#endif buffer_t send_buffer = tcp_package_to_buffer(pkg); size_t send_buffer_size = buffer_size(send_buffer); @@ -125,7 +129,11 @@ tcp_package_t connection_recv_tcp_package(esc_connection_t conn) { uint32_t recv_size; ssize_t rc; if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { +#ifdef _WIN32 + fprintf(stderr, "connection_recv_tcp_package: %lld %d\n", rc, socket_error(conn->tcp_conn)); +#else fprintf(stderr, "connection_recv_tcp_package: %ld %d\n", rc, socket_error(conn->tcp_conn)); +#endif return 0; } buffer_t recv_buffer = buffer_create(recv_size); @@ -139,7 +147,11 @@ tcp_package_t connection_recv_tcp_package(esc_connection_t conn) { tcp_package_t recv_pkg = tcp_package_from_buffer(recv_buffer); buffer_destroy(recv_buffer); char uuid_buf[37]; - fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), recv_buffer_size); +#ifdef _WIN32 + fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); +#else + fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); +#endif return recv_pkg; } @@ -274,8 +286,8 @@ int esc_connection_connect(esc_connection_t conn) { return 0; } -const esc_credentials_t* esc_credentials_create(const char* username, const char* password) { - esc_credentials_t* creds = malloc(sizeof(struct st_credentials)); +esc_credentials_t esc_credentials_create(const char* username, const char* password) { + esc_credentials_t creds = malloc(sizeof(struct st_credentials)); creds->username = username; creds->password = password; return creds; @@ -304,7 +316,7 @@ esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__Resolv return ev; } -const esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) { +esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials) { EventStore__Client__Messages__ReadAllEvents send_msg; event_store__client__messages__read_all_events__init(&send_msg); send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; @@ -329,7 +341,7 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t c EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, data_size, data); - esc_all_events_slice_t* result = malloc(sizeof(struct st_all_events_slice)); + esc_all_events_slice_t result = malloc(sizeof(struct st_all_events_slice)); result->read_direction = "forward"; result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0; result->from_position.prepare_position = recv_msg->prepare_position; @@ -351,6 +363,10 @@ void esc_connection_close(esc_connection_t conn) { } const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { +#ifdef _WIN32 + snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); +#else snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); +#endif return buffer; } diff --git a/src/esc.h b/src/esc.h index 83d80ce..5e17cab 100644 --- a/src/esc.h +++ b/src/esc.h @@ -24,7 +24,7 @@ struct st_credentials { const char* username; const char* password; }; -typedef struct st_credentials esc_credentials_t; +typedef struct st_credentials* esc_credentials_t; struct st_esc_position { int64_t prepare_position; @@ -58,15 +58,15 @@ struct st_all_events_slice { esc_resolved_event_t** events; int is_end_of_stream; }; -typedef struct st_all_events_slice esc_all_events_slice_t; +typedef struct st_all_events_slice* esc_all_events_slice_t; const esc_connection_settings_t* esc_default_connection_settings; // Connection esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); int esc_connection_connect(esc_connection_t conn); -const esc_credentials_t* esc_credentials_create(const char* username, const char* password); -const esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials); +esc_credentials_t esc_credentials_create(const char* username, const char* password); +esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials); void esc_connection_close(esc_connection_t conn); // Utils diff --git a/src/main.c b/src/main.c index 0163daa..30c95b1 100644 --- a/src/main.c +++ b/src/main.c @@ -1,16 +1,14 @@ #include -#ifdef __linux__ -#include -#include - -#endif -#include "utils/mutex.h" #include "esc.h" -#include "utils/queue.h" #ifdef _WIN32 +#include +#include #define usleep Sleep #endif +#ifdef __linux__ +#include +#endif int main() { #ifdef _WIN32 @@ -25,9 +23,9 @@ int main() { if (esc_connection_connect(conn) != 0) { return -2; } - const esc_credentials_t* credentials = esc_credentials_create("admin", "changeit"); + esc_credentials_t credentials = esc_credentials_create("admin", "changeit"); - const esc_all_events_slice_t *result = 0; + esc_all_events_slice_t result = 0; do { result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 100, credentials); if (result == 0) { @@ -41,7 +39,11 @@ int main() { result->is_end_of_stream); char uuid_buf[37]; for (size_t i = 0; i < result->n_events; i++) { +#ifdef _WIN32 + printf("%s %s %lld@%s %llu %llu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), +#else printf("%s %s %ld@%s %lu %lu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), +#endif result->events[i]->event->event_type, result->events[i]->event->event_number, result->events[i]->event->event_stream_id, diff --git a/src/utils/socket.c b/src/utils/socket.c index 9fc2af1..ecac208 100644 --- a/src/utils/socket.c +++ b/src/utils/socket.c @@ -13,6 +13,7 @@ struct st_socket { int af; int type; int proto; + int last_error; }; socket_t socket_create(int type) { @@ -23,16 +24,19 @@ socket_t socket_create(int type) { s->proto = IPPROTO_TCP; s->s = socket(s->af, s->type, s->proto); if (s->s == INVALID_SOCKET) { - free(s); - return NULL; + s->last_error = WSAGetLastError(); } return s; } return NULL; } -void socket_close(socket_t s) { - closesocket(s->s); +int socket_close(socket_t s) { + int rc = closesocket(s->s); + if (rc != 0) { + s->last_error = WSAGetLastError(); + } + return rc; } int socket_connect(socket_t s, char* addr, unsigned short port) { @@ -43,30 +47,62 @@ int socket_connect(socket_t s, char* addr, unsigned short port) { hints.ai_socktype = s->type; hints.ai_protocol = s->proto; ADDRINFO* result; - if (getaddrinfo(addr, itoa(port, port_s, 10), &hints, &result) != 0) { + int rc; + if ((rc = getaddrinfo(addr, itoa(port, port_s, 10), &hints, &result)) != 0) { + s->last_error = WSAGetLastError(); return -1; } - return connect(s->s, result->ai_addr, (int)result->ai_addrlen); + if ((rc = connect(s->s, result->ai_addr, (int)result->ai_addrlen)) != 0) { + s->last_error = WSAGetLastError(); + return -2; + } + return 0; } ssize_t socket_send(socket_t s, char* data, size_t len) { - return send(s->s, data, (int)len, 0); + ssize_t rc = send(s->s, data, (int)len, 0); + if (rc < 0) { + s->last_error = WSAGetLastError(); + } + return rc; } ssize_t socket_recv(socket_t s, char* buf, size_t len) { - return recv(s->s, buf, (int)len, 0); + ssize_t rc = recv(s->s, buf, (int)len, 0); + if (rc < 0) { + s->last_error = WSAGetLastError(); + } + return rc; } int socket_readable(socket_t s) { fd_set readable; + FD_ZERO(&readable); FD_SET(s->s, &readable); struct timeval timeout = {0, 0}; - return select(0, &readable, 0, 0, &timeout); + int rc = select(0, &readable, 0, 0, &timeout); + if (rc < 0) { + s->last_error = WSAGetLastError(); + } + return rc; } -int socket_error() { - return WSAGetLastError(); +int socket_writable(socket_t s) { + fd_set writable; + FD_ZERO(&writable); + FD_SET(s->s, &writable); + struct timeval timeout = {0, 0}; + + int rc = select(0, 0, &writable, 0, &timeout); + if (rc < 0) { + s->last_error = WSAGetLastError(); + } + return rc; +} + +int socket_error(socket_t s) { + return s->last_error; } #endif #ifdef __linux__ diff --git a/src/utils/uuid.c b/src/utils/uuid.c index 8aad106..8d5842f 100644 --- a/src/utils/uuid.c +++ b/src/utils/uuid.c @@ -10,14 +10,19 @@ #define UUID_SIZE 16 +int srand_called = 0; + struct st_uuid { char data[UUID_SIZE]; }; //TODO: fixme - this is not secure, use crypto random bytes instead of rand esc_uuid_t esc_uuid_create() { + if (srand_called == 0) { + srand(time(NULL)); + srand_called = 1; + } esc_uuid_t result = malloc(sizeof(struct st_uuid)); - srand(time(NULL)); for(int i=0;idata[i] = (uint8_t)(rand() & 0xff); return result; @@ -45,6 +50,16 @@ int esc_uuid_compare(esc_uuid_t left, esc_uuid_t right) { } const char* esc_uuid_format(esc_uuid_t uuid, char* buffer, size_t buf_size) { +#ifdef _WIN32 + snprintf(buffer, buf_size, "%08lx-%04hx-%04hx-%04hx-%08lx%04hx", + *(long*)&uuid->data[0], + *(short*)&uuid->data[4], + *(short*)&uuid->data[6], + *(short*)&uuid->data[8], + *(long*)&uuid->data[10], + *(short*)&uuid->data[14]); +#endif +#ifdef __linux__ snprintf(buffer, buf_size, "%02hhx%02hhx%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx", uuid->data[0], uuid->data[1], @@ -62,5 +77,6 @@ const char* esc_uuid_format(esc_uuid_t uuid, char* buffer, size_t buf_size) { uuid->data[13], uuid->data[14], uuid->data[15]); +#endif return buffer; }