add fields to recorded_event_t

use size_t
This commit is contained in:
Nicolas Dextraze 2018-03-19 11:12:05 -07:00
parent 9147e4a63f
commit 8d81a8a55a
7 changed files with 57 additions and 26 deletions

View File

@ -3,18 +3,25 @@
// //
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include "buffer.h" #include "buffer.h"
const buffer_t buffer_create(uint32_t size) { const buffer_t buffer_create(size_t size) {
buffer_t buf = {size, malloc(size)}; buffer_t buf = {size, malloc(size)};
return buf; return buf;
} }
const buffer_t buffer_from(uint8_t* data, uint32_t size) { const buffer_t buffer_from(uint8_t* data, size_t size) {
buffer_t buf = {size, data}; buffer_t buf = {size, data};
return buf; return buf;
} }
const buffer_t buffer_copyfrom(uint8_t* data, size_t size) {
buffer_t buf = {size, malloc(size)};
memcpy(buf.data, data, size);
return buf;
}
void buffer_free(buffer_t buffer) { void buffer_free(buffer_t buffer) {
free(buffer.data); free(buffer.data);
} }

View File

@ -8,13 +8,14 @@
#include <stdint.h> #include <stdint.h>
struct st_buffer { struct st_buffer {
uint32_t size; size_t size;
uint8_t* data; uint8_t* data;
}; };
typedef struct st_buffer buffer_t; typedef struct st_buffer buffer_t;
const buffer_t buffer_create(uint32_t size); const buffer_t buffer_create(size_t size);
const buffer_t buffer_from(uint8_t* data, uint32_t size); const buffer_t buffer_from(uint8_t* data, size_t size);
const buffer_t buffer_copyfrom(uint8_t* data, size_t size);
void buffer_free(buffer_t buffer); void buffer_free(buffer_t buffer);
#endif //ESC_BUFFER_H #endif //ESC_BUFFER_H

View File

@ -11,8 +11,12 @@
#include "proto.h" #include "proto.h"
#include "tcp_package.h" #include "tcp_package.h"
typedef int bool_t;
const bool_t BOOL_TRUE = 1;
const bool_t BOOL_FALSE = 0;
struct st_connection_settings { struct st_connection_settings {
unsigned char use_ssl_connection; bool_t use_ssl_connection;
}; };
const esc_connection_settings_t default_connection_settings = { const esc_connection_settings_t default_connection_settings = {
@ -44,7 +48,7 @@ struct st_connection {
struct st_static_endpoint_discoverer { struct st_static_endpoint_discoverer {
tcp_endpoint_t tcp_endpoint; tcp_endpoint_t tcp_endpoint;
unsigned char use_ssl_connection; bool_t use_ssl_connection;
}; };
typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t; typedef struct st_static_endpoint_discoverer static_endpoint_discoverer_t;
@ -60,9 +64,10 @@ const node_endpoints_t* static_discover(const static_endpoint_discoverer_t* disc
int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) { int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_t* pkg) {
char uuid_buf[37]; char uuid_buf[37];
printf("connection_send_tcp_package: %u %u %s %u\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size); printf("connection_send_tcp_package: %u %u %s %lu\n", pkg->command, pkg->flags, esc_uuid_format(&pkg->correlation_id, uuid_buf, 37), pkg->data.size);
buffer_t send_buffer = tcp_package_to_buffer(pkg); buffer_t send_buffer = tcp_package_to_buffer(pkg);
if (socket_send(conn->tcp_conn, (char *) &send_buffer.size, sizeof(uint32_t)) <= 0) { uint32_t size = (uint32_t)send_buffer.size;
if (socket_send(conn->tcp_conn, (char *) &size, sizeof(uint32_t)) <= 0) {
buffer_free(send_buffer); buffer_free(send_buffer);
return -1; return -1;
} }
@ -76,21 +81,21 @@ int connection_send_tcp_package(const esc_connection_t* conn, const tcp_package_
const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) { const tcp_package_t* connection_recv_tcp_package(const esc_connection_t* conn) {
uint32_t recv_size; uint32_t recv_size;
int rc; ssize_t rc;
if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) { if ((rc = socket_recv(conn->tcp_conn, (char *)&recv_size, sizeof(uint32_t))) != 4) {
printf("%d %d", rc, socket_error()); printf("%ld %d", rc, socket_error());
return 0; return 0;
} }
buffer_t recv_buffer = buffer_create(recv_size); buffer_t recv_buffer = buffer_create(recv_size);
while(recv_size > 0) { while(recv_size > 0) {
int pos = recv_buffer.size - recv_size; size_t pos = recv_buffer.size - recv_size;
rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size); rc = socket_recv(conn->tcp_conn, (char *)&recv_buffer.data[pos], recv_size);
recv_size -= rc; recv_size -= rc;
} }
const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer); const tcp_package_t* recv_pkg = tcp_package_from_buffer(recv_buffer);
buffer_free(recv_buffer); buffer_free(recv_buffer);
char uuid_buf[37]; char uuid_buf[37];
printf("connection_recv_tcp_package: %u %u %s %u\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size); printf("connection_recv_tcp_package: %u %u %s %lu\n", recv_pkg->command, recv_pkg->flags, esc_uuid_format(&recv_pkg->correlation_id, uuid_buf, 37), recv_pkg->data.size);
//for (int32_t i=0;i<recv_pkg->data.size;i++) { //for (int32_t i=0;i<recv_pkg->data.size;i++) {
// printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]); // printf("%x (%c) ", recv_pkg->data.data[i], recv_pkg->data.data[i]);
//} //}
@ -179,7 +184,7 @@ const esc_credentials_t* esc_credentials_create(const char* username, const char
return creds; return creds;
} }
const char* string_clone(const char* src) { const char* string_copy(const char *src) {
char* dst = malloc(strlen(src)+1); char* dst = malloc(strlen(src)+1);
strcpy(dst, src); strcpy(dst, src);
return dst; return dst;
@ -189,7 +194,13 @@ const esc_recorded_event_t* recorded_event_create(EventStore__Client__Messages__
if (msg == 0) return 0; if (msg == 0) return 0;
esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t)); esc_recorded_event_t* ev = malloc(sizeof(esc_recorded_event_t));
ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len); ev->event_id = esc_uuid_from(msg->event_id.data, msg->event_id.len);
ev->event_type = string_clone(msg->event_type); ev->event_type = string_copy(msg->event_type);
ev->event_number = msg->event_number;
ev->event_stream_id = string_copy(msg->event_stream_id);
ev->created_epoch = msg->has_created_epoch ? msg->created_epoch : 0;
ev->data = buffer_copyfrom(msg->data.data, msg->data.len);
ev->metadata = buffer_copyfrom(msg->metadata.data, msg->metadata.len);
return ev; return ev;
} }
@ -226,8 +237,10 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
return 0; return 0;
} }
esc_connection_t* _conn = (esc_connection_t*)conn;
EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg = EventStore__Client__Messages__ReadAllEventsCompleted *recv_msg =
event_store__client__messages__read_all_events_completed__unpack(&conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data); event_store__client__messages__read_all_events_completed__unpack(&_conn->protobuf_c_allocator, recv_pkg->data.size, recv_pkg->data.data);
esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t)); esc_all_events_slice_t* result = malloc(sizeof(esc_all_events_slice_t));
result->read_direction = "forward"; result->read_direction = "forward";
@ -241,12 +254,12 @@ const esc_all_events_slice_t* esc_connection_read_all_forward(const esc_connecti
for (size_t i = 0; i < recv_msg->n_events; i++) { for (size_t i = 0; i < recv_msg->n_events; i++) {
result->events[i] = resolved_event_create(recv_msg->events[i]); result->events[i] = resolved_event_create(recv_msg->events[i]);
} }
event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &conn->protobuf_c_allocator); event_store__client__messages__read_all_events_completed__free_unpacked(recv_msg, &_conn->protobuf_c_allocator);
return result; return result;
} }
const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) { const char* esc_position_format(const esc_position_t* position, char* buffer, size_t buf_size) {
snprintf(buffer, buf_size, "%llu/%llu", position->prepare_position, position->commit_position); snprintf(buffer, buf_size, "%lu/%lu", position->prepare_position, position->commit_position);
return buffer; return buffer;
} }

View File

@ -6,6 +6,7 @@
#define ESC_ESC_H #define ESC_ESC_H
#include "uuid.h" #include "uuid.h"
#include "buffer.h"
struct st_connection_settings; struct st_connection_settings;
typedef struct st_connection_settings esc_connection_settings_t; typedef struct st_connection_settings esc_connection_settings_t;
@ -28,6 +29,11 @@ typedef struct st_esc_position esc_position_t;
struct st_recorded_event { struct st_recorded_event {
const esc_uuid_t* event_id; const esc_uuid_t* event_id;
const char* event_type; const char* event_type;
int64_t event_number;
const char *event_stream_id;
int64_t created_epoch;
buffer_t data;
buffer_t metadata;
}; };
typedef struct st_recorded_event esc_recorded_event_t; typedef struct st_recorded_event esc_recorded_event_t;

View File

@ -32,8 +32,12 @@ int main() {
result->is_end_of_stream); result->is_end_of_stream);
char uuid_buf[37]; char uuid_buf[37];
for (size_t i = 0; i < result->n_events; i++) { for (size_t i = 0; i < result->n_events; i++) {
printf("%s %s\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37), printf("%s %s %ld@%s %lu %lu\n", esc_uuid_format(result->events[i]->event->event_id, uuid_buf, 37),
result->events[i]->event->event_type); result->events[i]->event->event_type,
result->events[i]->event->event_number,
result->events[i]->event->event_stream_id,
result->events[i]->event->data.size,
result->events[i]->event->metadata.size);
} }
} while(result->is_end_of_stream == 0); } while(result->is_end_of_stream == 0);

View File

@ -109,12 +109,12 @@ int socket_connect(socket_t s, char* addr, unsigned short port) {
return connect(s->s, result->ai_addr, (int)result->ai_addrlen); return connect(s->s, result->ai_addr, (int)result->ai_addrlen);
} }
int socket_send(socket_t s, char* data, int len) { ssize_t socket_send(socket_t s, char* data, size_t len) {
return (int)send(s->s, data, (size_t)len, 0); return send(s->s, data, len, 0);
} }
int socket_recv(socket_t s, char* buf, int len) { ssize_t socket_recv(socket_t s, char* buf, size_t len) {
return (int)recv(s->s, buf, (size_t)len, 0); return recv(s->s, buf, len, 0);
} }
int socket_error() { int socket_error() {

View File

@ -16,8 +16,8 @@ enum {
socket_t socket_create(int type); socket_t socket_create(int type);
void socket_close(socket_t s); void socket_close(socket_t s);
int socket_connect(socket_t s, char* addr, unsigned short port); int socket_connect(socket_t s, char* addr, unsigned short port);
int socket_send(socket_t s, char* data, int len); ssize_t socket_send(socket_t s, char* data, size_t len);
int socket_recv(socket_t s, char* buf, int len); ssize_t socket_recv(socket_t s, char* buf, size_t len);
int socket_error(); int socket_error();
#endif //ESC_SOCKET_H #endif //ESC_SOCKET_H