move utils into subfolder

manager thread (wip)
This commit is contained in:
Nicolas Dextraze 2018-03-19 16:39:17 -07:00
parent 8d81a8a55a
commit 9498cc74e0
17 changed files with 194 additions and 72 deletions

View File

@ -8,10 +8,10 @@ if(WIN32)
link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs) link_directories(c:/Users/nicol/dev/thirdparty/protobuf-c/protobuf-c/.libs)
endif() endif()
add_executable(esc src/main.c src/mutex.c src/mutex.h src/thread.c src/thread.h src/socket.c src/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/uuid.c src/uuid.h src/buffer.c src/buffer.h src/tcp_package.c src/tcp_package.h) add_executable(esc src/main.c src/utils/mutex.c src/utils/mutex.h src/utils/thread.c src/utils/thread.h src/utils/socket.c src/utils/socket.h src/esc.c src/esc.h src/proto.c src/proto.h src/utils/uuid.c src/utils/uuid.h src/utils/buffer.c src/utils/buffer.h src/tcp_package.c src/tcp_package.h)
if(WIN32) if(WIN32)
target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a) target_link_libraries(esc wsock32 ws2_32 libprotobuf-c.a)
else() else()
target_link_libraries(esc protobuf-c) target_link_libraries(esc pthread protobuf-c)
endif() endif()

View File

@ -5,16 +5,38 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <stdarg.h>
#include <unistd.h>
#include "esc.h" #include "esc.h"
#include "socket.h" #include "utils/socket.h"
#include "buffer.h" #include "utils/buffer.h"
#include "proto.h" #include "proto.h"
#include "tcp_package.h" #include "tcp_package.h"
#include "utils/thread.h"
#include "utils/mutex.h"
const char* string_copy(const char *src) {
char* dst = malloc(strlen(src)+1);
strcpy(dst, src);
return dst;
}
typedef int bool_t; typedef int bool_t;
const bool_t BOOL_TRUE = 1; const bool_t BOOL_TRUE = 1;
const bool_t BOOL_FALSE = 0; const bool_t BOOL_FALSE = 0;
#define ERROR_BUFFER_SIZE 4096
esc_error_t error_create(int code, char* format, ...) {
va_list vl;
va_start(vl, format);
char buf[ERROR_BUFFER_SIZE];
size_t size = (size_t)snprintf(buf, ERROR_BUFFER_SIZE, format, vl);
char* msg = malloc(size+1);
strcpy(msg, buf);
esc_error_t res = {code, msg};
return res;
}
struct st_connection_settings { struct st_connection_settings {
bool_t use_ssl_connection; bool_t use_ssl_connection;
}; };
@ -40,10 +62,13 @@ typedef const node_endpoints_t* (*endpoint_discoverer_t)(const void* discover_da
struct st_connection { struct st_connection {
esc_connection_settings_t settings; esc_connection_settings_t settings;
const char* name;
void* discoverer_data; void* discoverer_data;
endpoint_discoverer_t discover; endpoint_discoverer_t discover;
socket_t tcp_conn; socket_t tcp_conn;
ProtobufCAllocator protobuf_c_allocator; ProtobufCAllocator protobuf_c_allocator;
thread_t manager_thread;
mutex_t mutex_lock;
}; };
struct st_static_endpoint_discoverer { struct st_static_endpoint_discoverer {
@ -134,9 +159,32 @@ const esc_connection_t* esc_connection_create(const esc_connection_settings_t* c
conn->protobuf_c_allocator.alloc = protobuf_c_alloc; conn->protobuf_c_allocator.alloc = protobuf_c_alloc;
conn->protobuf_c_allocator.free = protobuf_c_free; conn->protobuf_c_allocator.free = protobuf_c_free;
conn->protobuf_c_allocator.allocator_data = 0; conn->protobuf_c_allocator.allocator_data = 0;
if (connection_name == 0 || strcmp(connection_name, "") == 0) {
const esc_uuid_t* uuid = esc_uuid_create();
char buf[40];
esc_uuid_format(uuid, buf, 40);
conn->name = string_copy(buf);
free(uuid);
} else {
conn->name = connection_name;
}
return conn; return conn;
} }
void* connection_thread(void* arg) {
const esc_connection_t* conn = arg;
while(1) {
//mutex_lock(conn->mutex_lock);
//if (socket_readable(conn->tcp_conn)) {
//}
//mutex_unlock(conn->mutex_lock);
sleep(1);
}
}
// return 0 on success // return 0 on success
// return non-zero on failure and sets last_error on connection // return non-zero on failure and sets last_error on connection
int esc_connection_connect(const esc_connection_t* conn) { int esc_connection_connect(const esc_connection_t* conn) {
@ -156,7 +204,7 @@ int esc_connection_connect(const esc_connection_t* conn) {
// build message // build message
EventStore__Client__Messages__IdentifyClient identify_client; EventStore__Client__Messages__IdentifyClient identify_client;
event_store__client__messages__identify_client__init(&identify_client); event_store__client__messages__identify_client__init(&identify_client);
identify_client.connection_name = "abc123"; identify_client.connection_name = (char*)conn->name;
identify_client.version = 1; identify_client.version = 1;
size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client); size_t s = event_store__client__messages__identify_client__get_packed_size(&identify_client);
uint8_t buffer[s]; uint8_t buffer[s];
@ -174,6 +222,12 @@ int esc_connection_connect(const esc_connection_t* conn) {
return -5; return -5;
} }
_conn->mutex_lock = mutex_create();
_conn->manager_thread = thread_create(connection_thread, _conn);
if (thread_start(_conn->manager_thread)) {
return -6;
}
return 0; return 0;
} }
@ -184,12 +238,6 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char
return creds; return creds;
} }
const char* string_copy(const char *src) {
char* dst = malloc(strlen(src)+1);
strcpy(dst, src);
return dst;
}
const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) { const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__EventRecord* msg) {
if (msg == 0) return 0; if (msg == 0) return 0;
esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t)); esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t));
@ -259,6 +307,10 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
return result; return result;
} }
void esc_connection_close(const esc_connection_t* conn) {
socket_close(conn->tcp_conn);
}
const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) {
snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position); snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position);
return buffer; return buffer;

View File

@ -5,8 +5,14 @@
#ifndef ESC_ESC_H #ifndef ESC_ESC_H
#define ESC_ESC_H #define ESC_ESC_H
#include "uuid.h" #include "utils/uuid.h"
#include "buffer.h" #include "utils/buffer.h"
struct st_error {
const int code;
const char* message;
};
typedef struct st_error esc_error_t;
struct st_connection_settings; struct st_connection_settings;
typedef struct st_connection_settings esc_connection_settings_t; typedef struct st_connection_settings esc_connection_settings_t;
@ -56,10 +62,14 @@ typedef struct st_all_events_slice esc_all_events_slice_t;
const esc_connection_settings_t* esc_default_connection_settings; const esc_connection_settings_t* esc_default_connection_settings;
// Connection
const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name); const esc_connection_t* esc_connection_create(const esc_connection_settings_t* connection_settings, const char* addr, const char* connection_name);
int esc_connection_connect(const esc_connection_t* conn); int esc_connection_connect(const esc_connection_t* conn);
const esc_credentials_t* esc_credentials_create(const char* username, const char* password); const esc_credentials_t* esc_credentials_create(const char* username, const char* password);
const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials); const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connection_t* conn, const esc_position_t* last_checkpoint, unsigned int count, const esc_credentials_t* credentials);
void esc_connection_close(const esc_connection_t* conn);
// Utils
const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size); const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size);
#endif //ESC_ESC_H #endif //ESC_ESC_H

View File

@ -1,6 +1,6 @@
#include <stdio.h> #include <stdio.h>
#include "mutex.h" #include "utils/mutex.h"
#include "esc.h" #include "esc.h"
int main() { int main() {
@ -20,7 +20,7 @@ int main() {
const esc_all_events_slice_t *result = 0; const esc_all_events_slice_t *result = 0;
do { do {
result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 1024, credentials); result = esc_connection_read_all_forward(conn, result ? &result->next_position : NULL, 100, credentials);
if (result == 0) { if (result == 0) {
return -3; return -3;
} }
@ -41,6 +41,8 @@ int main() {
} }
} while(result->is_end_of_stream == 0); } while(result->is_end_of_stream == 0);
esc_connection_close(conn);
#ifdef _WIN32 #ifdef _WIN32
WSACleanup(); WSACleanup();
#endif #endif

View File

@ -5,8 +5,8 @@
#ifndef ESC_TCP_PACKAGE_H #ifndef ESC_TCP_PACKAGE_H
#define ESC_TCP_PACKAGE_H #define ESC_TCP_PACKAGE_H
#include "buffer.h" #include "utils/buffer.h"
#include "uuid.h" #include "utils/uuid.h"
struct st_tcp_package { struct st_tcp_package {
uint8_t command; uint8_t command;

View File

@ -1,32 +0,0 @@
//
// Created by nicol on 2018-03-18.
//
#include "thread.h"
#ifdef _WIN32
thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) {
thread_t thread = malloc(sizeof(struct st_thread));
thread->handle = CreateThread(NULL, 0, thread_start, thread_arg, CREATE_SUSPENDED, &thread->id);
if (thread->handle == NULL) {
free(thread);
return NULL;
}
return thread;
}
int thread_start(thread_t thread) {
if (ResumeThread(thread->handle) == -1) return -1;
return 0;
}
int thread_wait(thread_t thread) {
if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) return -1;
return 0;
}
int thread_destroy(thread_t thread) {
if (thread_wait(thread) != 0) SuspendThread(thread->handle);
CloseHandle(thread->handle);
}
#endif

View File

@ -1,19 +0,0 @@
//
// Created by nicol on 2018-03-18.
//
#ifndef ESC_THREAD_H
#define ESC_THREAD_H
#ifdef _WIN32
#include <windows.h>
struct st_thread {
DWORD id;
HANDLE handle;
};
typedef struct st_thread* thread_t;
typedef LPTHREAD_START_ROUTINE thread_start_t;
typedef LPVOID thread_arg_t;
#endif
#endif //ESC_THREAD_H

View File

@ -6,6 +6,7 @@
#define ESC_BUFFER_H #define ESC_BUFFER_H
#include <stdint.h> #include <stdint.h>
#include <stddef.h>
struct st_buffer { struct st_buffer {
size_t size; size_t size;

View File

@ -27,7 +27,8 @@ void mutex_unlock(mutex_t mutex) {
void mutex_destroy(mutex_t mutex) { void mutex_destroy(mutex_t mutex) {
DeleteCriticalSection(mutex); DeleteCriticalSection(mutex);
} }
#else #endif
#ifdef __linux__
mutex_t mutex_create() { mutex_t mutex_create() {
mutex_t mutex = malloc(sizeof(mutex_t)); mutex_t mutex = malloc(sizeof(mutex_t));
pthread_mutex_init(mutex, 0); pthread_mutex_init(mutex, 0);

View File

@ -8,7 +8,8 @@
#ifdef _WIN32 #ifdef _WIN32
#include <windows.h> #include <windows.h>
typedef LPCRITICAL_SECTION mutex_t; typedef LPCRITICAL_SECTION mutex_t;
#else #endif
#ifdef __linux__
#include <pthread.h> #include <pthread.h>
typedef pthread_mutex_t* mutex_t; typedef pthread_mutex_t* mutex_t;
#endif #endif

View File

@ -66,7 +66,8 @@ int socket_recv(socket_t s, char* buf, int len) {
int socket_error() { int socket_error() {
return WSAGetLastError(); return WSAGetLastError();
} }
#else #endif
#ifdef __linux__
struct st_socket { struct st_socket {
int s; int s;
int af; int af;

75
src/utils/thread.c Normal file
View File

@ -0,0 +1,75 @@
//
// Created by nicol on 2018-03-18.
//
#include <stdlib.h>
#include "thread.h"
#ifdef _WIN32
thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) {
thread_t thread = malloc(sizeof(struct st_thread));
thread->handle = CreateThread(NULL, 0, thread_start, thread_arg, CREATE_SUSPENDED, &thread->id);
if (thread->handle == NULL) {
free(thread);
return NULL;
}
return thread;
}
int thread_start(thread_t thread) {
if (ResumeThread(thread->handle) == -1) return -1;
return 0;
}
int thread_wait(thread_t thread) {
if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) return -1;
return 0;
}
int thread_destroy(thread_t thread) {
if (thread_wait(thread) != 0) SuspendThread(thread->handle);
CloseHandle(thread->handle);
}
#endif
#ifdef __linux__
#include <pthread.h>
struct st_thread {
pthread_t handle;
thread_start_t start;
thread_arg_t arg;
int last_error;
};
thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg) {
thread_t thread = malloc(sizeof(struct st_thread));
thread->handle = 0;
thread->start = thread_start;
thread->arg = thread_arg;
thread->last_error = 0;
return thread;
}
int thread_start(thread_t thread) {
int rc = pthread_create(&thread->handle, NULL, thread->start, thread->arg);
if (rc != 0) thread->last_error = rc;
return rc;
}
int thread_wait(thread_t thread) {
void* result;
int rc = pthread_join(thread->handle, &result);
if (rc != 0) thread->last_error = rc;
return rc;
}
int thread_destroy(thread_t thread) {
if (thread_wait(thread)) return pthread_cancel(thread->handle);
free(thread);
return 0;
}
int thread_error(thread_t thread) {
return thread->last_error;
}
#endif

29
src/utils/thread.h Normal file
View File

@ -0,0 +1,29 @@
//
// Created by nicol on 2018-03-18.
//
#ifndef ESC_THREAD_H
#define ESC_THREAD_H
#ifdef _WIN32
#include <windows.h>
struct st_thread {
DWORD id;
HANDLE handle;
};
typedef struct st_thread* thread_t;
typedef LPTHREAD_START_ROUTINE thread_start_t;
typedef LPVOID thread_arg_t;
#endif
typedef void* (*thread_start_t)(void*);
typedef void* thread_arg_t;
typedef struct st_thread* thread_t;
thread_t thread_create(thread_start_t thread_start, thread_arg_t thread_arg);
int thread_start(thread_t thread);
int thread_wait(thread_t thread);
int thread_destroy(thread_t thread);
int thread_error(thread_t thread);
void* thread_result(thread_t thread);
#endif //ESC_THREAD_H

View File

@ -7,6 +7,7 @@
#include <stdio.h> #include <stdio.h>
#include "uuid.h" #include "uuid.h"
//TODO: fixme - this is not secure, use crypto random bytes instead of rand
const esc_uuid_t* esc_uuid_create() { const esc_uuid_t* esc_uuid_create() {
esc_uuid_t* result = malloc(sizeof(esc_uuid_t)); esc_uuid_t* result = malloc(sizeof(esc_uuid_t));
srand(time(NULL)); srand(time(NULL));
@ -26,7 +27,7 @@ const esc_uuid_t* esc_uuid_from(uint8_t* src, size_t size) {
} }
const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size) { const char* esc_uuid_format(const esc_uuid_t* uuid, char* buffer, size_t buf_size) {
snprintf(buffer, buf_size, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", snprintf(buffer, buf_size, "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x",
uuid->data[0], uuid->data[0],
uuid->data[1], uuid->data[1],
uuid->data[2], uuid->data[2],