adding tcp_messages, concurrent queue, types refactoring

This commit is contained in:
Nicolas Dextraze
2018-03-20 17:21:34 -07:00
parent 1f2958f0ad
commit 2088fd4082
20 changed files with 621 additions and 233 deletions

201
src/esc.c
View File

@@ -6,7 +6,6 @@
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <unistd.h>
#include "esc.h"
#include "utils/socket.h"
#include "utils/buffer.h"
@@ -15,9 +14,14 @@
#include "utils/thread.h"
#include "utils/mutex.h"
#include "utils/string.h"
#include "utils/queue.h"
#include "tcp_messages.h"
#ifdef _WIN32
#define sleep Sleep
#define usleep Sleep
#endif
#ifdef __linux__
#include <unistd.h>
#endif
typedef int bool_t;
@@ -67,11 +71,10 @@ struct st_connection {
socket_t tcp_conn;
ProtobufCAllocator protobuf_c_allocator;
thread_t manager_thread;
mutex_t sending_lock;
mutex_t send_lock;
tcp_package_t* send_pkg;
mutex_t recv_lock;
tcp_package_t* recv_pkg;
bool_t stop;
mutex_t recv_peek_lock;
queue_t send_queue;
queue_t recv_queue;
};
struct st_static_endpoint_discoverer {
@@ -81,7 +84,7 @@ struct st_static_endpoint_discoverer {
typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t;
const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* discover_data, const tcp_endpoint_t* failed_tcp_endpoint) {
node_endpoints_t* result = malloc(sizeof(node_endpoints_t));
node_endpoints_t* result = malloc(sizeof(struct st_node_endpoints));
if (discover_data->use_ssl_connection) {
result->secure_tcp_endpoint = discover_data->tcp_endpoint;
} else {
@@ -90,64 +93,71 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc
return result;
}
int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) {
//TODO partial transfer
ssize_t connection_send_tcp_package(esc_connection_t conn, tcp_package_t pkg) {
char uuid_buf[37];
fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size);
fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu ", pkg->command, pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data));
buffer_t send_buffer = tcp_package_to_buffer(pkg);
uint32_t size = (uint32_t)send_buffer.size;
if (socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t)) <= 0) {
buffer_free(send_buffer);
size_t send_buffer_size = buffer_size(send_buffer);
uint32_t size = (uint32_t)send_buffer_size;
ssize_t rc;
if ((rc = socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t))) <= 0) {
buffer_destroy(send_buffer);
fprintf(stderr, "%d\n", socket_error(conn->tcp_conn));
return -1;
}
if (socket_send(conn->tcp_conn, (char *) send_buffer.data, send_buffer.size) <= 0) {
buffer_free(send_buffer);
uint8_t* send_buffer_data = buffer_data(send_buffer);
if ((rc = socket_send(conn->tcp_conn, (char *) send_buffer_data, send_buffer_size)) <= 0) {
buffer_destroy(send_buffer);
fprintf(stderr, "%d\n", socket_error(conn->tcp_conn));
return -2;
}
buffer_free(send_buffer);
buffer_destroy(send_buffer);
fprintf(stderr, "0\n");
return 0;
}
const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) {
//TODO partial transfer
tcp_package_t connection_recv_tcp_package(esc_connection_t conn) {
uint32_t recv_size;
ssize_t rc;
if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) {
fprintf(stderr, "%ld %d", rc, socket_error());
fprintf(stderr, "connection_recv_tcp_package: %ld %d\n", rc, socket_error(conn->tcp_conn));
return 0;
}
buffer_t recv_buffer = buffer_create(recv_size);
size_t recv_buffer_size = recv_size;
uint8_t* recv_buffer_data = buffer_data(recv_buffer);
while(recv_size > 0) {
size_t pos = recv_buffer.size - recv_size;
rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size);
size_t pos = recv_buffer_size - recv_size;
rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer_data[pos], recv_size);
recv_size -= rc;
}
const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer);
buffer_free(recv_buffer);
tcp_package_t recv_pkg = tcp_package_from_buffer(recv_buffer);
buffer_destroy(recv_buffer);
char uuid_buf[37];
fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size);
//for (int32_t i=0;i<recv_pkg->data.size;i++) {
// printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]);
//}
//printf("\n");
fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), recv_buffer_size);
return recv_pkg;
}
void connection_enqueue_send(esc_connection_t* conn, tcp_package_t* pkg) {
mutex_lock(conn->sending_lock);
mutex_lock(conn->send_lock);
conn->send_pkg = pkg;
mutex_unlock(conn->send_lock);
inline void connection_enqueue_send(esc_connection_t conn, tcp_package_t pkg) {
queue_enqueue(conn->send_queue, pkg);
}
tcp_package_t* connection_wait_for(esc_connection_t* conn, esc_uuid_t* correlation_id) {
tcp_package_t* found = 0;
tcp_package_t connection_wait_for(esc_connection_t conn, esc_uuid_t correlation_id) {
tcp_package_t found = 0;
while(found == 0) {
mutex_lock(conn->recv_lock);
if (conn->recv_pkg && esc_uuid_compare(&conn->recv_pkg->correlation_id, correlation_id) == 0) {
found = conn->recv_pkg;
conn->recv_pkg = 0;
mutex_lock(conn->recv_peek_lock);
tcp_package_t peek = queue_peek(conn->recv_queue);
if (peek && esc_uuid_compare(peek->correlation_id, correlation_id) == 0) {
queue_dequeue(conn->recv_queue);
found = peek;
}
mutex_unlock(conn->recv_lock);
sleep(10);
mutex_unlock(conn->recv_peek_lock);
usleep(1);
}
return found;
}
@@ -160,7 +170,7 @@ void protobuf_c_free(void *alloc_data, void* p) {
free(p);
}
const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) {
esc_connection_t esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) {
if (strncmp(addr, "tcp://", 6) != 0) {
return 0;
}
@@ -177,85 +187,70 @@ const esc_connection_t* esc_connection_create(const esc_connection_settings_t* c
};
struct st_connection* conn = malloc(sizeof(struct st_connection));
conn->settings = *connection_settings;
conn->discoverer_data = malloc(sizeof(static_endpoint_discoverer_t));
memcpy(conn->discoverer_data, &discover_data, sizeof(static_endpoint_discoverer_t));
conn->discoverer_data = malloc(sizeof(struct st_static_endpoint_discoverer));
memcpy(conn->discoverer_data, &discover_data, sizeof(struct st_static_endpoint_discoverer));
conn->discover = (endpoint_discoverer_t)static_discover;
conn->protobuf_c_allocator.alloc = protobuf_c_alloc;
conn->protobuf_c_allocator.free = protobuf_c_free;
conn->protobuf_c_allocator.allocator_data = 0;
if (connection_name == 0 || strcmp(connection_name, "") == 0) {
const esc_uuid_t* uuid = esc_uuid_create();
esc_uuid_t uuid = esc_uuid_create();
char buf[40];
esc_uuid_format(uuid, buf, 40);
conn->name = string_copy(buf);
esc_uuid_free(uuid);
esc_uuid_destroy(uuid);
} else {
conn->name = connection_name;
}
conn->recv_pkg = 0;
conn->send_pkg = 0;
conn->send_lock = mutex_create();
conn->sending_lock = mutex_create();
conn->recv_lock = mutex_create();
conn->send_queue = queue_create();
conn->recv_peek_lock = mutex_create();
conn->recv_queue = queue_create();
conn->stop = 0;
return conn;
}
void* connection_thread(void* arg) {
esc_connection_t* conn = arg;
esc_connection_t conn = arg;
while(1) {
mutex_lock(conn->send_lock);
//printf("%p ", conn->send_pkg);
int rc;
if (conn->send_pkg) {
if ((rc = connection_send_tcp_package(conn, conn->send_pkg)) == 0) {
// free send pkg
conn->send_pkg = 0;
mutex_unlock(conn->sending_lock);
}
while(conn->stop == BOOL_FALSE) {
if (socket_writable(conn->tcp_conn)) {
tcp_package_t send_pkg = queue_dequeue(conn->send_queue);
if (send_pkg && connection_send_tcp_package(conn, send_pkg)) {
fprintf(stderr, "failed to send pkg.");
}
}
mutex_unlock(conn->send_lock);
if ((rc = socket_readable(conn->tcp_conn)) > 0) {
mutex_lock(conn->recv_lock);
if (conn->recv_pkg == 0) {
conn->recv_pkg = connection_recv_tcp_package(conn);
}
mutex_unlock(conn->recv_lock);
}
//printf("%d ", rc);
mutex_lock(conn->recv_lock);
if (conn->recv_pkg && conn->recv_pkg->command == 0x01) {
tcp_package_t* hb = tcp_package_create(0x02, &conn->recv_pkg->correlation_id, buffer_create(0));
conn->recv_pkg = 0;
connection_send_tcp_package(conn, hb);
}
mutex_unlock(conn->recv_lock);
//mutex_unlock(conn->mutex_lock);
sleep(10);
if (socket_readable(conn->tcp_conn)) {
tcp_package_t recv_pkg = connection_recv_tcp_package(conn);
if (recv_pkg && recv_pkg->command == MESSAGE_HEARTBEATREQUEST) {
tcp_package_t heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0));
queue_enqueue(conn->send_queue, heartbeat_pkg);
} else if (recv_pkg) {
queue_enqueue(conn->recv_queue, recv_pkg);
}
}
usleep(1);
}
}
// return 0 on success
// return non-zero on failure and sets last_error on connection
int esc_connection_connect(const esc_connection_t* conn) {
int esc_connection_connect(esc_connection_t conn) {
// Discover endpoint
const node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0);
if (endpoints == 0) {
return -1;
}
// Establish Tcp Connection
esc_connection_t* _conn = (esc_connection_t*)conn;
_conn->tcp_conn = socket_create(SOCKET_TYPE_TCP);
conn->tcp_conn = socket_create(SOCKET_TYPE_TCP);
tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint;
if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) {
return -2;
}
// Start manager thread
_conn->manager_thread = thread_create(connection_thread, _conn);
if (thread_start(_conn->manager_thread)) {
conn->manager_thread = thread_create(connection_thread, conn);
if (thread_start(conn->manager_thread)) {
return -3;
}
// Identify
@@ -268,10 +263,11 @@ int esc_connection_connect(const esc_connection_t* conn) {
uint8_t buffer[s];
event_store__client__messages__identify_client__pack(&identify_client, buffer);
// build tcp_package
const tcp_package_t* send_pkg = tcp_package_create(0xF5, esc_uuid_create(), buffer_from(buffer, s));
connection_enqueue_send(_conn, send_pkg);
const tcp_package_t* recv_pkg = connection_wait_for(_conn, &send_pkg->correlation_id);
if (recv_pkg->command != 0xF6) {
tcp_package_t send_pkg = tcp_package_create(MESSAGE_IDENTIFYCLIENT, esc_uuid_create(), buffer_from(buffer, s));
queue_enqueue(conn->send_queue, send_pkg);
tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id);
if (recv_pkg->command != MESSAGE_CLIENTIDENTIFIED) {
return -5;
}
@@ -287,7 +283,7 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char
const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) {
if (msg == 0) return 0;
esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t));
esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event));
ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len);
ev->event_type = string_copy(msg->event_type);
ev->event_number = msg->event_number;
@@ -300,7 +296,7 @@ const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__
}
esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__ResolvedEvent* msg) {
esc_resolved_event_t* ev = malloc(sizeof(esc_resolved_event_t));
esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event));
ev->original_position.prepare_position = msg->prepare_position;
ev->original_position.commit_position = msg->commit_position;
ev->event = recorded_event_create(msg->event);
@@ -308,7 +304,7 @@ esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__Resolv
return ev;
}
const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) {
const esc_all_events_slice_t* esc_connection_read_all_forward(esc_connection_t conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) {
EventStore__Client__Messages__ReadAllEvents send_msg;
event_store__client__messages__read_all_events__init(&send_msg);
send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0;
@@ -319,20 +315,21 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg);
uint8_t buffer[s];
event_store__client__messages__read_all_events__pack(&send_msg, buffer);
const tcp_package_t* send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password);
connection_enqueue_send(conn, send_pkg);
tcp_package_t send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password);
queue_enqueue(conn->send_queue, send_pkg);
const tcp_package_t* recv_pkg = connection_wait_for(conn, &send_pkg->correlation_id);
tcp_package_t recv_pkg = connection_wait_for(conn, send_pkg->correlation_id);
if (recv_pkg->command != 0xB7) {
return 0;
}
esc_connection_t* _conn = (esc_connection_t*)conn;
size_t data_size = buffer_size(recv_pkg->data);
uint8_t* data = buffer_data(recv_pkg->data);
EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg =
event_store__client__messages__read_all_events_completed__unpack(&_conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data);
event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, data_size, data);
esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t));
esc_all_events_slice_t* result = malloc(sizeof(struct st_all_events_slice));
result->read_direction = "forward";
result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0;
result->from_position.prepare_position = recv_msg->prepare_position;
@@ -340,16 +337,16 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
result->next_position.prepare_position = recv_msg->next_prepare_position;
result->next_position.commit_position = recv_msg->next_commit_position;
result->n_events = recv_msg->n_events;
result->events = malloc(recv_msg->n_events * sizeof(esc_resolved_event_t*));
result->events = malloc(recv_msg->n_events * sizeof(void*));
for (size_t i = 0; i < recv_msg->n_events; i++) {
result->events[i] = resolved_event_create(recv_msg->events[i]);
}
event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &_conn->protobuf_c_allocator);
event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator);
return result;
}
void esc_connection_close(const esc_connection_t* conn) {
void esc_connection_close(esc_connection_t conn) {
socket_close(conn->tcp_conn);
}