async send/recv (wip)

This commit is contained in:
Nicolas Dextraze
2018-03-19 22:48:55 -07:00
parent 9498cc74e0
commit 1f2958f0ad
13 changed files with 190 additions and 77 deletions

124
src/esc.c
View File

@@ -14,12 +14,11 @@
#include "tcp_package.h"
#include "utils/thread.h"
#include "utils/mutex.h"
#include "utils/string.h"
const char* string_copy(const char *src) {
char* dst = malloc(strlen(src)+1);
strcpy(dst, src);
return dst;
}
#ifdef _WIN32
#define sleep Sleep
#endif
typedef int bool_t;
const bool_t BOOL_TRUE = 1;
@@ -68,7 +67,11 @@ struct st_connection {
socket_t tcp_conn;
ProtobufCAllocator protobuf_c_allocator;
thread_t manager_thread;
mutex_t mutex_lock;
mutex_t sending_lock;
mutex_t send_lock;
tcp_package_t* send_pkg;
mutex_t recv_lock;
tcp_package_t* recv_pkg;
};
struct st_static_endpoint_discoverer {
@@ -89,7 +92,7 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc
int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) {
char uuid_buf[37];
printf("connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size);
fprintf(stderr, "connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size);
buffer_t send_buffer = tcp_package_to_buffer(pkg);
uint32_t size = (uint32_t)send_buffer.size;
if (socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t)) <= 0) {
@@ -108,7 +111,7 @@ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) {
uint32_t recv_size;
ssize_t rc;
if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) {
printf("%ld %d", rc, socket_error());
fprintf(stderr, "%ld %d", rc, socket_error());
return 0;
}
buffer_t recv_buffer = buffer_create(recv_size);
@@ -120,7 +123,7 @@ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) {
const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer);
buffer_free(recv_buffer);
char uuid_buf[37];
printf("connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size);
fprintf(stderr, "connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size);
//for (int32_t i=0;i<recv_pkg->data.size;i++) {
// printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]);
//}
@@ -128,6 +131,27 @@ const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) {
return recv_pkg;
}
void connection_enqueue_send(esc_connection_t* conn, tcp_package_t* pkg) {
mutex_lock(conn->sending_lock);
mutex_lock(conn->send_lock);
conn->send_pkg = pkg;
mutex_unlock(conn->send_lock);
}
tcp_package_t* connection_wait_for(esc_connection_t* conn, esc_uuid_t* correlation_id) {
tcp_package_t* found = 0;
while(found == 0) {
mutex_lock(conn->recv_lock);
if (conn->recv_pkg && esc_uuid_compare(&conn->recv_pkg->correlation_id, correlation_id) == 0) {
found = conn->recv_pkg;
conn->recv_pkg = 0;
}
mutex_unlock(conn->recv_lock);
sleep(10);
}
return found;
}
void* protobuf_c_alloc(void *alloc_data, size_t size) {
return malloc(size);
}
@@ -164,24 +188,53 @@ const esc_connection_t* esc_connection_create(const esc_connection_settings_t* c
char buf[40];
esc_uuid_format(uuid, buf, 40);
conn->name = string_copy(buf);
free(uuid);
esc_uuid_free(uuid);
} else {
conn->name = connection_name;
}
conn->recv_pkg = 0;
conn->send_pkg = 0;
conn->send_lock = mutex_create();
conn->sending_lock = mutex_create();
conn->recv_lock = mutex_create();
return conn;
}
void* connection_thread(void* arg) {
const esc_connection_t* conn = arg;
esc_connection_t* conn = arg;
while(1) {
//mutex_lock(conn->mutex_lock);
//if (socket_readable(conn->tcp_conn)) {
//}
mutex_lock(conn->send_lock);
//printf("%p ", conn->send_pkg);
int rc;
if (conn->send_pkg) {
if ((rc = connection_send_tcp_package(conn, conn->send_pkg)) == 0) {
// free send pkg
conn->send_pkg = 0;
mutex_unlock(conn->sending_lock);
}
}
mutex_unlock(conn->send_lock);
if ((rc = socket_readable(conn->tcp_conn)) > 0) {
mutex_lock(conn->recv_lock);
if (conn->recv_pkg == 0) {
conn->recv_pkg = connection_recv_tcp_package(conn);
}
mutex_unlock(conn->recv_lock);
}
//printf("%d ", rc);
mutex_lock(conn->recv_lock);
if (conn->recv_pkg && conn->recv_pkg->command == 0x01) {
tcp_package_t* hb = tcp_package_create(0x02, &conn->recv_pkg->correlation_id, buffer_create(0));
conn->recv_pkg = 0;
connection_send_tcp_package(conn, hb);
}
mutex_unlock(conn->recv_lock);
//mutex_unlock(conn->mutex_lock);
sleep(1);
sleep(10);
}
}
@@ -200,7 +253,12 @@ int esc_connection_connect(const esc_connection_t* conn) {
if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) {
return -2;
}
// Identify
// Start manager thread
_conn->manager_thread = thread_create(connection_thread, _conn);
if (thread_start(_conn->manager_thread)) {
return -3;
}
// Identify
// build message
EventStore__Client__Messages__IdentifyClient identify_client;
event_store__client__messages__identify_client__init(&identify_client);
@@ -211,24 +269,13 @@ int esc_connection_connect(const esc_connection_t* conn) {
event_store__client__messages__identify_client__pack(&identify_client, buffer);
// build tcp_package
const tcp_package_t* send_pkg = tcp_package_create(0xF5, esc_uuid_create(), buffer_from(buffer, s));
if (connection_send_tcp_package(conn, send_pkg) != 0) {
return -3;
}
const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn);
if (recv_pkg == 0) {
return -4;
}
connection_enqueue_send(_conn, send_pkg);
const tcp_package_t* recv_pkg = connection_wait_for(_conn, &send_pkg->correlation_id);
if (recv_pkg->command != 0xF6) {
return -5;
}
_conn->mutex_lock = mutex_create();
_conn->manager_thread = thread_create(connection_thread, _conn);
if (thread_start(_conn->manager_thread)) {
return -6;
}
return 0;
return 0;
}
const esc_credentials_t* esc_credentials_create(const char* username, const char* password) {
@@ -246,8 +293,8 @@ const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__
ev->event_number = msg->event_number;
ev->event_stream_id = string_copy(msg->event_stream_id);
ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0;
ev->data = buffer_copyfrom(msg->data.data, msg->data.len);
ev->metadata = buffer_copyfrom(msg->metadata.data, msg->metadata.len);
ev->data = buffer_from(msg->data.data, msg->data.len);
ev->metadata = buffer_from(msg->metadata.data, msg->metadata.len);
return ev;
}
@@ -273,14 +320,9 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
uint8_t buffer[s];
event_store__client__messages__read_all_events__pack(&send_msg, buffer);
const tcp_package_t* send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password);
if (connection_send_tcp_package(conn, send_pkg) != 0) {
return 0;
}
connection_enqueue_send(conn, send_pkg);
const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn);
if (recv_pkg == 0) {
return 0;
}
const tcp_package_t* recv_pkg = connection_wait_for(conn, &send_pkg->correlation_id);
if (recv_pkg->command != 0xB7) {
return 0;
}