// // Created by nicol on 2018-03-18. // #include #include #include #include #include "esc.h" #include "socket.h" #include "buffer.h" #include "ClientMessageDtos.pb-c.h" #include "tcp_package.h" struct st_connection_settings { unsigned char use_ssl_connection; }; const esc_connection_settings_t default_connection_settings = { }; const esc_connection_settings_t* esc_default_connection_settings = &default_connection_settings; struct st_tcp_endpoint { char* host; unsigned short port; }; typedef struct st_tcp_endpoint tcp_endpoint_t; struct st_node_endpoints { tcp_endpoint_t tcp_endpoint; tcp_endpoint_t secure_tcp_endpoint; }; typedef struct st_node_endpoints node_endpoints_t; typedef const node_endpoints_t* (*endpoint_discoverer_t)(const void* discover_data, const tcp_endpoint_t* failed_tcp_endpoint); struct st_connection { esc_connection_settings_t settings; void* discoverer_data; endpoint_discoverer_t discover; socket_t tcp_conn; ProtobufCAllocator protobuf_c_allocator; }; struct st_static_endpoint_discoverer { tcp_endpoint_t tcp_endpoint; unsigned char use_ssl_connection; }; typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t; const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* discover_data, const tcp_endpoint_t* failed_tcp_endpoint) { node_endpoints_t* result = malloc(sizeof(node_endpoints_t)); if (discover_data->use_ssl_connection) { result->secure_tcp_endpoint = discover_data->tcp_endpoint; } else { result->tcp_endpoint = discover_data->tcp_endpoint; } return result; } int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) { char uuid_buf[37]; printf("connection_send_tcp_package: %u %u %s %u\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); buffer_t send_buffer = tcp_package_to_buffer(pkg); if (socket_send(conn->tcp_conn, (char *) &send_buffer.size, sizeof(uint32_t)) <= 0) { buffer_free(send_buffer); return -1; } if (socket_send(conn->tcp_conn, (char *) send_buffer.data, send_buffer.size) <= 0) { buffer_free(send_buffer); return -2; } buffer_free(send_buffer); return 0; } const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { uint32_t recv_size; int rc; if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { printf("%d %d", rc, socket_error()); return 0; } buffer_t recv_buffer = buffer_create(recv_size); while(recv_size > 0) { int pos = recv_buffer.size - recv_size; rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size); recv_size -= rc; } const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer); buffer_free(recv_buffer); char uuid_buf[37]; printf("connection_recv_tcp_package: %u %u %s %u\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); //for (int32_t i=0;idata.size;i++) { // printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]); //} //printf("\n"); return recv_pkg; } void* protobuf_c_alloc(void *alloc_data, size_t size) { return malloc(size); } void protobuf_c_free(void *alloc_data, void* p) { free(p); } const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { if (strncmp(addr, "tcp://", 6) != 0) { return 0; } char* pos = strrchr(addr, ':'); if (pos == 0) { return 0; } char* host = malloc(pos - addr - 5); strncpy(host, addr+6, pos-addr-6); host[pos-addr-6] = 0; unsigned short port = (unsigned short)atoi(pos+1); static_endpoint_discoverer_t discover_data = { {host, port}, connection_settings->use_ssl_connection }; struct st_connection* conn = malloc(sizeof(struct st_connection)); conn->settings = *connection_settings; conn->discoverer_data = malloc(sizeof(static_endpoint_discoverer_t)); memcpy(conn->discoverer_data, &discover_data, sizeof(static_endpoint_discoverer_t)); conn->discover = (endpoint_discoverer_t)static_discover; conn->protobuf_c_allocator.alloc = protobuf_c_alloc; conn->protobuf_c_allocator.free = protobuf_c_free; conn->protobuf_c_allocator.allocator_data = 0; return conn; } // return 0 on success // return non-zero on failure and sets last_error on connection int esc_connection_connect(const esc_connection_t* conn) { // Discover endpoint const node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0); if (endpoints == 0) { return -1; } // Establish Tcp Connection esc_connection_t* _conn = (esc_connection_t*)conn; _conn->tcp_conn = socket_create(SOCKET_TYPE_TCP); tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint; if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) { return -2; } // Identify // build message EventStore__Client__Messages__IdentifyClient identify_client; event_store__client__messages__identify_client__init(&identify_client); identify_client.connection_name = "abc123"; identify_client.version = 1; size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client); uint8_t buffer[s]; event_store__client__messages__identify_client__pack(&identify_client, buffer); // build tcp_package const tcp_package_t* send_pkg = tcp_package_create(0xF5, esc_uuid_create(), buffer_from(buffer, s)); if (connection_send_tcp_package(conn, send_pkg) != 0) { return -3; } const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); if (recv_pkg == 0) { return -4; } if (recv_pkg->command != 0xF6) { return -5; } return 0; } const esc_credentials_t* esc_credentials_create(const char* username, const char* password) { esc_credentials_t* creds = malloc(sizeof(struct st_credentials)); creds->username = username; creds->password = password; return creds; } const char* string_clone(const char* src) { char* dst = malloc(strlen(src)+1); strcpy(dst, src); return dst; } const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) { if (msg == 0) return 0; esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t)); ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len); ev->event_type = string_clone(msg->event_type); return ev; } esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__ResolvedEvent* msg) { esc_resolved_event_t* ev = malloc(sizeof(esc_resolved_event_t)); ev->original_position.prepare_position = msg->prepare_position; ev->original_position.commit_position = msg->commit_position; ev->event = recorded_event_create(msg->event); ev->link = recorded_event_create(msg->link); return ev; } const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) { EventStore__Client__Messages__ReadAllEvents send_msg; event_store__client__messages__read_all_events__init(&send_msg); send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0; send_msg.commit_position = last_checkpoint ? last_checkpoint->commit_position : 0; send_msg.max_count = count; send_msg.require_master = 0; send_msg.resolve_link_tos = 0; size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg); uint8_t buffer[s]; event_store__client__messages__read_all_events__pack(&send_msg, buffer); const tcp_package_t* send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password); if (connection_send_tcp_package(conn, send_pkg) != 0) { return 0; } const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); if (recv_pkg == 0) { return 0; } if (recv_pkg->command != 0xB7) { return 0; } EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data); esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t)); result->read_direction = "forward"; result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0; result->from_position.prepare_position = recv_msg->prepare_position; result->from_position.commit_position = recv_msg->commit_position; result->next_position.prepare_position = recv_msg->next_prepare_position; result->next_position.commit_position = recv_msg->next_commit_position; result->n_events = recv_msg->n_events; result->events = malloc(recv_msg->n_events * sizeof(esc_resolved_event_t*)); for (size_t i = 0; i < recv_msg->n_events; i++) { result->events[i] = resolved_event_create(recv_msg->events[i]); } event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); return result; } const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); return buffer; }