// // Created by nicol on 2018-03-18. // #include #include #include #include #include "esc.h" #include "utils/socket.h" #include "proto.h" #include "tcp_package.h" #include "utils/thread.h" #include "utils/mutex.h" #include "utils/string.h" #include "utils/queue.h" #include "tcp_messages.h" #ifdef _WIN32 #include #define usleep Sleep #endif #ifdef __linux__ #include #endif typedef int bool_t; const bool_t BOOL_TRUE = 1; const bool_t BOOL_FALSE = 0; const esc_error_t _error_create(const char* file, int line, int code, char* format, ...) { va_list vl; va_start(vl, format); esc_error_t err; err.file = file; err.line = line; err.code = code; vsnprintf(err.message, ERROR_MSG_SIZE, format, vl); return err; } #define error_create(code, fmt, args...) _error_create(__FILE__, __LINE__, code, fmt, args) struct st_connection_settings { bool_t use_ssl_connection; }; const esc_connection_settings_t default_connection_settings = { }; const esc_connection_settings_t* esc_default_connection_settings = &default_connection_settings; struct st_tcp_endpoint { char* host; unsigned short port; }; typedef struct st_tcp_endpoint tcp_endpoint_t; struct st_node_endpoints { tcp_endpoint_t tcp_endpoint; tcp_endpoint_t secure_tcp_endpoint; }; typedef struct st_node_endpoints node_endpoints_t; typedef const node_endpoints_t* (*endpoint_discoverer_t)(const void* discover_data, const tcp_endpoint_t* failed_tcp_endpoint); struct st_connection { esc_connection_settings_t settings; const char* name; void* discoverer_data; endpoint_discoverer_t discover; socket_t tcp_conn; ProtobufCAllocator protobuf_c_allocator; thread_t manager_thread; bool_t stop; mutex_t recv_peek_lock; queue_t send_queue; queue_t recv_queue; bool_t has_error; esc_error_t last_error; }; struct st_static_endpoint_discoverer { tcp_endpoint_t tcp_endpoint; bool_t use_ssl_connection; }; typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t; const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* discover_data, const tcp_endpoint_t* failed_tcp_endpoint) { node_endpoints_t* result = malloc(sizeof(struct st_node_endpoints)); if (discover_data->use_ssl_connection) { result->secure_tcp_endpoint = discover_data->tcp_endpoint; } else { result->tcp_endpoint = discover_data->tcp_endpoint; } return result; } //TODO partial transfer ssize_t connection_send_tcp_package(esc_connection_t conn, tcp_package_t pkg) { char uuid_buf[37]; #ifdef _WIN32 fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); #else fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); #endif buffer_t send_buffer = tcp_package_to_buffer(pkg); size_t send_buffer_size = buffer_size(send_buffer); uint32_t size = (uint32_t)send_buffer_size; ssize_t rc; if ((rc = socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t))) <= 0) { buffer_destroy(send_buffer); fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); return -1; } uint8_t* send_buffer_data = buffer_data(send_buffer); if ((rc = socket_send(conn->tcp_conn, (char *) send_buffer_data, send_buffer_size)) <= 0) { buffer_destroy(send_buffer); fprintf(stderr, "%d\n", socket_error(conn->tcp_conn)); return -2; } buffer_destroy(send_buffer); fprintf(stderr, "0\n"); return 0; } //TODO partial transfer tcp_package_t connection_recv_tcp_package(esc_connection_t conn) { uint32_t recv_size; ssize_t rc; if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { #ifdef _WIN32 fprintf(stderr, "connection_recv_tcp_package: %lld %d\n", rc, socket_error(conn->tcp_conn)); #else fprintf(stderr, "connection_recv_tcp_package: %ld %d\n", rc, socket_error(conn->tcp_conn)); #endif return 0; } buffer_t recv_buffer = buffer_create(recv_size); size_t recv_buffer_size = recv_size; uint8_t* recv_buffer_data = buffer_data(recv_buffer); while(recv_size > 0) { size_t pos = recv_buffer_size - recv_size; rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer_data[pos], recv_size); recv_size -= rc; } tcp_package_t recv_pkg = tcp_package_from_buffer(recv_buffer); buffer_destroy(recv_buffer); char uuid_buf[37]; #ifdef _WIN32 fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); #else fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); #endif return recv_pkg; } inline void connection_enqueue_send(esc_connection_t conn, tcp_package_t pkg) { queue_enqueue(conn->send_queue, pkg); } tcp_package_t connection_wait_for(esc_connection_t conn, esc_uuid_t correlation_id) { tcp_package_t found = 0; while(found == 0) { mutex_lock(conn->recv_peek_lock); tcp_package_t peek = queue_peek(conn->recv_queue); if (peek && esc_uuid_compare(peek->correlation_id, correlation_id) == 0) { queue_dequeue(conn->recv_queue); found = peek; } mutex_unlock(conn->recv_peek_lock); usleep(1); } return found; } void* protobuf_c_alloc(void *alloc_data, size_t size) { return malloc(size); } void protobuf_c_free(void *alloc_data, void* p) { free(p); } esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { struct st_connection* conn = malloc(sizeof(struct st_connection)); conn->settings = *connection_settings; conn->protobuf_c_allocator.alloc = protobuf_c_alloc; conn->protobuf_c_allocator.free = protobuf_c_free; conn->protobuf_c_allocator.allocator_data = 0; if (connection_name == 0 || strcmp(connection_name, "") == 0) { esc_uuid_t uuid = esc_uuid_create(); char buf[40]; esc_uuid_format(uuid, buf, 40); conn->name = string_copy(buf); esc_uuid_destroy(uuid); } else { conn->name = connection_name; } conn->send_queue = queue_create(); conn->recv_peek_lock = mutex_create(); conn->recv_queue = queue_create(); conn->stop = 0; conn->has_error = 0; memset(&conn->last_error, 0, sizeof(struct st_error)); if (strncmp(addr, "tcp://", 6) != 0) { conn->has_error = 1; conn->last_error = error_create(1, "invalid schema for address: %s", addr); return conn; } char* pos = strrchr(addr, ':'); if (pos == 0) { conn->has_error = 1; conn->last_error = error_create(2, "missing port in address: %s", addr); return conn; } char* host = malloc(pos - addr - 5); strncpy(host, addr+6, pos-addr-6); host[pos-addr-6] = 0; unsigned short port = (unsigned short)atoi(pos+1); static_endpoint_discoverer_t discover_data = { {host, port}, connection_settings->use_ssl_connection }; conn->discoverer_data = malloc(sizeof(struct st_static_endpoint_discoverer)); memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_static_endpoint_discoverer)); conn->discover = (endpoint_discoverer_t)static_discover; return conn; } void* connection_thread(void* arg) { esc_connection_t conn = arg; while(conn->stop == BOOL_FALSE) { if (socket_writable(conn->tcp_conn)) { printf("T1"); tcp_package_t send_pkg = queue_dequeue(conn->send_queue); printf("T1(%p)", send_pkg); if (send_pkg && connection_send_tcp_package(conn, send_pkg)) { fprintf(stderr, "failed to send pkg."); } printf("T2"); } if (socket_readable(conn->tcp_conn)) { printf("T3"); tcp_package_t recv_pkg = connection_recv_tcp_package(conn); printf("T4"); if (recv_pkg && recv_pkg->command == MESSAGE_HEARTBEATREQUEST) { tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); printf("T5"); queue_enqueue(conn->send_queue, heartbeat_pkg); } else if (recv_pkg) { printf("T6"); queue_enqueue(conn->recv_queue, recv_pkg); } printf("T7"); } usleep(1); } } // return 0 on success // return non-zero on failure and sets last_error on connection int esc_connection_connect(esc_connection_t conn) { // Discover endpoint const node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0); if (endpoints == 0) { return -1; } // Establish Tcp Connection conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint; if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { conn->last_error = error_create(socket_error(conn->tcp_conn), "can't connect to %s:%d", endpoint.host, endpoint.port); return -1; } // Start manager thread conn->manager_thread = thread_create(connection_thread, conn); if (thread_start(conn->manager_thread)) { conn->last_error = error_create(thread_error(conn->manager_thread), "can't start manager thread.", ""); return -1; } // Identify // build message EventStore__Client__Messages__IdentifyClient identify_client; event_store__client__messages__identify_client__init(&identify_client); identify_client.connection_name = (char*)conn->name; identify_client.version = 1; size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client); uint8_t buffer[s]; event_store__client__messages__identify_client__pack(&identify_client, buffer); // build tcp_package tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, esc_uuid_create(), buffer_from(buffer, s)); queue_enqueue(conn->send_queue, send_pkg); tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) { conn->last_error = error_create(0, "server error: %d.", recv_pkg->command); return -1; } return 0; } esc_credentials_t esc_credentials_create(const char* username, const char* password) { esc_credentials_t creds = malloc(sizeof(struct st_credentials)); creds->username = string_copy(username); creds->password = string_copy(password); return creds; } const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) { if (msg == 0) return 0; esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event)); ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len); ev->event_type = string_copy(msg->event_type); ev->event_number = msg->event_number; ev->event_stream_id = string_copy(msg->event_stream_id); ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0; ev->data = buffer_from(msg->data.data, msg->data.len); ev->metadata = buffer_from(msg->metadata.data, msg->metadata.len); return ev; } esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__ResolvedEvent* msg) { esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event)); ev->original_position.prepare_position = msg->prepare_position; ev->original_position.commit_position = msg->commit_position; ev->event = recorded_event_create(msg->event); ev->link = recorded_event_create(msg->link); return ev; } esc_all_events_slice_t esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, esc_credentials_t credentials) { //TODO function for packing from client struct direct to protobuf data EventStore__Client__Messages__ReadAllEvents send_msg; event_store__client__messages__read_all_events__init(&send_msg); send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; send_msg.commit_position = last_checkpoint ? last_checkpoint->commit_position : 0; send_msg.max_count = count; send_msg.require_master = 0; send_msg.resolve_link_tos = 0; size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); uint8_t buffer[s]; event_store__client__messages__read_all_events__pack(&send_msg, buffer); printf("A %s\n", credentials->password); tcp_package_t send_pkg = tcp_package_create_authenticated(MESSAGE_READALLEVENTSFORWARD, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); printf("A1 %s %s\n", credentials->password, send_pkg->password); queue_enqueue(conn->send_queue, send_pkg); printf("A2 %s %s\n", credentials->password, send_pkg->password); printf("B %s\n", credentials->password); tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id); if (recv_pkg->command != MESSAGE_READALLEVENTSFORWARDCOMPLETED) { conn->last_error = error_create(recv_pkg->command, "server error: %s", get_string_for_tcp_message(recv_pkg->command)); return 0; } printf("C %s\n", credentials->password); //TODO function for unpacking from protobuf data to client struct size_t data_size = buffer_size(recv_pkg->data); uint8_t* data = buffer_data(recv_pkg->data); EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, data_size, data); esc_all_events_slice_t result = malloc(sizeof(struct st_all_events_slice)); result->read_direction = "forward"; result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0; result->from_position.prepare_position = recv_msg->prepare_position; result->from_position.commit_position = recv_msg->commit_position; result->next_position.prepare_position = recv_msg->next_prepare_position; result->next_position.commit_position = recv_msg->next_commit_position; result->n_events = recv_msg->n_events; result->events = malloc(recv_msg->n_events * sizeof(void*)); for (size_t i = 0; i < recv_msg->n_events; i++) { result->events[i] = resolved_event_create(recv_msg->events[i]); } event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); printf("D %s\n", credentials->password); return result; } void esc_connection_close(esc_connection_t conn) { socket_close(conn->tcp_conn); } inline esc_error_t esc_connection_last_error(esc_connection_t conn) { return conn->last_error; } const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { #ifdef _WIN32 snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); #else snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); #endif return buffer; }