diff --git a/CMakeLists.txt b/CMakeLists.txt index f36bcee..a44901f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.9) -project(esc C) +project(evenstore-client-c C) set(CMAKE_C_STANDARD 99) @@ -10,11 +10,12 @@ if(WIN32) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) endif() -add_library(eventstore-client-c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h include/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c src/position.c src/position.h src/credentials.c src/credentials.h src/connection.c src/connection.h src/utils/bool.h src/endpoint_discoverer.h src/utils/error.c src/utils/error.h src/endpoint_discoverer.c src/proto_helper.h src/proto_helper.c src/results.h src/results.c src/event_data.c src/event_data.h src/utils/array.c src/utils/array.h src/utils/event.c src/utils/event.h) +add_library(eventstore-client-c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h include/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h src/utils/string.c src/utils/string.h src/utils/queue.c src/utils/queue.h src/tcp_messages.c src/tcp_messages.h src/utils/debug.h src/utils/debug.c src/position.c src/position.h src/credentials.c src/credentials.h src/connection.c src/connection.h src/utils/bool.h src/endpoint_discoverer.h src/utils/error.c src/utils/error.h src/endpoint_discoverer.c src/proto_helper.h src/proto_helper.c src/results.h src/results.c src/event_data.c src/event_data.h src/utils/array.c src/utils/array.h src/utils/event.c src/utils/event.h src/subscription.c src/subscription.h) add_executable(esc test/main.c include/esc.h) if(WIN32) - target_link_libraries(esc wsock32 ws2_32 eventstore-client-c.a libprotobuf-c.a) + target_link_libraries(eventstore-client-c wsock32 ws2_32) + target_link_libraries(esc eventstore-client-c libprotobuf-c.a) else() target_link_libraries(eventstore-client-c pthread) target_link_libraries(esc pthread eventstore-client-c protobuf-c) diff --git a/include/esc.h b/include/esc.h index df8db32..e2a66f0 100644 --- a/include/esc.h +++ b/include/esc.h @@ -19,6 +19,9 @@ array_t* array_create(size_t n, ...); void array_destroy(array_t* array, array_deallocator destroyer); // uuid +#if defined(_WIN32) && defined(UUID_DEFINED) +#undef uuid_t +#endif typedef struct st_uuid uuid_t; uuid_t* uuid_create(); void uuid_destroy(uuid_t*); @@ -45,10 +48,10 @@ typedef struct st_error { typedef struct st_esc_position esc_position_t; typedef struct st_esc_connection_settings esc_connection_settings_t; -typedef struct st_connection esc_connection_t; -typedef struct st_credentials esc_credentials_t; +typedef struct st_esc_connection esc_connection_t; +typedef struct st_esc_credentials esc_credentials_t; -struct st_recorded_event { +typedef struct st_esc_recorded_event { uuid_t* event_id; const char* event_type; int64_t event_number; @@ -56,15 +59,13 @@ struct st_recorded_event { int64_t created_epoch; buffer_t* data; buffer_t* metadata; -}; -typedef struct st_recorded_event esc_recorded_event_t; +} esc_recorded_event_t; -struct st_resolved_event { +typedef struct st_esc_resolved_event { esc_recorded_event_t* event; esc_recorded_event_t* link; esc_position_t* original_position; -}; -typedef struct st_resolved_event esc_resolved_event_t; +} esc_resolved_event_t; struct st_all_events_slice { char* read_direction; @@ -83,6 +84,20 @@ typedef struct st_esc_write_result { typedef struct st_esc_event_data esc_event_data_t; +typedef struct st_esc_subscription esc_subscription_t; + +typedef void (*esc_event_appeared_t)(void* ctx, esc_subscription_t* s, esc_resolved_event_t* ev); +typedef struct st_event_appeared_callback { + esc_event_appeared_t fn; + void* ctx; +} esc_event_appeared_callback_t; + +typedef void (*esc_subscription_droppped_t)(void* ctx, esc_subscription_t* subscription, const char* reason, error_t* error); +typedef struct st_subscription_dropped_callback { + esc_subscription_droppped_t fn; + void* ctx; +} esc_subscription_dropped_callback_t; + esc_connection_settings_t* const esc_default_connection_settings; // Connection @@ -105,6 +120,14 @@ void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice); error_t* esc_connection_last_error(esc_connection_t* conn); +// Subscriptions +extern esc_event_appeared_callback_t esc_empty_event_appeared_callback; +extern esc_subscription_dropped_callback_t esc_empty_subscription_dropped_callback; + +esc_subscription_t* esc_connection_subscribe_to_all(esc_connection_t* conn, bool_t resolve_link_tos, esc_event_appeared_callback_t event_appeared_callback, esc_subscription_dropped_callback_t subscription_dropped_callback, esc_credentials_t* credentials); +void esc_subscription_stop(esc_subscription_t* subscription); +void esc_subscription_destroy(esc_subscription_t* subscription); + // Formatting const char* uuid_format(uuid_t* uuid, char* buf, size_t buf_size); const char* esc_position_format(esc_position_t* position, char* buffer, size_t buf_size); diff --git a/src/connection.c b/src/connection.c index 6bf5afb..9d19c77 100644 --- a/src/connection.c +++ b/src/connection.c @@ -2,20 +2,6 @@ // Created by nicolas on 22/03/18. // -#include -#include -#include -#include "connection.h" -#include "tcp_package.h" -#include "tcp_messages.h" -#include "credentials.h" -#include "proto_helper.h" -#include "utils/array.h" -#include "utils/debug.h" -#include "utils/error.h" -#include "utils/string.h" -#include "utils/socket.h" - // usleep #ifdef _WIN32 #include @@ -25,6 +11,22 @@ #include #endif +#include +#include +#include +#include "connection.h" +#include "subscription.h" +#include "tcp_package.h" +#include "tcp_messages.h" +#include "credentials.h" +#include "proto_helper.h" +#include "utils/array.h" +#include "utils/debug.h" +#include "utils/error.h" +#include "utils/string.h" +#include "utils/socket.h" +#include "proto.h" + struct st_esc_connection_settings default_connection_settings = { BOOL_FALSE }; @@ -35,7 +37,7 @@ esc_connection_settings_t* esc_default_connection_settings = &default_connection ssize_t connection_send_tcp_package(esc_connection_t* conn, tcp_package_t* pkg) { char uuid_buf[37]; #ifdef _WIN32 - fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, esc_uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); + fprintf(stderr, "connection_send_tcp_package: %s %u %s %llu ", get_string_for_tcp_message(pkg->command), pkg->flags, uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); #else fprintf(stderr, "connection_send_tcp_package: %s %u %s %lu ", get_string_for_tcp_message(pkg->command), pkg->flags, uuid_format(pkg->correlation_id, uuid_buf, 37), buffer_size(pkg->data)); @@ -87,7 +89,7 @@ tcp_package_t* connection_recv_tcp_package(esc_connection_t* conn) { buffer_destroy(recv_buffer); char uuid_buf[37]; #ifdef _WIN32 - fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, esc_uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); + fprintf(stderr, "connection_recv_tcp_package: %s %u %s %llu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); #else fprintf(stderr, "connection_recv_tcp_package: %s %u %s %lu\n", get_string_for_tcp_message(recv_pkg->command), recv_pkg->flags, uuid_format(recv_pkg->correlation_id, uuid_buf, 37), buffer_size(recv_pkg->data)); @@ -111,6 +113,7 @@ void* connection_thread(void* arg) { } if (socket_readable(conn->tcp_conn)) { + esc_subscription_t* subscription = 0; tcp_package_t* recv_pkg = connection_recv_tcp_package(conn); if (recv_pkg == 0) { //TODO Handle connection lost @@ -119,6 +122,8 @@ void* connection_thread(void* arg) { tcp_package_t* heartbeat_pkg = tcp_package_create(MESSAGE_HEARTBEATRESPONSE, recv_pkg->correlation_id, buffer_create(0)); queue_enqueue(conn->send_queue, heartbeat_pkg); tcp_package_destroy(recv_pkg); + } else if ((subscription = queue_find(conn->subscriptions, esc_find_subscription_by_correlation_id, recv_pkg->correlation_id)) != 0) { + esc_subscription_handle_tcp_package(subscription, recv_pkg); } else { queue_enqueue(conn->recv_queue, recv_pkg); } @@ -151,7 +156,7 @@ tcp_package_t* connection_wait_for(esc_connection_t* conn, uuid_t* correlation_i } esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name) { - struct st_connection* conn = malloc(sizeof(struct st_connection)); + struct st_esc_connection* conn = malloc(sizeof(struct st_esc_connection)); conn->settings = connection_settings; if (connection_name == 0 || strcmp(connection_name, "") == 0) { uuid_t* uuid = uuid_create(); @@ -167,6 +172,7 @@ esc_connection_t* esc_connection_create(esc_connection_settings_t* connection_se conn->stop = 0; conn->stopped_event = event_create(); conn->last_error = 0; + conn->subscriptions = queue_create(); if (strncmp(addr, "tcp://", 6) != 0) { conn->last_error = error_create(1, "invalid schema for address: %s", addr); @@ -203,6 +209,7 @@ void esc_connection_destroy(esc_connection_t* conn) { queue_destroy(conn->send_queue); event_destroy(conn->stopped_event); free((void*)conn->name); + queue_destroy(conn->subscriptions); free(conn); } @@ -310,6 +317,26 @@ esc_write_result_t* esc_append_to_stream(esc_connection_t* conn, const char* str return result; } +esc_subscription_t* esc_connection_subscribe_to_all(esc_connection_t* conn, bool_t resolve_link_tos, esc_event_appeared_callback_t event_appeared_callback, esc_subscription_dropped_callback_t subscription_dropped_callback, esc_credentials_t* credentials) { + uuid_t* correlation_id = uuid_create(); + + esc_subscription_t* subscription = esc_subscription_create(conn, correlation_id, event_appeared_callback, subscription_dropped_callback); + queue_enqueue(conn->subscriptions, subscription); + + buffer_t* msg_buf = esc_subscribe_to_stream_pack("", resolve_link_tos); + tcp_package_t* send_pkg = tcp_package_create_authenticated(MESSAGE_SUBSCRIBETOSTREAM, correlation_id, msg_buf, credentials->username, credentials->password); + queue_enqueue(conn->send_queue, send_pkg); + + bool_t subscribing = BOOL_TRUE; + while(subscribing) { + subscribing = esc_subscription_state(subscription) == ESC_SUBSCRIBING; + usleep(1); + } + + uuid_destroy(correlation_id); + return subscription; +} + void esc_connection_close(esc_connection_t* conn) { if (conn->stop == 1) return; conn->stop = 1; diff --git a/src/connection.h b/src/connection.h index abaa76e..48f8d4e 100644 --- a/src/connection.h +++ b/src/connection.h @@ -6,13 +6,14 @@ #define ESC_ESC_CONNECTION_H #include +#include "endpoint_discoverer.h" +#include "tcp_package.h" #include "utils/bool.h" #include "utils/socket.h" #include "utils/thread.h" #include "utils/mutex.h" #include "utils/queue.h" #include "utils/error.h" -#include "endpoint_discoverer.h" #include "utils/event.h" typedef struct st_esc_connection_settings { @@ -21,7 +22,7 @@ typedef struct st_esc_connection_settings { esc_connection_settings_t* esc_default_connection_settings; -typedef struct st_connection { +typedef struct st_esc_connection { esc_connection_settings_t* settings; const char* name; void* discoverer_data; @@ -33,8 +34,10 @@ typedef struct st_connection { queue_t* recv_queue; error_t* last_error; event_t* stopped_event; + queue_t* subscriptions; } esc_connection_t; +tcp_package_t* connection_wait_for(esc_connection_t* conn, uuid_t* correlation_id); error_t* esc_connection_last_error(esc_connection_t* conn); void esc_connection_close(esc_connection_t* conn); diff --git a/src/credentials.c b/src/credentials.c index 3792fcf..35181c8 100644 --- a/src/credentials.c +++ b/src/credentials.c @@ -8,7 +8,7 @@ #include "credentials.h" esc_credentials_t* esc_credentials_create(const char* username, const char* password) { - esc_credentials_t* creds = malloc(sizeof(struct st_credentials)); + esc_credentials_t* creds = malloc(sizeof(struct st_esc_credentials)); creds->username = string_copy(username); creds->password = string_copy(password); return creds; diff --git a/src/credentials.h b/src/credentials.h index e20b5d3..1027f2e 100644 --- a/src/credentials.h +++ b/src/credentials.h @@ -5,11 +5,11 @@ #ifndef ESC_ESC_CREDENTIALS_H #define ESC_ESC_CREDENTIALS_H -struct st_credentials { +struct st_esc_credentials { const char* username; const char* password; }; -typedef struct st_credentials esc_credentials_t; +typedef struct st_esc_credentials esc_credentials_t; esc_credentials_t* esc_credentials_create(const char* username, const char* password); void esc_credentials_destroy(esc_credentials_t* creds); diff --git a/src/proto_helper.c b/src/proto_helper.c index 536533a..0169be9 100644 --- a/src/proto_helper.c +++ b/src/proto_helper.c @@ -2,12 +2,10 @@ // Created by nicolas on 22/03/18. // -#include "utils/debug.h" -#include "utils/string.h" -#include "results.h" #include "proto_helper.h" #include "proto.h" -#include "utils/bool.h" +#include "utils/debug.h" +#include "utils/string.h" #include "event_data.h" void* protobuf_c_alloc(void *alloc_data, size_t size) { @@ -48,7 +46,7 @@ buffer_t* esc_identify_client_pack(const char* connection_name) { esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__EventRecord* msg) { if (msg == 0) return 0; - esc_recorded_event_t* ev = malloc(sizeof(struct st_recorded_event)); + esc_recorded_event_t* ev = malloc(sizeof(struct st_esc_recorded_event)); ev->event_id = uuid_from(msg->event_id.data, msg->event_id.len); ev->event_type = string_copy(msg->event_type); ev->event_number = msg->event_number; @@ -60,13 +58,21 @@ esc_recorded_event_t* esc_recorded_event_unpack(EventStore__Client__Messages__Ev } esc_resolved_event_t* esc_resolved_event_unpack(EventStore__Client__Messages__ResolvedEvent* msg) { - esc_resolved_event_t* ev = malloc(sizeof(struct st_resolved_event)); + esc_resolved_event_t* ev = malloc(sizeof(struct st_esc_resolved_event)); ev->original_position = esc_position_create(msg->prepare_position, msg->commit_position); ev->event = esc_recorded_event_unpack(msg->event); ev->link = esc_recorded_event_unpack(msg->link); return ev; } +esc_resolved_event_t* esc_stream_event_appeared_unpack(buffer_t* data) { + EventStore__Client__Messages__StreamEventAppeared* recv_msg = + event_store__client__messages__stream_event_appeared__unpack(&protobuf_allocator, buffer_size(data), buffer_data(data)); + esc_resolved_event_t* ev = esc_resolved_event_unpack(recv_msg->event); + event_store__client__messages__stream_event_appeared__free_unpacked(recv_msg, &protobuf_allocator); + return ev; +} + const char* get_string_for_all_events_result(int result) { switch(result) { case EVENT_STORE__CLIENT__MESSAGES__READ_ALL_EVENTS_COMPLETED__READ_ALL_RESULT__Success: return "Success"; @@ -210,3 +216,14 @@ buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version free(send_msg.events); return buffer_copyfrom(buffer, buf_size); } + +buffer_t* esc_subscribe_to_stream_pack(const char* stream, bool_t resolve_link_tos) { + EventStore__Client__Messages__SubscribeToStream send_msg; + event_store__client__messages__subscribe_to_stream__init(&send_msg); + send_msg.event_stream_id = (char*)stream; + send_msg.resolve_link_tos = resolve_link_tos; + size_t buf_size = event_store__client__messages__subscribe_to_stream__get_packed_size(&send_msg); + uint8_t buffer[buf_size]; + event_store__client__messages__subscribe_to_stream__pack(&send_msg, buffer); + return buffer_copyfrom(buffer, buf_size); +} diff --git a/src/proto_helper.h b/src/proto_helper.h index 00a8abc..18af8e7 100644 --- a/src/proto_helper.h +++ b/src/proto_helper.h @@ -5,12 +5,15 @@ #ifndef ESC_PROTO_HELPER_H #define ESC_PROTO_HELPER_H +#include #include "utils/array.h" #include "utils/buffer.h" #include "utils/bool.h" #include "utils/error.h" #include "results.h" +extern ProtobufCAllocator protobuf_allocator; + typedef enum { Operation_Decision_DoNothing, Operation_Decision_EndOperation, @@ -28,6 +31,8 @@ void inspection_result_destroy(inspection_result_t*); buffer_t* esc_identify_client_pack(const char* connection_name); +esc_resolved_event_t* esc_stream_event_appeared_unpack(buffer_t* data); + inspection_result_t* esc_all_events_slice_unpack(buffer_t* buffer, esc_all_events_slice_t** all_events_slice_p); buffer_t* esc_read_all_forward_pack(esc_position_t* last_checkpoint, int32_t count, bool_t resolve_link_tos); const char* get_string_for_all_events_result(int result); @@ -36,4 +41,6 @@ inspection_result_t* esc_write_result_unpack(buffer_t* buffer, esc_write_result_ buffer_t* esc_append_to_stream_pack(const char* stream, int64_t expected_version, array_t* events); const char* get_string_for_write_result(int result); +buffer_t* esc_subscribe_to_stream_pack(const char* stream, bool_t resolve_link_tos); + #endif //ESC_PROTO_HELPER_H diff --git a/src/results.h b/src/results.h index 8bc2b33..a1b1fc4 100644 --- a/src/results.h +++ b/src/results.h @@ -9,7 +9,7 @@ #include "utils/buffer.h" #include "position.h" -struct st_recorded_event { +typedef struct st_esc_recorded_event { uuid_t* event_id; const char* event_type; int64_t event_number; @@ -17,15 +17,13 @@ struct st_recorded_event { int64_t created_epoch; buffer_t* data; buffer_t* metadata; -}; -typedef struct st_recorded_event esc_recorded_event_t; +} esc_recorded_event_t; -struct st_resolved_event { +typedef struct st_esc_resolved_event { esc_recorded_event_t* event; esc_recorded_event_t* link; esc_position_t* original_position; -}; -typedef struct st_resolved_event esc_resolved_event_t; +} esc_resolved_event_t; struct st_all_events_slice { char* read_direction; @@ -42,9 +40,14 @@ typedef struct st_esc_write_result_t { esc_position_t* log_position; } esc_write_result_t; +typedef struct st_esc_subscription_confirmation { + +} esc_subscription_confirmation_t; + void esc_recorded_event_destroy(esc_recorded_event_t* recorded_event); void esc_resolved_event_destroy(esc_resolved_event_t* resolved_event); void esc_all_events_slice_destroy(esc_all_events_slice_t* all_events_slice); void esc_write_result_destroy(esc_write_result_t* write_result); +void esc_subscription_confirmation_destroy(esc_subscription_confirmation_t* subscription_confirmation); #endif //ESC_RESULTS_H diff --git a/src/subscription.c b/src/subscription.c new file mode 100644 index 0000000..a8bde6f --- /dev/null +++ b/src/subscription.c @@ -0,0 +1,170 @@ +// +// Created by nicol on 2018-03-25. +// + +#ifdef _WIN32 +#include +#include + +#define usleep Sleep +#endif + +#include "subscription.h" +#include "connection.h" +#include "credentials.h" +#include "proto_helper.h" +#include "tcp_package.h" +#include "tcp_messages.h" +#include "utils/uuid.h" +#include "utils/buffer.h" +#include "utils/debug.h" +#include "proto.h" + +esc_event_appeared_callback_t esc_empty_event_appeared_callback = {0,0}; +esc_subscription_dropped_callback_t esc_empty_subscription_dropped_callback = {0,0}; + +struct st_esc_subscription { + mutex_t* lock; + esc_subscription_state_t state; + esc_connection_t* conn; + uuid_t* correlation_id; + esc_event_appeared_callback_t event_appeared; + esc_subscription_dropped_callback_t subscription_dropped; + int64_t last_commit_position; + int64_t last_event_number; + error_t* error; + thread_t* worker_thread; + queue_t* events; + event_t* stopped; +}; + +void* esc_subscription_worker_thread(void* arg) { + esc_subscription_t* subscription = arg; + + bool_t subscribed = BOOL_TRUE; + while(subscribed) { + esc_resolved_event_t* ev = queue_dequeue(subscription->events); + //TODO full logic + if (ev && subscription->event_appeared.fn) { + subscription->event_appeared.fn(subscription->event_appeared.ctx, subscription, ev); + } + if (ev) esc_resolved_event_destroy(ev); + + mutex_lock(subscription->lock); + subscribed = subscription->state == ESC_SUBSCRIBED; + mutex_unlock(subscription->lock); + + usleep(1); + } + + //TODO reason + if (subscription->subscription_dropped.fn) { + subscription->subscription_dropped.fn(subscription->subscription_dropped.ctx, subscription, "", subscription->error); + } + + //TODO uuid instead of ptr + printf("Subscription %p worker thread stopped\n", subscription); + + event_set(subscription->stopped); +} + +esc_subscription_t* esc_subscription_create(esc_connection_t* conn, uuid_t* correlation_id, + esc_event_appeared_callback_t event_appeared, + esc_subscription_dropped_callback_t subscription_dropped +) { + esc_subscription_t* subscription = malloc(sizeof(esc_subscription_t)); + subscription->lock = mutex_create(); + subscription->state = ESC_SUBSCRIBING; + subscription->conn = conn; + subscription->correlation_id = uuid_copy(correlation_id); + subscription->event_appeared = event_appeared; + subscription->subscription_dropped = subscription_dropped; + subscription->error = 0; + subscription->worker_thread = thread_create(esc_subscription_worker_thread, subscription); + subscription->events = queue_create(); + subscription->stopped = event_create(); + return subscription; +} + +void esc_subscription_destroy(esc_subscription_t* subscription) { + esc_subscription_stop(subscription); + mutex_destroy(subscription->lock); + uuid_destroy(subscription->correlation_id); + thread_destroy(subscription->worker_thread); + queue_destroy(subscription->events); + event_destroy(subscription->stopped); + free(subscription); +} + +void esc_subscription_handle_tcp_package(esc_subscription_t* subscription, tcp_package_t* recv_pkg) { + mutex_lock(subscription->lock); + + switch (recv_pkg->command) { + case MESSAGE_SUBSCRIPTIONCONFIRMATION: { + EventStore__Client__Messages__SubscriptionConfirmation* recv_msg = + event_store__client__messages__subscription_confirmation__unpack(&protobuf_allocator, buffer_size(recv_pkg->data), buffer_data(recv_pkg->data)); + subscription->state = ESC_SUBSCRIBED; + subscription->last_event_number = recv_msg->last_event_number; + subscription->last_commit_position = recv_msg->last_commit_position; + event_store__client__messages__subscription_confirmation__free_unpacked(recv_msg, &protobuf_allocator); + thread_start(subscription->worker_thread); + break; + } + case MESSAGE_STREAMEVENTAPPEARED: { + esc_resolved_event_t* ev = esc_stream_event_appeared_unpack(recv_pkg->data); + queue_enqueue(subscription->events, ev); + break; + } + default: { + subscription->error = error_create(recv_pkg->command, "%s", get_string_for_tcp_message(recv_pkg->command)); + subscription->state = ESC_DROPPED; + break; + } + /* + case MESSAGE_SUBSCRIPTIONDROPPED: { + //TODO + subscription->state = ESC_DROPPED; + break; + } + case MESSAGE_NOTAUTHENTICATED: { + //TODO + subscription->state = ESC_DROPPED; + break; + } + case MESSAGE_BADREQUEST: { + //TODO + subscription->state = ESC_DROPPED; + break; + } + case MESSAGE_NOTHANDLED: { + //TODO + subscription->state = ESC_DROPPED; + break; + }*/ + } + + tcp_package_destroy(recv_pkg); + + mutex_unlock(subscription->lock); +} + +bool_t esc_find_subscription_by_correlation_id(void* _subscription, void* _correlation_id) { + esc_subscription_t* subscription = _subscription; + uuid_t* correlation_id = _correlation_id; + + return (uuid_compare(subscription->correlation_id, correlation_id) == 0) ? BOOL_TRUE : BOOL_FALSE; +} + +inline esc_subscription_state_t esc_subscription_state(esc_subscription_t* subscription) { + return subscription->state; +} + +void esc_subscription_stop(esc_subscription_t* subscription) { + mutex_lock(subscription->lock); + //TODO drop subscription message instead of direct drop + //TODO reason + subscription->state = ESC_DROPPED; + mutex_unlock(subscription->lock); + + event_wait(subscription->stopped); +} diff --git a/src/subscription.h b/src/subscription.h new file mode 100644 index 0000000..1ca4b47 --- /dev/null +++ b/src/subscription.h @@ -0,0 +1,45 @@ +// +// Created by nicol on 2018-03-25. +// + +#ifndef EVENSTORE_CLIENT_C_SUBSCRIPTION_H +#define EVENSTORE_CLIENT_C_SUBSCRIPTION_H + +#include "utils/bool.h" +#include "utils/error.h" +#include "utils/uuid.h" +#include "tcp_package.h" + +typedef struct st_esc_connection esc_connection_t; +typedef struct st_esc_resolved_event esc_resolved_event_t; +typedef struct st_esc_credentials esc_credentials_t; + +typedef struct st_esc_subscription esc_subscription_t; + +typedef void (*esc_event_appeared_t)(void* ctx, esc_subscription_t* s, esc_resolved_event_t* ev); +typedef struct st_event_appeared_callback { + esc_event_appeared_t fn; + void* ctx; +} esc_event_appeared_callback_t; + +typedef void (*esc_subscription_droppped_t)(void* ctx, esc_subscription_t* subscription, const char* reason, error_t* error); +typedef struct st_subscription_dropped_callback { + esc_subscription_droppped_t fn; + void* ctx; +} esc_subscription_dropped_callback_t; + +typedef enum { + ESC_SUBSCRIBING, + ESC_SUBSCRIBED, + ESC_DROPPED +} esc_subscription_state_t; + +esc_subscription_t* esc_subscription_create(esc_connection_t* conn, uuid_t* correlation_id, + esc_event_appeared_callback_t event_appeared, + esc_subscription_dropped_callback_t subscription_dropped); +void esc_subscription_handle_tcp_package(esc_subscription_t* subscription, tcp_package_t* pkg); +bool_t esc_find_subscription_by_correlation_id(void* item, void* arg); +esc_subscription_state_t esc_subscription_state(esc_subscription_t* subscription); +void esc_subscription_stop(esc_subscription_t* subscription); + +#endif //EVENSTORE_CLIENT_C_SUBSCRIPTION_H diff --git a/src/utils/event.c b/src/utils/event.c index 333eeba..dcbe9a2 100644 --- a/src/utils/event.c +++ b/src/utils/event.c @@ -5,6 +5,34 @@ #include "event.h" #include "debug.h" +#ifdef _WIN32 +#include + +struct st_event { + HANDLE handle; +}; + +event_t* event_create() { + event_t* event = malloc(sizeof(struct st_event)); + event->handle = CreateEvent(NULL, FALSE, FALSE, NULL); + return event; +} + +void event_destroy(event_t* event) { + CloseHandle(event->handle); + free(event); +} + +void event_wait(event_t* event) { + WaitForSingleObject(event->handle, INFINITE); +} + +void event_set(event_t* event) { + SetEvent(event->handle); +} + +#endif + #ifdef __linux__ #include diff --git a/src/utils/mutex.c b/src/utils/mutex.c index a15ffb0..b9589ce 100644 --- a/src/utils/mutex.c +++ b/src/utils/mutex.c @@ -32,6 +32,7 @@ void mutex_unlock(mutex_t* mutex) { void mutex_destroy(mutex_t* mutex) { DeleteCriticalSection(&mutex->handle); + free(mutex); } #endif #ifdef __linux__ diff --git a/src/utils/queue.c b/src/utils/queue.c index e2ecafa..39d7bac 100644 --- a/src/utils/queue.c +++ b/src/utils/queue.c @@ -117,4 +117,19 @@ void* queue_remove(queue_t* q, find_predicate predicate, void* arg) { } mutex_unlock(q->lock); return found; -} \ No newline at end of file +} + +void* queue_find(queue_t* q, find_predicate predicate, void* arg) { + void* found = 0; + mutex_lock(q->lock); + struct st_node* p = q->start; + while(p) { + if (predicate(p->item, arg) == BOOL_TRUE) { + found = p->item; + break; + } + p = p->next; + } + mutex_unlock(q->lock); + return found; +} diff --git a/src/utils/queue.h b/src/utils/queue.h index b525ef3..c2bef30 100644 --- a/src/utils/queue.h +++ b/src/utils/queue.h @@ -18,5 +18,6 @@ size_t queue_size(queue_t* q); typedef bool_t (*find_predicate)(void* item, void* arg); void* queue_remove(queue_t* q, find_predicate, void* arg); +void* queue_find(queue_t* q, find_predicate, void* arg); #endif //ESC_QUEUE_H diff --git a/src/utils/socket.c b/src/utils/socket.c index 9526321..e442b1e 100644 --- a/src/utils/socket.c +++ b/src/utils/socket.c @@ -32,6 +32,11 @@ socket_t* socket_create(int type) { return NULL; } +void socket_destroy(socket_t* socket) { + socket_close(socket); + free(socket); +} + int socket_close(socket_t* s) { int rc = closesocket(s->s); if (rc != 0) { diff --git a/src/utils/uuid.h b/src/utils/uuid.h index be10b60..949c912 100644 --- a/src/utils/uuid.h +++ b/src/utils/uuid.h @@ -8,6 +8,10 @@ #include //uint8_t #include //size_t +#if defined(_WIN32) && defined(UUID_DEFINED) +#undef uuid_t +#endif + typedef struct st_uuid uuid_t; uuid_t* uuid_create(); diff --git a/test/main.c b/test/main.c index 108cce4..b9dba81 100644 --- a/test/main.c +++ b/test/main.c @@ -1,7 +1,3 @@ -#include -#include "../src/utils/debug.h" -#include "../include/esc.h" - #ifdef _WIN32 #include #include @@ -12,6 +8,14 @@ #include #endif +#include +#include "../src/utils/debug.h" +#include "../include/esc.h" + +void my_event_appeared(void *ctx, esc_subscription_t *s, esc_resolved_event_t *e) { + printf("Event Appeared: ctx=%p s=%p e=%s\n", ctx, s, e->event ? e->event->event_type : e->link ? e->link->event_type : ""); +} + int main() { #ifdef _WIN32 WSADATA wsaData; @@ -61,7 +65,7 @@ int main() { char uuid_buf[37]; for (size_t i = 0; i < result->n_events; i++) { #ifdef _WIN32 - printf("%s %s %lld@%s %llu %llu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), + printf("%s %s %lld@%s %llu %llu\n", uuid_format(result->events[i]->event->event_id, uuid_buf, 37), #else printf("%s %s %ld@%s %lu %lu\n", uuid_format(result->events[i]->event->event_id, uuid_buf, 37), #endif @@ -74,8 +78,13 @@ int main() { } while(result->is_end_of_stream == 0); if (result) esc_all_events_slice_destroy(result); + + esc_event_appeared_callback_t my_event_appeared_callback = {my_event_appeared, 0}; + esc_subscription_t* subscription = esc_connection_subscribe_to_all(conn, BOOL_TRUE, my_event_appeared_callback, esc_empty_subscription_dropped_callback, credentials); - sleep(5); + sleep(60); + + esc_subscription_destroy(subscription); esc_connection_destroy(conn); esc_credentials_destroy(credentials);