move code to src
add linux implementation
This commit is contained in:
252
src/esc.c
Normal file
252
src/esc.c
Normal file
@@ -0,0 +1,252 @@
|
||||
//
|
||||
// Created by nicol on 2018-03-18.
|
||||
//
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include "esc.h"
|
||||
#include "socket.h"
|
||||
#include "buffer.h"
|
||||
#include "proto.h"
|
||||
#include "tcp_package.h"
|
||||
|
||||
struct st_connection_settings {
|
||||
unsigned char use_ssl_connection;
|
||||
};
|
||||
|
||||
const esc_connection_settings_t default_connection_settings = {
|
||||
};
|
||||
|
||||
const esc_connection_settings_t* esc_default_connection_settings = &default_connection_settings;
|
||||
|
||||
struct st_tcp_endpoint {
|
||||
char* host;
|
||||
unsigned short port;
|
||||
};
|
||||
typedef struct st_tcp_endpoint tcp_endpoint_t;
|
||||
|
||||
struct st_node_endpoints {
|
||||
tcp_endpoint_t tcp_endpoint;
|
||||
tcp_endpoint_t secure_tcp_endpoint;
|
||||
};
|
||||
typedef struct st_node_endpoints node_endpoints_t;
|
||||
|
||||
typedef const node_endpoints_t* (*endpoint_discoverer_t)(const void* discover_data, const tcp_endpoint_t* failed_tcp_endpoint);
|
||||
|
||||
struct st_connection {
|
||||
esc_connection_settings_t settings;
|
||||
void* discoverer_data;
|
||||
endpoint_discoverer_t discover;
|
||||
socket_t tcp_conn;
|
||||
ProtobufCAllocator protobuf_c_allocator;
|
||||
};
|
||||
|
||||
struct st_static_endpoint_discoverer {
|
||||
tcp_endpoint_t tcp_endpoint;
|
||||
unsigned char use_ssl_connection;
|
||||
};
|
||||
typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t;
|
||||
|
||||
const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* discover_data, const tcp_endpoint_t* failed_tcp_endpoint) {
|
||||
node_endpoints_t* result = malloc(sizeof(node_endpoints_t));
|
||||
if (discover_data->use_ssl_connection) {
|
||||
result->secure_tcp_endpoint = discover_data->tcp_endpoint;
|
||||
} else {
|
||||
result->tcp_endpoint = discover_data->tcp_endpoint;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) {
|
||||
char uuid_buf[37];
|
||||
printf("connection_send_tcp_package: %u %u %s %u\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size);
|
||||
buffer_t send_buffer = tcp_package_to_buffer(pkg);
|
||||
if (socket_send(conn->tcp_conn, (char *) &send_buffer.size, sizeof(uint32_t)) <= 0) {
|
||||
buffer_free(send_buffer);
|
||||
return -1;
|
||||
}
|
||||
if (socket_send(conn->tcp_conn, (char *) send_buffer.data, send_buffer.size) <= 0) {
|
||||
buffer_free(send_buffer);
|
||||
return -2;
|
||||
}
|
||||
buffer_free(send_buffer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) {
|
||||
uint32_t recv_size;
|
||||
int rc;
|
||||
if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) {
|
||||
printf("%d %d", rc, socket_error());
|
||||
return 0;
|
||||
}
|
||||
buffer_t recv_buffer = buffer_create(recv_size);
|
||||
while(recv_size > 0) {
|
||||
int pos = recv_buffer.size - recv_size;
|
||||
rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size);
|
||||
recv_size -= rc;
|
||||
}
|
||||
const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer);
|
||||
buffer_free(recv_buffer);
|
||||
char uuid_buf[37];
|
||||
printf("connection_recv_tcp_package: %u %u %s %u\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size);
|
||||
//for (int32_t i=0;i<recv_pkg->data.size;i++) {
|
||||
// printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]);
|
||||
//}
|
||||
//printf("\n");
|
||||
return recv_pkg;
|
||||
}
|
||||
|
||||
void* protobuf_c_alloc(void *alloc_data, size_t size) {
|
||||
return malloc(size);
|
||||
}
|
||||
|
||||
void protobuf_c_free(void *alloc_data, void* p) {
|
||||
free(p);
|
||||
}
|
||||
|
||||
const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) {
|
||||
if (strncmp(addr, "tcp://", 6) != 0) {
|
||||
return 0;
|
||||
}
|
||||
char* pos = strrchr(addr, ':');
|
||||
if (pos == 0) {
|
||||
return 0;
|
||||
}
|
||||
char* host = malloc(pos - addr - 5);
|
||||
strncpy(host, addr+6, pos-addr-6);
|
||||
host[pos-addr-6] = 0;
|
||||
unsigned short port = (unsigned short)atoi(pos+1);
|
||||
static_endpoint_discoverer_t discover_data = {
|
||||
{host, port}, connection_settings->use_ssl_connection
|
||||
};
|
||||
struct st_connection* conn = malloc(sizeof(struct st_connection));
|
||||
conn->settings = *connection_settings;
|
||||
conn->discoverer_data = malloc(sizeof(static_endpoint_discoverer_t));
|
||||
memcpy(conn->discoverer_data, &discover_data, sizeof(static_endpoint_discoverer_t));
|
||||
conn->discover = (endpoint_discoverer_t)static_discover;
|
||||
conn->protobuf_c_allocator.alloc = protobuf_c_alloc;
|
||||
conn->protobuf_c_allocator.free = protobuf_c_free;
|
||||
conn->protobuf_c_allocator.allocator_data = 0;
|
||||
return conn;
|
||||
}
|
||||
|
||||
// return 0 on success
|
||||
// return non-zero on failure and sets last_error on connection
|
||||
int esc_connection_connect(const esc_connection_t* conn) {
|
||||
// Discover endpoint
|
||||
const node_endpoints_t* endpoints = conn->discover(conn->discoverer_data, 0);
|
||||
if (endpoints == 0) {
|
||||
return -1;
|
||||
}
|
||||
// Establish Tcp Connection
|
||||
esc_connection_t* _conn = (esc_connection_t*)conn;
|
||||
_conn->tcp_conn = socket_create(SOCKET_TYPE_TCP);
|
||||
tcp_endpoint_t endpoint = conn->settings.use_ssl_connection ? endpoints->secure_tcp_endpoint : endpoints->tcp_endpoint;
|
||||
if (socket_connect(conn->tcp_conn, endpoint.host, endpoint.port) != 0) {
|
||||
return -2;
|
||||
}
|
||||
// Identify
|
||||
// build message
|
||||
EventStore__Client__Messages__IdentifyClient identify_client;
|
||||
event_store__client__messages__identify_client__init(&identify_client);
|
||||
identify_client.connection_name = "abc123";
|
||||
identify_client.version = 1;
|
||||
size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client);
|
||||
uint8_t buffer[s];
|
||||
event_store__client__messages__identify_client__pack(&identify_client, buffer);
|
||||
// build tcp_package
|
||||
const tcp_package_t* send_pkg = tcp_package_create(0xF5, esc_uuid_create(), buffer_from(buffer, s));
|
||||
if (connection_send_tcp_package(conn, send_pkg) != 0) {
|
||||
return -3;
|
||||
}
|
||||
const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn);
|
||||
if (recv_pkg == 0) {
|
||||
return -4;
|
||||
}
|
||||
if (recv_pkg->command != 0xF6) {
|
||||
return -5;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
const esc_credentials_t* esc_credentials_create(const char* username, const char* password) {
|
||||
esc_credentials_t* creds = malloc(sizeof(struct st_credentials));
|
||||
creds->username = username;
|
||||
creds->password = password;
|
||||
return creds;
|
||||
}
|
||||
|
||||
const char* string_clone(const char* src) {
|
||||
char* dst = malloc(strlen(src)+1);
|
||||
strcpy(dst, src);
|
||||
return dst;
|
||||
}
|
||||
|
||||
const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) {
|
||||
if (msg == 0) return 0;
|
||||
esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t));
|
||||
ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len);
|
||||
ev->event_type = string_clone(msg->event_type);
|
||||
return ev;
|
||||
}
|
||||
|
||||
esc_resolved_event_t* resolved_event_create(EventStore__Client__Messages__ResolvedEvent* msg) {
|
||||
esc_resolved_event_t* ev = malloc(sizeof(esc_resolved_event_t));
|
||||
ev->original_position.prepare_position = msg->prepare_position;
|
||||
ev->original_position.commit_position = msg->commit_position;
|
||||
ev->event = recorded_event_create(msg->event);
|
||||
ev->link = recorded_event_create(msg->link);
|
||||
return ev;
|
||||
}
|
||||
|
||||
const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials) {
|
||||
EventStore__Client__Messages__ReadAllEvents send_msg;
|
||||
event_store__client__messages__read_all_events__init(&send_msg);
|
||||
send_msg.prepare_position = last_checkpoint ? last_checkpoint->prepare_position : 0;
|
||||
send_msg.commit_position = last_checkpoint ? last_checkpoint->commit_position : 0;
|
||||
send_msg.max_count = count;
|
||||
send_msg.require_master = 0;
|
||||
send_msg.resolve_link_tos = 0;
|
||||
size_t s = event_store__client__messages__read_all_events__get_packed_size(&send_msg);
|
||||
uint8_t buffer[s];
|
||||
event_store__client__messages__read_all_events__pack(&send_msg, buffer);
|
||||
const tcp_package_t* send_pkg = tcp_package_create_authenticated(0xB6, esc_uuid_create(), buffer_from(buffer, s), credentials->username, credentials->password);
|
||||
if (connection_send_tcp_package(conn, send_pkg) != 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const tcp_package_t* recv_pkg = connection_recv_tcp_package(conn);
|
||||
if (recv_pkg == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (recv_pkg->command != 0xB7) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg =
|
||||
event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data);
|
||||
|
||||
esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t));
|
||||
result->read_direction = "forward";
|
||||
result->is_end_of_stream = (recv_msg->events == 0 || recv_msg->n_events == 0) ? 1 : 0;
|
||||
result->from_position.prepare_position = recv_msg->prepare_position;
|
||||
result->from_position.commit_position = recv_msg->commit_position;
|
||||
result->next_position.prepare_position = recv_msg->next_prepare_position;
|
||||
result->next_position.commit_position = recv_msg->next_commit_position;
|
||||
result->n_events = recv_msg->n_events;
|
||||
result->events = malloc(recv_msg->n_events * sizeof(esc_resolved_event_t*));
|
||||
for (size_t i = 0; i < recv_msg->n_events; i++) {
|
||||
result->events[i] = resolved_event_create(recv_msg->events[i]);
|
||||
}
|
||||
event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) {
|
||||
snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position);
|
||||
return buffer;
|
||||
}
|
Reference in New Issue
Block a user