fixed WIN32 build

added subscribe_to_all (wip)
This commit is contained in:
Nicolas Dextraze 2018-03-25 18:03:44 -07:00
parent dcca4b2709
commit 4e585e4372
18 changed files with 411 additions and 52 deletions

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.9) cmake_minimum_required(VERSION 3.9)
project(esc C) project(evenstore-client-c C)
set(CMAKE_C_STANDARD 99) set(CMAKE_C_STANDARD 99)
@ -10,11 +10,12 @@ if(WIN32)
link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs)
endif() endif()
add_library(eventstore-client-c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h include/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c src/position.c src/position.h src/credentials.c src/credentials.h src/connection.c src/connection.h src/utils/bool.h src/endpoint_discoverer.h src/utils/error.c src/utils/error.h src/endpoint_discoverer.c src/proto_helper.h src/proto_helper.c src/results.h src/results.c src/event_data.c src/event_data.h src/utils/array.c src/utils/array.h src/utils/event.c src/utils/event.h) add_library(eventstore-client-c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h include/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c src/position.c src/position.h src/credentials.c src/credentials.h src/connection.c src/connection.h src/utils/bool.h src/endpoint_discoverer.h src/utils/error.c src/utils/error.h src/endpoint_discoverer.c src/proto_helper.h src/proto_helper.c src/results.h src/results.c src/event_data.c src/event_data.h src/utils/array.c src/utils/array.h src/utils/event.c src/utils/event.h src/subscription.c src/subscription.h)
add_executable(esc test/main.c include/esc.h) add_executable(esc test/main.c include/esc.h)
if(WIN32) if(WIN32)
target_link_libraries(esc wsock32 ws2_32 eventstore-client-c.a libprotobuf-c.a) target_link_libraries(eventstore-client-c wsock32 ws2_32)
target_link_libraries(esc eventstore-client-c libprotobuf-c.a)
else() else()
target_link_libraries(eventstore-client-c pthread) target_link_libraries(eventstore-client-c pthread)
target_link_libraries(esc pthread eventstore-client-c protobuf-c) target_link_libraries(esc pthread eventstore-client-c protobuf-c)

View File

@ -19,6 +19,9 @@ array_t* array_create(size_t n, ...);
void array_destroy(array_t* array, array_deallocator destroyer); void array_destroy(array_t* array, array_deallocator destroyer);
// uuid // uuid
#if defined(_WIN32) && defined(UUID_DEFINED)
#undef uuid_t
#endif
typedef struct st_uuid uuid_t; typedef struct st_uuid uuid_t;
uuid_t* uuid_create(); uuid_t* uuid_create();
void uuid_destroy(uuid_t*); void uuid_destroy(uuid_t*);
@ -45,10 +48,10 @@ typedef struct st_error {
typedef struct st_esc_position esc_position_t; typedef struct st_esc_position esc_position_t;
typedef struct st_esc_connection_settings esc_connection_settings_t; typedef struct st_esc_connection_settings esc_connection_settings_t;
typedef struct st_connection esc_connection_t; typedef struct st_esc_connection esc_connection_t;
typedef struct st_credentials esc_credentials_t; typedef struct st_esc_credentials esc_credentials_t;
struct st_recorded_event { typedef struct st_esc_recorded_event {
uuid_t* event_id; uuid_t* event_id;
const char* event_type; const char* event_type;
int64_t event_number; int64_t event_number;
@ -56,15 +59,13 @@ struct st_recorded_event {
int64_t created_epoch; int64_t created_epoch;
buffer_t* data; buffer_t* data;
buffer_t* metadata; buffer_t* metadata;
}; } esc_recorded_event_t;
typedef struct st_recorded_event esc_recorded_event_t;
struct st_resolved_event { typedef struct st_esc_resolved_event {
esc_recorded_event_t* event; esc_recorded_event_t* event;
esc_recorded_event_t* link; esc_recorded_event_t* link;
esc_position_t* original_position; esc_position_t* original_position;
}; } esc_resolved_event_t;
typedef struct st_resolved_event esc_resolved_event_t;
struct st_all_events_slice { struct st_all_events_slice {
char* read_direction; char* read_direction;
@ -83,6 +84,20 @@ typedef struct st_esc_write_result {
typedef struct st_esc_event_data esc_event_data_t; typedef struct st_esc_event_data esc_event_data_t;
typedef struct st_esc_subscription esc_subscription_t;
typedef void (*esc_event_appeared_t)(void* ctx, esc_subscription_t* s, esc_resolved_event_t* ev);
typedef struct st_event_appeared_callback {
esc_event_appeared_t fn;
void* ctx;
} esc_event_appeared_callback_t;
typedef void (*esc_subscription_droppped_t)(void* ctx, esc_subscription_t* subscription, const char* reason, error_t* error);
typedef struct st_subscription_dropped_callback {
esc_subscription_droppped_t fn;
void* ctx;
} esc_subscription_dropped_callback_t;
esc_connection_settings_t* const esc_default_connection_settings; esc_connection_settings_t* const esc_default_connection_settings;
// Connection // Connection
@ -105,6 +120,14 @@ void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice);
error_t* esc_connection_last_error(esc_connection_t* conn); error_t* esc_connection_last_error(esc_connection_t* conn);
// Subscriptions
extern esc_event_appeared_callback_t esc_empty_event_appeared_callback;
extern esc_subscription_dropped_callback_t esc_empty_subscription_dropped_callback;
esc_subscription_t* esc_connection_subscribe_to_all(esc_connection_t* conn, bool_t resolve_link_tos, esc_event_appeared_callback_t event_appeared_callback, esc_subscription_dropped_callback_t subscription_dropped_callback, esc_credentials_t* credentials);
void esc_subscription_stop(esc_subscription_t* subscription);
void esc_subscription_destroy(esc_subscription_t* subscription);
// Formatting // Formatting
const char* uuid_format(uuid_t* uuid, char* buf, size_t buf_size); const char* uuid_format(uuid_t* uuid, char* buf, size_t buf_size);
const char* esc_position_format(esc_position_t* position, char* buffer, size_t buf_size); const char* esc_position_format(esc_position_t* position, char* buffer, size_t buf_size);

View File

@ -2,20 +2,6 @@
// Created by nicolas on 22/03/18. // Created by nicolas on 22/03/18.
// //
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "connection.h"
#include "tcp_package.h"
#include "tcp_messages.h"
#include "credentials.h"
#include "proto_helper.h"
#include "utils/array.h"
#include "utils/debug.h"
#include "utils/error.h"
#include "utils/string.h"
#include "utils/socket.h"
// usleep // usleep
#ifdef _WIN32 #ifdef _WIN32
#include <windows.h> #include <windows.h>
@ -25,6 +11,22 @@
#include <unistd.h> #include <unistd.h>
#endif #endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "connection.h"
#include "subscription.h"
#include "tcp_package.h"
#include "tcp_messages.h"
#include "credentials.h"
#include "proto_helper.h"
#include "utils/array.h"
#include "utils/debug.h"
#include "utils/error.h"
#include "utils/string.h"
#include "utils/socket.h"
#include "proto.h"
struct st_esc_connection_settings default_connection_settings = { struct st_esc_connection_settings default_connection_settings = {
BOOL_FALSE BOOL_FALSE
}; };
@ -35,7 +37,7 @@ esc_connection_settings_t* esc_default_connection_settings = &default_connection
ssize_t connection_send_tcp_package(esc_connection_t* conn, tcp_package_t* pkg) { ssize_t connection_send_tcp_package(esc_connection_t* conn, tcp_package_t* pkg) {
char uuid_buf[37]; char uuid_buf[37];
#ifdef _WIN32 #ifdef _WIN32
fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data));
#else #else
fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags, fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags,
uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data));
@ -87,7 +89,7 @@ tcp_package_t* connection_recv_tcp_package(esc_connection_t* conn) {
buffer_destroy(recv_buffer); buffer_destroy(recv_buffer);
char uuid_buf[37]; char uuid_buf[37];
#ifdef _WIN32 #ifdef _WIN32
fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data));
#else #else
fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags,
uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data));
@ -111,6 +113,7 @@ void* connection_thread(void* arg) {
} }
if (socket_readable(conn->tcp_conn)) { if (socket_readable(conn->tcp_conn)) {
esc_subscription_t* subscription = 0;
tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); tcp_package_t* recv_pkg = connection_recv_tcp_package(conn);
if (recv_pkg == 0) { if (recv_pkg == 0) {
//TODO Handle connection lost //TODO Handle connection lost
@ -119,6 +122,8 @@ void* connection_thread(void* arg) {
tcp_package_t* heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); tcp_package_t* heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0));
queue_enqueue(conn->send_queue, heartbeat_pkg); queue_enqueue(conn->send_queue, heartbeat_pkg);
tcp_package_destroy(recv_pkg); tcp_package_destroy(recv_pkg);
} else if ((subscription = queue_find(conn->subscriptions, esc_find_subscription_by_correlation_id, recv_pkg->correlation_id)) != 0) {
esc_subscription_handle_tcp_package(subscription, recv_pkg);
} else { } else {
queue_enqueue(conn->recv_queue, recv_pkg); queue_enqueue(conn->recv_queue, recv_pkg);
} }
@ -151,7 +156,7 @@ tcp_package_t* connection_wait_for(esc_connection_t* conn, uuid_t* correlation_i
} }
esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) {
struct st_connection* conn = malloc(sizeof(struct st_connection)); struct st_esc_connection* conn = malloc(sizeof(struct st_esc_connection));
conn->settings = connection_settings; conn->settings = connection_settings;
if (connection_name == 0 || strcmp(connection_name, "") == 0) { if (connection_name == 0 || strcmp(connection_name, "") == 0) {
uuid_t* uuid = uuid_create(); uuid_t* uuid = uuid_create();
@ -167,6 +172,7 @@ esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_se
conn->stop = 0; conn->stop = 0;
conn->stopped_event = event_create(); conn->stopped_event = event_create();
conn->last_error = 0; conn->last_error = 0;
conn->subscriptions = queue_create();
if (strncmp(addr, "tcp://", 6) != 0) { if (strncmp(addr, "tcp://", 6) != 0) {
conn->last_error = error_create(1, "invalid schema for address: %s", addr); conn->last_error = error_create(1, "invalid schema for address: %s", addr);
@ -203,6 +209,7 @@ void esc_connection_destroy(esc_connection_t* conn) {
queue_destroy(conn->send_queue); queue_destroy(conn->send_queue);
event_destroy(conn->stopped_event); event_destroy(conn->stopped_event);
free((void*)conn->name); free((void*)conn->name);
queue_destroy(conn->subscriptions);
free(conn); free(conn);
} }
@ -310,6 +317,26 @@ esc_write_result_t* esc_append_to_stream(esc_connection_t* conn, const char* str
return result; return result;
} }
esc_subscription_t* esc_connection_subscribe_to_all(esc_connection_t* conn, bool_t resolve_link_tos, esc_event_appeared_callback_t event_appeared_callback, esc_subscription_dropped_callback_t subscription_dropped_callback, esc_credentials_t* credentials) {
uuid_t* correlation_id = uuid_create();
esc_subscription_t* subscription = esc_subscription_create(conn, correlation_id, event_appeared_callback, subscription_dropped_callback);
queue_enqueue(conn->subscriptions, subscription);
buffer_t* msg_buf = esc_subscribe_to_stream_pack("", resolve_link_tos);
tcp_package_t* send_pkg = tcp_package_create_authenticated(MESSAGE_SUBSCRIBETOSTREAM, correlation_id, msg_buf, credentials->username, credentials->password);
queue_enqueue(conn->send_queue, send_pkg);
bool_t subscribing = BOOL_TRUE;
while(subscribing) {
subscribing = esc_subscription_state(subscription) == ESC_SUBSCRIBING;
usleep(1);
}
uuid_destroy(correlation_id);
return subscription;
}
void esc_connection_close(esc_connection_t* conn) { void esc_connection_close(esc_connection_t* conn) {
if (conn->stop == 1) return; if (conn->stop == 1) return;
conn->stop = 1; conn->stop = 1;

View File

@ -6,13 +6,14 @@
#define ESC_ESC_CONNECTION_H #define ESC_ESC_CONNECTION_H
#include <protobuf-c/protobuf-c.h> #include <protobuf-c/protobuf-c.h>
#include "endpoint_discoverer.h"
#include "tcp_package.h"
#include "utils/bool.h" #include "utils/bool.h"
#include "utils/socket.h" #include "utils/socket.h"
#include "utils/thread.h" #include "utils/thread.h"
#include "utils/mutex.h" #include "utils/mutex.h"
#include "utils/queue.h" #include "utils/queue.h"
#include "utils/error.h" #include "utils/error.h"
#include "endpoint_discoverer.h"
#include "utils/event.h" #include "utils/event.h"
typedef struct st_esc_connection_settings { typedef struct st_esc_connection_settings {
@ -21,7 +22,7 @@ typedef struct st_esc_connection_settings {
esc_connection_settings_t* esc_default_connection_settings; esc_connection_settings_t* esc_default_connection_settings;
typedef struct st_connection { typedef struct st_esc_connection {
esc_connection_settings_t* settings; esc_connection_settings_t* settings;
const char* name; const char* name;
void* discoverer_data; void* discoverer_data;
@ -33,8 +34,10 @@ typedef struct st_connection {
queue_t* recv_queue; queue_t* recv_queue;
error_t* last_error; error_t* last_error;
event_t* stopped_event; event_t* stopped_event;
queue_t* subscriptions;
} esc_connection_t; } esc_connection_t;
tcp_package_t* connection_wait_for(esc_connection_t* conn, uuid_t* correlation_id);
error_t* esc_connection_last_error(esc_connection_t* conn); error_t* esc_connection_last_error(esc_connection_t* conn);
void esc_connection_close(esc_connection_t* conn); void esc_connection_close(esc_connection_t* conn);

View File

@ -8,7 +8,7 @@
#include "credentials.h" #include "credentials.h"
esc_credentials_t* esc_credentials_create(const char* username, const char* password) { esc_credentials_t* esc_credentials_create(const char* username, const char* password) {
esc_credentials_t* creds = malloc(sizeof(struct st_credentials)); esc_credentials_t* creds = malloc(sizeof(struct st_esc_credentials));
creds->username = string_copy(username); creds->username = string_copy(username);
creds->password = string_copy(password); creds->password = string_copy(password);
return creds; return creds;

View File

@ -5,11 +5,11 @@
#ifndef ESC_ESC_CREDENTIALS_H #ifndef ESC_ESC_CREDENTIALS_H
#define ESC_ESC_CREDENTIALS_H #define ESC_ESC_CREDENTIALS_H
struct st_credentials { struct st_esc_credentials {
const char* username; const char* username;
const char* password; const char* password;
}; };
typedef struct st_credentials esc_credentials_t; typedef struct st_esc_credentials esc_credentials_t;
esc_credentials_t* esc_credentials_create(const char* username, const char* password); esc_credentials_t* esc_credentials_create(const char* username, const char* password);
void esc_credentials_destroy(esc_credentials_t* creds); void esc_credentials_destroy(esc_credentials_t* creds);

View File

@ -2,12 +2,10 @@
// Created by nicolas on 22/03/18. // Created by nicolas on 22/03/18.
// //
#include "utils/debug.h"
#include "utils/string.h"
#include "results.h"
#include "proto_helper.h" #include "proto_helper.h"
#include "proto.h" #include "proto.h"
#include "utils/bool.h" #include "utils/debug.h"
#include "utils/string.h"
#include "event_data.h" #include "event_data.h"
void* protobuf_c_alloc(void *alloc_data, size_t size) { void* protobuf_c_alloc(void *alloc_data, size_t size) {
@ -48,7 +46,7 @@ buffer_t* esc_identify_client_pack(const char* connection_name) {
esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__EventRecord* msg) { esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__EventRecord* msg) {
if (msg == 0) return 0; if (msg == 0) return 0;
esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event)); esc_recorded_event_t* ev = malloc(sizeof(struct st_esc_recorded_event));
ev->event_id = uuid_from(msg->event_id.data, msg->event_id.len); ev->event_id = uuid_from(msg->event_id.data, msg->event_id.len);
ev->event_type = string_copy(msg->event_type); ev->event_type = string_copy(msg->event_type);
ev->event_number = msg->event_number; ev->event_number = msg->event_number;
@ -60,13 +58,21 @@ esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__Ev
} }
esc_resolved_event_t* esc_resolved_event_unpack(EventStore__Client__Messages__ResolvedEvent* msg) { esc_resolved_event_t* esc_resolved_event_unpack(EventStore__Client__Messages__ResolvedEvent* msg) {
esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event)); esc_resolved_event_t* ev = malloc(sizeof(struct st_esc_resolved_event));
ev->original_position = esc_position_create(msg->prepare_position, msg->commit_position); ev->original_position = esc_position_create(msg->prepare_position, msg->commit_position);
ev->event = esc_recorded_event_unpack(msg->event); ev->event = esc_recorded_event_unpack(msg->event);
ev->link = esc_recorded_event_unpack(msg->link); ev->link = esc_recorded_event_unpack(msg->link);
return ev; return ev;
} }
esc_resolved_event_t* esc_stream_event_appeared_unpack(buffer_t* data) {
EventStore__Client__Messages__StreamEventAppeared* recv_msg =
event_store__client__messages__stream_event_appeared__unpack(&protobuf_allocator, buffer_size(data), buffer_data(data));
esc_resolved_event_t* ev = esc_resolved_event_unpack(recv_msg->event);
event_store__client__messages__stream_event_appeared__free_unpacked(recv_msg, &protobuf_allocator);
return ev;
}
const char* get_string_for_all_events_result(int result) { const char* get_string_for_all_events_result(int result) {
switch(result) { switch(result) {
case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: return "Success"; case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: return "Success";
@ -210,3 +216,14 @@ buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version
free(send_msg.events); free(send_msg.events);
return buffer_copyfrom(buffer, buf_size); return buffer_copyfrom(buffer, buf_size);
} }
buffer_t* esc_subscribe_to_stream_pack(const char* stream, bool_t resolve_link_tos) {
EventStore__Client__Messages__SubscribeToStream send_msg;
event_store__client__messages__subscribe_to_stream__init(&send_msg);
send_msg.event_stream_id = (char*)stream;
send_msg.resolve_link_tos = resolve_link_tos;
size_t buf_size = event_store__client__messages__subscribe_to_stream__get_packed_size(&send_msg);
uint8_t buffer[buf_size];
event_store__client__messages__subscribe_to_stream__pack(&send_msg, buffer);
return buffer_copyfrom(buffer, buf_size);
}

View File

@ -5,12 +5,15 @@
#ifndef ESC_PROTO_HELPER_H #ifndef ESC_PROTO_HELPER_H
#define ESC_PROTO_HELPER_H #define ESC_PROTO_HELPER_H
#include <protobuf-c/protobuf-c.h>
#include "utils/array.h" #include "utils/array.h"
#include "utils/buffer.h" #include "utils/buffer.h"
#include "utils/bool.h" #include "utils/bool.h"
#include "utils/error.h" #include "utils/error.h"
#include "results.h" #include "results.h"
extern ProtobufCAllocator protobuf_allocator;
typedef enum { typedef enum {
Operation_Decision_DoNothing, Operation_Decision_DoNothing,
Operation_Decision_EndOperation, Operation_Decision_EndOperation,
@ -28,6 +31,8 @@ void inspection_result_destroy(inspection_result_t*);
buffer_t* esc_identify_client_pack(const char* connection_name); buffer_t* esc_identify_client_pack(const char* connection_name);
esc_resolved_event_t* esc_stream_event_appeared_unpack(buffer_t* data);
inspection_result_t* esc_all_events_slice_unpack(buffer_t* buffer, esc_all_events_slice_t** all_events_slice_p); inspection_result_t* esc_all_events_slice_unpack(buffer_t* buffer, esc_all_events_slice_t** all_events_slice_p);
buffer_t* esc_read_all_forward_pack(esc_position_t* last_checkpoint, int32_t count, bool_t resolve_link_tos); buffer_t* esc_read_all_forward_pack(esc_position_t* last_checkpoint, int32_t count, bool_t resolve_link_tos);
const char* get_string_for_all_events_result(int result); const char* get_string_for_all_events_result(int result);
@ -36,4 +41,6 @@ inspection_result_t* esc_write_result_unpack(buffer_t* buffer, esc_write_result_
buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version, array_t* events); buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version, array_t* events);
const char* get_string_for_write_result(int result); const char* get_string_for_write_result(int result);
buffer_t* esc_subscribe_to_stream_pack(const char* stream, bool_t resolve_link_tos);
#endif //ESC_PROTO_HELPER_H #endif //ESC_PROTO_HELPER_H

View File

@ -9,7 +9,7 @@
#include "utils/buffer.h" #include "utils/buffer.h"
#include "position.h" #include "position.h"
struct st_recorded_event { typedef struct st_esc_recorded_event {
uuid_t* event_id; uuid_t* event_id;
const char* event_type; const char* event_type;
int64_t event_number; int64_t event_number;
@ -17,15 +17,13 @@ struct st_recorded_event {
int64_t created_epoch; int64_t created_epoch;
buffer_t* data; buffer_t* data;
buffer_t* metadata; buffer_t* metadata;
}; } esc_recorded_event_t;
typedef struct st_recorded_event esc_recorded_event_t;
struct st_resolved_event { typedef struct st_esc_resolved_event {
esc_recorded_event_t* event; esc_recorded_event_t* event;
esc_recorded_event_t* link; esc_recorded_event_t* link;
esc_position_t* original_position; esc_position_t* original_position;
}; } esc_resolved_event_t;
typedef struct st_resolved_event esc_resolved_event_t;
struct st_all_events_slice { struct st_all_events_slice {
char* read_direction; char* read_direction;
@ -42,9 +40,14 @@ typedef struct st_esc_write_result_t {
esc_position_t* log_position; esc_position_t* log_position;
} esc_write_result_t; } esc_write_result_t;
typedef struct st_esc_subscription_confirmation {
} esc_subscription_confirmation_t;
void esc_recorded_event_destroy(esc_recorded_event_t* recorded_event); void esc_recorded_event_destroy(esc_recorded_event_t* recorded_event);
void esc_resolved_event_destroy(esc_resolved_event_t* resolved_event); void esc_resolved_event_destroy(esc_resolved_event_t* resolved_event);
void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice); void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice);
void esc_write_result_destroy(esc_write_result_t* write_result); void esc_write_result_destroy(esc_write_result_t* write_result);
void esc_subscription_confirmation_destroy(esc_subscription_confirmation_t* subscription_confirmation);
#endif //ESC_RESULTS_H #endif //ESC_RESULTS_H

170
src/subscription.c Normal file
View File

@ -0,0 +1,170 @@
//
// Created by nicol on 2018-03-25.
//
#ifdef _WIN32
#include <windows.h>
#include <stdio.h>
#define usleep Sleep
#endif
#include "subscription.h"
#include "connection.h"
#include "credentials.h"
#include "proto_helper.h"
#include "tcp_package.h"
#include "tcp_messages.h"
#include "utils/uuid.h"
#include "utils/buffer.h"
#include "utils/debug.h"
#include "proto.h"
esc_event_appeared_callback_t esc_empty_event_appeared_callback = {0,0};
esc_subscription_dropped_callback_t esc_empty_subscription_dropped_callback = {0,0};
struct st_esc_subscription {
mutex_t* lock;
esc_subscription_state_t state;
esc_connection_t* conn;
uuid_t* correlation_id;
esc_event_appeared_callback_t event_appeared;
esc_subscription_dropped_callback_t subscription_dropped;
int64_t last_commit_position;
int64_t last_event_number;
error_t* error;
thread_t* worker_thread;
queue_t* events;
event_t* stopped;
};
void* esc_subscription_worker_thread(void* arg) {
esc_subscription_t* subscription = arg;
bool_t subscribed = BOOL_TRUE;
while(subscribed) {
esc_resolved_event_t* ev = queue_dequeue(subscription->events);
//TODO full logic
if (ev && subscription->event_appeared.fn) {
subscription->event_appeared.fn(subscription->event_appeared.ctx, subscription, ev);
}
if (ev) esc_resolved_event_destroy(ev);
mutex_lock(subscription->lock);
subscribed = subscription->state == ESC_SUBSCRIBED;
mutex_unlock(subscription->lock);
usleep(1);
}
//TODO reason
if (subscription->subscription_dropped.fn) {
subscription->subscription_dropped.fn(subscription->subscription_dropped.ctx, subscription, "", subscription->error);
}
//TODO uuid instead of ptr
printf("Subscription %p worker thread stopped\n", subscription);
event_set(subscription->stopped);
}
esc_subscription_t* esc_subscription_create(esc_connection_t* conn, uuid_t* correlation_id,
esc_event_appeared_callback_t event_appeared,
esc_subscription_dropped_callback_t subscription_dropped
) {
esc_subscription_t* subscription = malloc(sizeof(esc_subscription_t));
subscription->lock = mutex_create();
subscription->state = ESC_SUBSCRIBING;
subscription->conn = conn;
subscription->correlation_id = uuid_copy(correlation_id);
subscription->event_appeared = event_appeared;
subscription->subscription_dropped = subscription_dropped;
subscription->error = 0;
subscription->worker_thread = thread_create(esc_subscription_worker_thread, subscription);
subscription->events = queue_create();
subscription->stopped = event_create();
return subscription;
}
void esc_subscription_destroy(esc_subscription_t* subscription) {
esc_subscription_stop(subscription);
mutex_destroy(subscription->lock);
uuid_destroy(subscription->correlation_id);
thread_destroy(subscription->worker_thread);
queue_destroy(subscription->events);
event_destroy(subscription->stopped);
free(subscription);
}
void esc_subscription_handle_tcp_package(esc_subscription_t* subscription, tcp_package_t* recv_pkg) {
mutex_lock(subscription->lock);
switch (recv_pkg->command) {
case MESSAGE_SUBSCRIPTIONCONFIRMATION: {
EventStore__Client__Messages__SubscriptionConfirmation* recv_msg =
event_store__client__messages__subscription_confirmation__unpack(&protobuf_allocator, buffer_size(recv_pkg->data), buffer_data(recv_pkg->data));
subscription->state = ESC_SUBSCRIBED;
subscription->last_event_number = recv_msg->last_event_number;
subscription->last_commit_position = recv_msg->last_commit_position;
event_store__client__messages__subscription_confirmation__free_unpacked(recv_msg, &protobuf_allocator);
thread_start(subscription->worker_thread);
break;
}
case MESSAGE_STREAMEVENTAPPEARED: {
esc_resolved_event_t* ev = esc_stream_event_appeared_unpack(recv_pkg->data);
queue_enqueue(subscription->events, ev);
break;
}
default: {
subscription->error = error_create(recv_pkg->command, "%s", get_string_for_tcp_message(recv_pkg->command));
subscription->state = ESC_DROPPED;
break;
}
/*
case MESSAGE_SUBSCRIPTIONDROPPED: {
//TODO
subscription->state = ESC_DROPPED;
break;
}
case MESSAGE_NOTAUTHENTICATED: {
//TODO
subscription->state = ESC_DROPPED;
break;
}
case MESSAGE_BADREQUEST: {
//TODO
subscription->state = ESC_DROPPED;
break;
}
case MESSAGE_NOTHANDLED: {
//TODO
subscription->state = ESC_DROPPED;
break;
}*/
}
tcp_package_destroy(recv_pkg);
mutex_unlock(subscription->lock);
}
bool_t esc_find_subscription_by_correlation_id(void* _subscription, void* _correlation_id) {
esc_subscription_t* subscription = _subscription;
uuid_t* correlation_id = _correlation_id;
return (uuid_compare(subscription->correlation_id, correlation_id) == 0) ? BOOL_TRUE : BOOL_FALSE;
}
inline esc_subscription_state_t esc_subscription_state(esc_subscription_t* subscription) {
return subscription->state;
}
void esc_subscription_stop(esc_subscription_t* subscription) {
mutex_lock(subscription->lock);
//TODO drop subscription message instead of direct drop
//TODO reason
subscription->state = ESC_DROPPED;
mutex_unlock(subscription->lock);
event_wait(subscription->stopped);
}

45
src/subscription.h Normal file
View File

@ -0,0 +1,45 @@
//
// Created by nicol on 2018-03-25.
//
#ifndef EVENSTORE_CLIENT_C_SUBSCRIPTION_H
#define EVENSTORE_CLIENT_C_SUBSCRIPTION_H
#include "utils/bool.h"
#include "utils/error.h"
#include "utils/uuid.h"
#include "tcp_package.h"
typedef struct st_esc_connection esc_connection_t;
typedef struct st_esc_resolved_event esc_resolved_event_t;
typedef struct st_esc_credentials esc_credentials_t;
typedef struct st_esc_subscription esc_subscription_t;
typedef void (*esc_event_appeared_t)(void* ctx, esc_subscription_t* s, esc_resolved_event_t* ev);
typedef struct st_event_appeared_callback {
esc_event_appeared_t fn;
void* ctx;
} esc_event_appeared_callback_t;
typedef void (*esc_subscription_droppped_t)(void* ctx, esc_subscription_t* subscription, const char* reason, error_t* error);
typedef struct st_subscription_dropped_callback {
esc_subscription_droppped_t fn;
void* ctx;
} esc_subscription_dropped_callback_t;
typedef enum {
ESC_SUBSCRIBING,
ESC_SUBSCRIBED,
ESC_DROPPED
} esc_subscription_state_t;
esc_subscription_t* esc_subscription_create(esc_connection_t* conn, uuid_t* correlation_id,
esc_event_appeared_callback_t event_appeared,
esc_subscription_dropped_callback_t subscription_dropped);
void esc_subscription_handle_tcp_package(esc_subscription_t* subscription, tcp_package_t* pkg);
bool_t esc_find_subscription_by_correlation_id(void* item, void* arg);
esc_subscription_state_t esc_subscription_state(esc_subscription_t* subscription);
void esc_subscription_stop(esc_subscription_t* subscription);
#endif //EVENSTORE_CLIENT_C_SUBSCRIPTION_H

View File

@ -5,6 +5,34 @@
#include "event.h" #include "event.h"
#include "debug.h" #include "debug.h"
#ifdef _WIN32
#include <windows.h>
struct st_event {
HANDLE handle;
};
event_t* event_create() {
event_t* event = malloc(sizeof(struct st_event));
event->handle = CreateEvent(NULL, FALSE, FALSE, NULL);
return event;
}
void event_destroy(event_t* event) {
CloseHandle(event->handle);
free(event);
}
void event_wait(event_t* event) {
WaitForSingleObject(event->handle, INFINITE);
}
void event_set(event_t* event) {
SetEvent(event->handle);
}
#endif
#ifdef __linux__ #ifdef __linux__
#include <pthread.h> #include <pthread.h>

View File

@ -32,6 +32,7 @@ void mutex_unlock(mutex_t* mutex) {
void mutex_destroy(mutex_t* mutex) { void mutex_destroy(mutex_t* mutex) {
DeleteCriticalSection(&mutex->handle); DeleteCriticalSection(&mutex->handle);
free(mutex);
} }
#endif #endif
#ifdef __linux__ #ifdef __linux__

View File

@ -118,3 +118,18 @@ void* queue_remove(queue_t* q, find_predicate predicate, void* arg) {
mutex_unlock(q->lock); mutex_unlock(q->lock);
return found; return found;
} }
void* queue_find(queue_t* q, find_predicate predicate, void* arg) {
void* found = 0;
mutex_lock(q->lock);
struct st_node* p = q->start;
while(p) {
if (predicate(p->item, arg) == BOOL_TRUE) {
found = p->item;
break;
}
p = p->next;
}
mutex_unlock(q->lock);
return found;
}

View File

@ -18,5 +18,6 @@ size_t queue_size(queue_t* q);
typedef bool_t (*find_predicate)(void* item, void* arg); typedef bool_t (*find_predicate)(void* item, void* arg);
void* queue_remove(queue_t* q, find_predicate, void* arg); void* queue_remove(queue_t* q, find_predicate, void* arg);
void* queue_find(queue_t* q, find_predicate, void* arg);
#endif //ESC_QUEUE_H #endif //ESC_QUEUE_H

View File

@ -32,6 +32,11 @@ socket_t* socket_create(int type) {
return NULL; return NULL;
} }
void socket_destroy(socket_t* socket) {
socket_close(socket);
free(socket);
}
int socket_close(socket_t* s) { int socket_close(socket_t* s) {
int rc = closesocket(s->s); int rc = closesocket(s->s);
if (rc != 0) { if (rc != 0) {

View File

@ -8,6 +8,10 @@
#include <stdint.h> //uint8_t #include <stdint.h> //uint8_t
#include <stddef.h> //size_t #include <stddef.h> //size_t
#if defined(_WIN32) && defined(UUID_DEFINED)
#undef uuid_t
#endif
typedef struct st_uuid uuid_t; typedef struct st_uuid uuid_t;
uuid_t* uuid_create(); uuid_t* uuid_create();

View File

@ -1,7 +1,3 @@
#include <stdio.h>
#include "../src/utils/debug.h"
#include "../include/esc.h"
#ifdef _WIN32 #ifdef _WIN32
#include <winsock.h> #include <winsock.h>
#include <windows.h> #include <windows.h>
@ -12,6 +8,14 @@
#include <unistd.h> #include <unistd.h>
#endif #endif
#include <stdio.h>
#include "../src/utils/debug.h"
#include "../include/esc.h"
void my_event_appeared(void *ctx, esc_subscription_t *s, esc_resolved_event_t *e) {
printf("Event Appeared: ctx=%p s=%p e=%s\n", ctx, s, e->event ? e->event->event_type : e->link ? e->link->event_type : "<unresolved>");
}
int main() { int main() {
#ifdef _WIN32 #ifdef _WIN32
WSADATA wsaData; WSADATA wsaData;
@ -61,7 +65,7 @@ int main() {
char uuid_buf[37]; char uuid_buf[37];
for (size_t i = 0; i < result->n_events; i++) { for (size_t i = 0; i < result->n_events; i++) {
#ifdef _WIN32 #ifdef _WIN32
printf("%s %s %lld@%s %llu %llu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), printf("%s %s %lld@%s %llu %llu\n", uuid_format(result->events[i]->event->event_id, uuid_buf, 37),
#else #else
printf("%s %s %ld@%s %lu %lu\n", uuid_format(result->events[i]->event->event_id, uuid_buf, 37), printf("%s %s %ld@%s %lu %lu\n", uuid_format(result->events[i]->event->event_id, uuid_buf, 37),
#endif #endif
@ -75,7 +79,12 @@ int main() {
if (result) esc_all_events_slice_destroy(result); if (result) esc_all_events_slice_destroy(result);
sleep(5); esc_event_appeared_callback_t my_event_appeared_callback = {my_event_appeared, 0};
esc_subscription_t* subscription = esc_connection_subscribe_to_all(conn, BOOL_TRUE, my_event_appeared_callback, esc_empty_subscription_dropped_callback, credentials);
sleep(60);
esc_subscription_destroy(subscription);
esc_connection_destroy(conn); esc_connection_destroy(conn);
esc_credentials_destroy(credentials); esc_credentials_destroy(credentials);