diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 25bef7a..9dcbc20 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_C_STANDARD 99)
include_directories(c:/Users/nicol/dev/thirdparty/protobuf-c)
link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs)
-add_executable(esc main.c mutex.c mutex.h thread.c thread.h socket.c socket.h esc.c esc.h ClientMessageDtos.pb-c.c ClientMessageDtos.pb-c.h uuid.c uuid.h)
+add_executable(esc main.c mutex.c mutex.h thread.c thread.h socket.c socket.h esc.c esc.h ClientMessageDtos.pb-c.c ClientMessageDtos.pb-c.h uuid.c uuid.h buffer.c buffer.h tcp_package.c tcp_package.h)
if(WIN32)
target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a)
diff --git a/buffer.c b/buffer.c
new file mode 100644
index 0000000..9fa02d0
--- /dev/null
+++ b/buffer.c
@@ -0,0 +1,20 @@
+//
+// Created by nicol on 2018-03-18.
+//
+
+#include
+#include "buffer.h"
+
+const buffer_t buffer_create(uint32_t size) {
+ buffer_t buf = {size, malloc(size)};
+ return buf;
+}
+
+const buffer_t buffer_from(uint8_t* data, uint32_t size) {
+ buffer_t buf = {size, data};
+ return buf;
+}
+
+void buffer_free(buffer_t buffer) {
+ free(buffer.data);
+}
diff --git a/buffer.h b/buffer.h
new file mode 100644
index 0000000..8efbfb2
--- /dev/null
+++ b/buffer.h
@@ -0,0 +1,20 @@
+//
+// Created by nicol on 2018-03-18.
+//
+
+#ifndef ESC_BUFFER_H
+#define ESC_BUFFER_H
+
+#include
+
+struct st_buffer {
+ uint32_t size;
+ uint8_t* data;
+};
+typedef struct st_buffer buffer_t;
+
+const buffer_t buffer_create(uint32_t size);
+const buffer_t buffer_from(uint8_t* data, uint32_t size);
+void buffer_free(buffer_t buffer);
+
+#endif //ESC_BUFFER_H
diff --git a/esc.c b/esc.c
index 9e2bb91..6e2ff47 100644
--- a/esc.c
+++ b/esc.c
@@ -8,112 +8,9 @@
#include
#include "esc.h"
#include "socket.h"
-#include "uuid.h"
+#include "buffer.h"
#include "ClientMessageDtos.pb-c.h"
-
-struct st_buffer {
- uint32_t size;
- uint8_t* data;
-};
-typedef struct st_buffer buffer_t;
-
-const buffer_t buffer_create(uint32_t size) {
- buffer_t buf = {size, malloc(size)};
- return buf;
-}
-
-const buffer_t buffer_from(uint8_t* data, uint32_t size) {
- buffer_t buf = {size, data};
- return buf;
-}
-
-void buffer_free(buffer_t buffer) {
- free(buffer.data);
-}
-
-struct st_tcp_package {
- uint8_t command;
- uint8_t flags;
- esc_uuid_t correlation_id;
- const char* login;
- const char* password;
- buffer_t data;
-};
-typedef struct st_tcp_package tcp_package_t;
-
-const uint32_t CommandOffset = 0;
-const uint32_t FlagsOffset = 1;
-const uint32_t CorrelationOffset = 2;
-const uint32_t AuthOffset = 18;
-const uint32_t MandatorySize = 18;
-
-const tcp_package_t* tcp_package_create(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data) {
- tcp_package_t* pkg = malloc(sizeof(tcp_package_t));
- pkg->command = command;
- pkg->flags = 0;
- pkg->correlation_id = *correlation_id;
- pkg->login = 0;
- pkg->password = 0;
- pkg->data = data;
- return pkg;
-}
-
-const tcp_package_t* tcp_package_create_authenticated(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data, const char* username, const char* password) {
- tcp_package_t* pkg = malloc(sizeof(tcp_package_t));
- pkg->command = command;
- pkg->flags = 1;
- pkg->correlation_id = *correlation_id;
- pkg->login = username;
- pkg->password = password;
- pkg->data = data;
- return pkg;
-}
-
-void tcp_package_free(const tcp_package_t* pkg) {
- buffer_free(pkg->data);
- free((void*)pkg);
-}
-
-buffer_t tcp_package_to_buffer(const tcp_package_t* pkg) {
- uint8_t* _dst = malloc(MandatorySize + pkg->data.size + (pkg->flags ? 257*2 : 0));
- _dst[CommandOffset] = pkg->command;
- _dst[FlagsOffset] = pkg->flags;
- memcpy(&_dst[CorrelationOffset], &pkg->correlation_id, sizeof(esc_uuid_t));
- size_t size = MandatorySize;
- if (pkg->flags) {
- size_t l_len = strlen(pkg->login);
- _dst[AuthOffset] = l_len;
- strcpy(&_dst[AuthOffset+1], pkg->login);
- size_t p_len = strlen(pkg->password);
- _dst[AuthOffset+1+l_len] = p_len;
- strcpy(&_dst[AuthOffset+2+l_len], pkg->password);
- memcpy(&_dst[AuthOffset+2+l_len+p_len], pkg->data.data, pkg->data.size);
- size += 2 + l_len + p_len + pkg->data.size;
- } else {
- memcpy(&_dst[AuthOffset], pkg->data.data, pkg->data.size);
- size += pkg->data.size;
- }
- return buffer_from(_dst, size);
-}
-
-const tcp_package_t* tcp_package_from_buffer(buffer_t buffer) {
- tcp_package_t* pkg = malloc(sizeof(tcp_package_t));
- pkg->command = buffer.data[CommandOffset];
- pkg->flags = buffer.data[FlagsOffset];
- memcpy(&pkg->correlation_id, &buffer.data[CorrelationOffset], sizeof(esc_uuid_t));
- if (pkg->flags) {
- size_t l_len = buffer.data[AuthOffset];
- size_t p_len = buffer.data[AuthOffset+1+l_len];
- pkg->data = buffer_create(buffer.size - MandatorySize - 2 - l_len - p_len);
- memcpy(pkg->data.data, &buffer.data[MandatorySize + 2 + l_len + p_len], buffer.size - MandatorySize - 2 - l_len - p_len);
- } else {
- pkg->login = 0;
- pkg->password = 0;
- pkg->data = buffer_create(buffer.size - MandatorySize);
- memcpy(pkg->data.data, &buffer.data[MandatorySize], buffer.size - MandatorySize);
- }
- return pkg;
-}
+#include "tcp_package.h"
struct st_connection_settings {
unsigned char use_ssl_connection;
@@ -283,10 +180,17 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char
return creds;
}
+const char* string_clone(const char* src) {
+ char* dst = malloc(strlen(src)+1);
+ strcpy(dst, src);
+ return dst;
+}
+
const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) {
if (msg == 0) return 0;
esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t));
ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len);
+ ev->event_type = string_clone(msg->event_type);
return ev;
}
@@ -338,6 +242,7 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
for (size_t i = 0; i < recv_msg->n_events; i++) {
result->events[i] = resolved_event_create(recv_msg->events[i]);
}
+ event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator);
return result;
}
diff --git a/esc.h b/esc.h
index 26ab8c6..0e8aa52 100644
--- a/esc.h
+++ b/esc.h
@@ -27,6 +27,7 @@ typedef struct st_esc_position esc_position_t;
struct st_recorded_event {
const esc_uuid_t* event_id;
+ const char* event_type;
};
typedef struct st_recorded_event esc_recorded_event_t;
diff --git a/main.c b/main.c
index 89b5993..4ff946f 100644
--- a/main.c
+++ b/main.c
@@ -1,29 +1,8 @@
#include
#include "mutex.h"
-#include "thread.h"
-#include "socket.h"
#include "esc.h"
-
-/*
-unsigned long my_first_thread(void* arg) {
- mutex_t mut = arg;
- mutex_lock(mut);
- printf("1\n");
- mutex_unlock(mut);
- return 0;
-}
-
-unsigned long my_second_thread(void* arg) {
- mutex_t mut = arg;
- mutex_lock(mut);
- printf("2\n");
- mutex_unlock(mut);
- return 0;
-}
-*/
-
int main() {
#ifdef _WIN32
WSADATA wsaData;
@@ -38,45 +17,26 @@ int main() {
return -2;
}
const esc_credentials_t* credentials = esc_credentials_create("admin", "changeit");
- const esc_all_events_slice_t* result = esc_connection_read_all_forward(conn, NULL, 1024, credentials);
- if (result == 0) {
- return -3;
- }
- char posbuf1[44];
- char posbuf2[44];
- printf("%s %s %s %u\n", result->read_direction,
- esc_position_format(&result->from_position, posbuf1, 44),
- esc_position_format(&result->next_position, posbuf2, 44),
- result->is_end_of_stream);
- char uuid_buf[37];
- for(size_t i = 0; i < result->n_events; i++) {
- printf("%s\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37));
- }
-/*
- socket_t s = socket_create(IPPROTO_TCP);
- socket_connect(s, "www.google.ca", 80);
- char* req = "GET / HTTP/1.1\r\nHost: www.google.ca\r\nConnection: close\r\n\r\n";
- socket_send(s, req, strlen(req));
- char buffer[4096];
- memset(buffer, 0, 4096);
- int rc;
- while ((rc = socket_recv(s, buffer, 4096)) > 0) {
- printf_s("%s", buffer);
- memset(buffer, 0, 4096);
- }
- socket_close(s);
+ const esc_all_events_slice_t *result = 0;
+ do {
+ result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 1024, credentials);
+ if (result == 0) {
+ return -3;
+ }
+ char posbuf1[44];
+ char posbuf2[44];
+ printf("%s %s %s %u\n", result->read_direction,
+ esc_position_format(&result->from_position, posbuf1, 44),
+ esc_position_format(&result->next_position, posbuf2, 44),
+ result->is_end_of_stream);
+ char uuid_buf[37];
+ for (size_t i = 0; i < result->n_events; i++) {
+ printf("%s %s\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37),
+ result->events[i]->event->event_type);
+ }
+ } while(result->is_end_of_stream == 0);
- mutex_t sync = mutex_create();
- thread_t one = thread_create(my_first_thread, sync);
- thread_t two = thread_create(my_second_thread, sync);
- thread_start(one);
- thread_start(two);
- printf("Hello, World!\n");
- thread_destroy(one);
- thread_destroy(two);
- mutex_destroy(sync);
-*/
#ifdef _WIN32
WSACleanup();
#endif
diff --git a/tcp_package.c b/tcp_package.c
new file mode 100644
index 0000000..ff4f8ba
--- /dev/null
+++ b/tcp_package.c
@@ -0,0 +1,82 @@
+//
+// Created by nicol on 2018-03-18.
+//
+
+#include
+#include
+#include
+#include "tcp_package.h"
+
+const uint32_t CommandOffset = 0;
+const uint32_t FlagsOffset = 1;
+const uint32_t CorrelationOffset = 2;
+const uint32_t AuthOffset = 18;
+const uint32_t MandatorySize = 18;
+
+const tcp_package_t* tcp_package_create(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data) {
+ tcp_package_t* pkg = malloc(sizeof(tcp_package_t));
+ pkg->command = command;
+ pkg->flags = 0;
+ pkg->correlation_id = *correlation_id;
+ pkg->login = 0;
+ pkg->password = 0;
+ pkg->data = data;
+ return pkg;
+}
+
+const tcp_package_t* tcp_package_create_authenticated(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data, const char* username, const char* password) {
+ tcp_package_t* pkg = malloc(sizeof(tcp_package_t));
+ pkg->command = command;
+ pkg->flags = 1;
+ pkg->correlation_id = *correlation_id;
+ pkg->login = username;
+ pkg->password = password;
+ pkg->data = data;
+ return pkg;
+}
+
+void tcp_package_free(const tcp_package_t* pkg) {
+ buffer_free(pkg->data);
+ free((void*)pkg);
+}
+
+buffer_t tcp_package_to_buffer(const tcp_package_t* pkg) {
+ uint8_t* _dst = malloc(MandatorySize + pkg->data.size + (pkg->flags ? 257*2 : 0));
+ _dst[CommandOffset] = pkg->command;
+ _dst[FlagsOffset] = pkg->flags;
+ memcpy(&_dst[CorrelationOffset], &pkg->correlation_id, sizeof(esc_uuid_t));
+ size_t size = MandatorySize;
+ if (pkg->flags) {
+ size_t l_len = strlen(pkg->login);
+ _dst[AuthOffset] = l_len;
+ strcpy(&_dst[AuthOffset+1], pkg->login);
+ size_t p_len = strlen(pkg->password);
+ _dst[AuthOffset+1+l_len] = p_len;
+ strcpy(&_dst[AuthOffset+2+l_len], pkg->password);
+ memcpy(&_dst[AuthOffset+2+l_len+p_len], pkg->data.data, pkg->data.size);
+ size += 2 + l_len + p_len + pkg->data.size;
+ } else {
+ memcpy(&_dst[AuthOffset], pkg->data.data, pkg->data.size);
+ size += pkg->data.size;
+ }
+ return buffer_from(_dst, size);
+}
+
+const tcp_package_t* tcp_package_from_buffer(buffer_t buffer) {
+ tcp_package_t* pkg = malloc(sizeof(tcp_package_t));
+ pkg->command = buffer.data[CommandOffset];
+ pkg->flags = buffer.data[FlagsOffset];
+ memcpy(&pkg->correlation_id, &buffer.data[CorrelationOffset], sizeof(esc_uuid_t));
+ if (pkg->flags) {
+ size_t l_len = buffer.data[AuthOffset];
+ size_t p_len = buffer.data[AuthOffset+1+l_len];
+ pkg->data = buffer_create(buffer.size - MandatorySize - 2 - l_len - p_len);
+ memcpy(pkg->data.data, &buffer.data[MandatorySize + 2 + l_len + p_len], buffer.size - MandatorySize - 2 - l_len - p_len);
+ } else {
+ pkg->login = 0;
+ pkg->password = 0;
+ pkg->data = buffer_create(buffer.size - MandatorySize);
+ memcpy(pkg->data.data, &buffer.data[MandatorySize], buffer.size - MandatorySize);
+ }
+ return pkg;
+}
diff --git a/tcp_package.h b/tcp_package.h
new file mode 100644
index 0000000..e0dbcc7
--- /dev/null
+++ b/tcp_package.h
@@ -0,0 +1,27 @@
+//
+// Created by nicol on 2018-03-18.
+//
+
+#ifndef ESC_TCP_PACKAGE_H
+#define ESC_TCP_PACKAGE_H
+
+#include "buffer.h"
+#include "uuid.h"
+
+struct st_tcp_package {
+ uint8_t command;
+ uint8_t flags;
+ esc_uuid_t correlation_id;
+ const char* login;
+ const char* password;
+ buffer_t data;
+};
+typedef struct st_tcp_package tcp_package_t;
+
+const tcp_package_t* tcp_package_create(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data);
+const tcp_package_t* tcp_package_create_authenticated(uint8_t command, const esc_uuid_t* correlation_id, buffer_t data, const char* username, const char* password);
+void tcp_package_free(const tcp_package_t* pkg);
+buffer_t tcp_package_to_buffer(const tcp_package_t* pkg);
+const tcp_package_t* tcp_package_from_buffer(buffer_t buffer);
+
+#endif //ESC_TCP_PACKAGE_H