From 16c081020a74252b68fe07343ade41fc33cc107e Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Wed, 18 Oct 2017 14:39:25 -0700 Subject: [PATCH] Add ProjectionsManager --- .idea/misc.xml | 6 + README.md | 4 +- index.d.ts | 54 +++++- package.json | 6 +- src/client.js | 2 + src/errors/projectionCommandFailedError.js | 10 + src/projections/projectionDetails.js | 56 ++++++ src/projections/projectionsClient.js | 170 +++++++++++++++++ src/projections/projectionsManager.js | 202 +++++++++++++++++++++ test/projections_test.js | 47 +++++ 10 files changed, 553 insertions(+), 4 deletions(-) create mode 100644 .idea/misc.xml create mode 100644 src/errors/projectionCommandFailedError.js create mode 100644 src/projections/projectionDetails.js create mode 100644 src/projections/projectionsClient.js create mode 100644 src/projections/projectionsManager.js create mode 100644 test/projections_test.js diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..e01539a --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/README.md b/README.md index f3ef18d..8a5990c 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Install using `npm install node-eventstore-client` ### Dependencies -- Node.js >= 0.12 +- Node.js >= 4.0 - Modules: [long](https://www.npmjs.org/package/long), [protobufjs](https://www.npmjs.org/package/protobufjs), [uuid](https://www.npmjs.org/package/uuid) (installed via `npm install`) ### API Documentation @@ -112,7 +112,7 @@ To generate a test event, open a separate console and run: To run the tests it is recommended that you use an in-memory instance of the eventstore so you don't pollute your dev instance. - EventStore.ClusterNode.exe --memdb + EventStore.ClusterNode.exe --run-projections=all --memdb To execute the tests suites simply run diff --git a/index.d.ts b/index.d.ts index b965bd2..ccd0d6f 100644 --- a/index.d.ts +++ b/index.d.ts @@ -26,7 +26,7 @@ export class PersistentSubscriptionSettings { export namespace SystemConsumerStrategies { const DispatchToSingle: string; const RoundRobin: string; - const Pinned: string + const Pinned: string; } export class GossipSeed { @@ -314,3 +314,55 @@ export interface ConnectionSettings { export function createConnection(settings: ConnectionSettings, endPointOrGossipSeed: string | TcpEndPoint | GossipSeed[], connectionName?: string): EventStoreNodeConnection; export function createJsonEventData(eventId: string, event: any, metadata?: any, type?: string): EventData; export function createEventData(eventId: string, type: string, isJson: boolean, data: Buffer, metadata?: Buffer): EventData; + +// Projections +export interface ProjectionDetails { + coreProcessingTime: number, + version: number, + epoch: number, + effectiveName: string, + writesInProgress: number, + readsInProgress: number, + partitionsCached: number, + status: string, + stateReason: string, + name: string, + mode: string, + position: string, + progress: number, + lastCheckpoint: string, + eventsProcessedAfterRestart: number, + statusUrl: string, + stateUrl: string, + resultUrl: string, + queryUrl: string, + enableCommandUrl: string, + disableCommandUrl: string, + checkpointStatus: string, + bufferedEvents: number, + writePendingEventsBeforeCheckpoint: number, + writePendingEventsAfterCheckpoint: number +} + +export class ProjectionsManager { + constructor(log: Logger, httpEndPoint: string, operationTimeout: number); + enable(name: string, userCredentials: UserCredentials): Promise; + disable(name: string, userCredentials: UserCredentials): Promise; + abort(name: string, userCredentials: UserCredentials): Promise; + createOneTime(query: string, userCredentials: UserCredentials): Promise; + createTransient(name: string, query: string, userCredentials: UserCredentials): Promise; + createContinuous(name: string, query: string, trackEmittedStreams: boolean, userCredentials: UserCredentials): Promise; + listAll(userCredentials: UserCredentials): Promise; + listOneTime(userCredentials: UserCredentials): Promise; + listContinuous(userCredentials: UserCredentials): Promise; + getStatus(name: string, userCredentials: UserCredentials): Promise; + getState(name: string, userCredentials: UserCredentials): Promise; + getPartitionState(name: string, partitionId: string, userCredentials: UserCredentials): Promise; + getResult(name: string, userCredentials: UserCredentials): Promise; + getPartitionResult(name: string, partitionId: string, userCredentials: UserCredentials): Promise; + getStatistics(name: string, userCredentials: UserCredentials): Promise; + getQuery(name: string, userCredentials: UserCredentials): Promise; + getState(name: string, userCredentials: UserCredentials): Promise; + updateQuery(name: string, query: string, userCredentials: UserCredentials): Promise; + deleteQuery(name: string, deleteEmittedStreams: boolean, userCredentials: UserCredentials): Promise; +} diff --git a/package.json b/package.json index c51c8b5..42465ce 100644 --- a/package.json +++ b/package.json @@ -1,9 +1,13 @@ { "name": "node-eventstore-client", - "version": "0.1.8", + "version": "0.1.9", "description": "A port of the EventStore .Net ClientAPI to Node.js", "main": "index.js", "types": "index.d.ts", + "engines": { + "node": ">=4.0" + }, + "engineStrict": true, "scripts": { "clean": "rm lib/dist.js", "build": "webpack", diff --git a/src/client.js b/src/client.js index 3044d83..a79f797 100644 --- a/src/client.js +++ b/src/client.js @@ -51,10 +51,12 @@ module.exports.PersistentSubscriptionSettings = require('./persistentSubscriptio module.exports.SystemConsumerStrategies = require('./systemConsumerStrategies'); module.exports.GossipSeed = require('./gossipSeed'); module.exports.EventStoreConnection = require('./eventStoreConnection'); +module.exports.ProjectionsManager = require('./projections/projectionsManager'); // Expose errors module.exports.WrongExpectedVersionError = require('./errors/wrongExpectedVersionError'); module.exports.StreamDeletedError = require('./errors/streamDeletedError'); module.exports.AccessDeniedError = require('./errors/accessDeniedError'); +module.exports.ProjectionCommandFailedError = require('./errors/projectionCommandFailedError'); // Expose enums/constants module.exports.expectedVersion = expectedVersion; module.exports.positions = positions; diff --git a/src/errors/projectionCommandFailedError.js b/src/errors/projectionCommandFailedError.js new file mode 100644 index 0000000..9b46310 --- /dev/null +++ b/src/errors/projectionCommandFailedError.js @@ -0,0 +1,10 @@ +const util = require('util'); + +function ProjectionCommandFailedError(httpStatusCode, message) { + Error.captureStackTrace(this, this.constructor); + this.httpStatusCode = httpStatusCode; + this.message = message; +} +util.inherits(ProjectionCommandFailedError, Error); + +module.exports = ProjectionCommandFailedError; diff --git a/src/projections/projectionDetails.js b/src/projections/projectionDetails.js new file mode 100644 index 0000000..17ed5d7 --- /dev/null +++ b/src/projections/projectionDetails.js @@ -0,0 +1,56 @@ +function ProjectionDetails( + coreProcessingTime, + version, + epoch, + effectiveName, + writesInProgress, + readsInProgress, + partitionsCached, + status, + stateReason, + name, + mode, + position, + progress, + lastCheckpoint, + eventsProcessedAfterRestart, + statusUrl, + stateUrl, + resultUrl, + queryUrl, + enableCommandUrl, + disableCommandUrl, + checkpointStatus, + bufferedEvents, + writePendingEventsBeforeCheckpoint, + writePendingEventsAfterCheckpoint +) { + this.coreProcessingTime = coreProcessingTime; + this.version = version; + this.epoch = epoch; + this.effectiveName = effectiveName; + this.writesInProgress = writesInProgress; + this.readsInProgress = readsInProgress; + this.partitionsCached = partitionsCached; + this.status = status; + this.stateReason = stateReason; + this.name = name; + this.mode = mode; + this.position = position; + this.progress = progress; + this.lastCheckpoint = lastCheckpoint; + this.eventsProcessedAfterRestart = eventsProcessedAfterRestart; + this.statusUrl = statusUrl; + this.stateUrl = stateUrl; + this.resultUrl = resultUrl; + this.queryUrl = queryUrl; + this.enableCommandUrl = enableCommandUrl; + this.disableCommandUrl = disableCommandUrl; + this.checkpointStatus = checkpointStatus; + this.bufferedEvents = bufferedEvents; + this.writePendingEventsBeforeCheckpoint = writePendingEventsBeforeCheckpoint; + this.writePendingEventsAfterCheckpoint = writePendingEventsAfterCheckpoint; + Object.freeze(this); +} + +module.exports = ProjectionDetails; \ No newline at end of file diff --git a/src/projections/projectionsClient.js b/src/projections/projectionsClient.js new file mode 100644 index 0000000..eecefc8 --- /dev/null +++ b/src/projections/projectionsClient.js @@ -0,0 +1,170 @@ +const http = require('http'); +const url = require('url'); +const util = require('util'); +const ProjectionCommandFailedError = require('../errors/projectionCommandFailedError'); + +const HTTP_OK = 200; +const HTTP_CREATED = 201; + +function safeParseJson(json) { + try { + return JSON.parse(json); + } catch(e) { + return null; + } +} + +function ProjectionsClient(log, operationTimeout) { + this._log = log; + this._operationTimeout = operationTimeout; +} + +ProjectionsClient.prototype.enable = function(httpEndPoint, name, userCredentials) { + return this.sendPost(httpEndPoint + '/projection/' + name + '/command/enable', '', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.disable = function(httpEndPoint, name, userCredentials) { + return this.sendPost(httpEndPoint + '/projection/' + name + '/command/disable', '', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.abort = function(httpEndPoint, name, userCredentials) { + return this.sendPost(httpEndPoint + '/projection/' + name + '/command/abort', '', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.createOneTime = function(httpEndPoint, query, userCredentials) { + return this.sendPost(httpEndPoint + '/projections/onetime?type=JS', query, userCredentials, HTTP_CREATED); +}; + +ProjectionsClient.prototype.createTransient = function(httpEndPoint, name, query, userCredentials) { + return this.sendPost(httpEndPoint + '/projections/transient?name=' + name + '&type=JS', query, userCredentials, HTTP_CREATED); +}; + +ProjectionsClient.prototype.createContinuous = function(httpEndPoint, name, query, trackEmittedStreams, userCredentials) { + return this.sendPost(httpEndPoint + '/projections/continuous?name=' + name + '&type=JS&emit=1&trackEmittedStreams=' + trackEmittedStreams, query, userCredentials, HTTP_CREATED); +}; + +ProjectionsClient.prototype.listAll = function(httpEndPoint, userCredentials) { + return this.sendGet(httpEndPoint + '/projections/any', userCredentials, HTTP_OK) + .then(function (json) { + var r = safeParseJson(json); + if (r && r.projections) return r.projections; + return null; + }); +}; + +ProjectionsClient.prototype.listOneTime = function(httpEndPoint, userCredentials) { + return this.sendGet(httpEndPoint + '/projections/onetime', userCredentials, HTTP_OK) + .then(function (json) { + var r = safeParseJson(json); + if (r && r.projections) return r.projections; + return null; + }); +}; + +ProjectionsClient.prototype.listContinuous = function(httpEndPoint, userCredentials) { + return this.sendGet(httpEndPoint + '/projections/continuous', userCredentials, HTTP_OK) + .then(function (json) { + var r = safeParseJson(json); + if (r && r.projections) return r.projections; + return null; + }); +}; + +ProjectionsClient.prototype.getStatus = function(httpEndPoint, name, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name, userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.getState = function(httpEndPoint, name, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name + '/state', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.getPartitionState = function(httpEndPoint, name, partitionId, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name + '/state?partition=' + partitionId, userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.getResult = function(httpEndPoint, name, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name + '/result', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.getPartitionResult = function(httpEndPoint, name, partitionId, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name + '/result?partition=' + partitionId, userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.getStatistics = function(httpEndPoint, name, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name + '/statistics', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.getQuery = function(httpEndPoint, name, userCredentials) { + return this.sendGet(httpEndPoint + '/projection/' + name + '/query', userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.updateQuery = function(httpEndPoint, name, query, userCredentials) { + return this.sendPut(httpEndPoint + '/projection/' + name + '/query?type=JS', query, userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.delete = function(httpEndPoint, name, deleteEmittedStreams, userCredentials) { + return this.sendDelete(httpEndPoint + '/projection/' + name + '?deleteEmittedStreams=' + deleteEmittedStreams, userCredentials, HTTP_OK); +}; + +ProjectionsClient.prototype.request = function(method, _url, data, userCredentials, expectedCode) { + const options = url.parse(_url); + options.method = method; + if (userCredentials) { + options.auth = [userCredentials.username, userCredentials.password].join(':'); + } + var self = this; + return new Promise(function (resolve, reject) { + const timeout = setTimeout(function () { + reject(new Error(util.format('Request timed out for %s on %s', method, _url))) + }, self._operationTimeout); + const req = http.request(options, function (res) { + const hasExpectedCode = res.statusCode === expectedCode; + var result = ''; + res.setEncoding('utf8'); + res.on('data', function (chunk) { + result += chunk; + }); + res.on('end', function () { + if (hasExpectedCode) { + clearTimeout(timeout); + resolve(result); + } else { + clearTimeout(timeout); + reject(new ProjectionCommandFailedError( + res.statusCode, + util.format('Server returned %d (%s) for %s on %s', res.statusCode, res.statusMessage, method, _url) + )); + } + }); + }); + req.on('error', reject); + if (data) { + req.setHeader('Content-Type', 'application/json'); + req.write(data); + } + req.end(); + }); +}; + +function voidResult() {} + +ProjectionsClient.prototype.sendGet = function(_url, userCredentials, expectedCode) { + return this.request('GET', _url, null, userCredentials, expectedCode); +}; + +ProjectionsClient.prototype.sendPost = function(_url, data, userCredentials, expectedCode) { + return this.request('POST', _url, data, userCredentials, expectedCode) + .then(voidResult); +}; + +ProjectionsClient.prototype.sendPut = function(_url, data, userCredentials, expectedCode) { + return this.request('PUT', _url, data, userCredentials, expectedCode) + .then(voidResult); +}; + +ProjectionsClient.prototype.sendDelete = function(_url, data, userCredentials, expectedCode) { + return this.request('DELETE', _url, data, userCredentials, expectedCode) + .then(voidResult); +}; + +module.exports = ProjectionsClient; \ No newline at end of file diff --git a/src/projections/projectionsManager.js b/src/projections/projectionsManager.js new file mode 100644 index 0000000..f453d4b --- /dev/null +++ b/src/projections/projectionsManager.js @@ -0,0 +1,202 @@ +const ensure = require('../common/utils/ensure'); +const ProjectionsClient = require('./projectionsClient'); + +/** + * Creates a new instance of ProjectionsManager. + * @param {Logger} log Instance of Logger to use for logging. + * @param {string} httpEndPoint HTTP endpoint of an Event Store server. + * @param {number} operationTimeout Operation timeout in milliseconds. + * @constructor + */ +function ProjectionsManager(log, httpEndPoint, operationTimeout) { + ensure.notNull(log, "log"); + ensure.notNull(httpEndPoint, "httpEndPoint"); + this._client = new ProjectionsClient(log, operationTimeout); + this._httpEndPoint = httpEndPoint; +} + +/** + * Enables a projection. + * @param name The name of the projection. + * @param userCredentials Credentials for a user with permission to enable a projection. + * @returns {Promise} + */ +ProjectionsManager.prototype.enable = function(name, userCredentials) { + return this._client.enable(this._httpEndPoint, name, userCredentials); +}; + +/** + * Aborts and disables a projection without writing a checkpoint. + * @param name The name of the projection. + * @param userCredentials Credentials for a user with permission to disable a projection. + * @returns {Promise} + */ +ProjectionsManager.prototype.disable = function(name, userCredentials) { + return this._client.disable(this._httpEndPoint, name, userCredentials); +}; + +/** + * Disables a projection. + * @param name The name of the projection. + * @param userCredentials Credentials for a user with permission to disable a projection. + * @returns {Promise} + */ +ProjectionsManager.prototype.abort = function(name, userCredentials) { + return this._client.abort(this._httpEndPoint, name, userCredentials); +}; + +/** + * Creates a one-time query. + * @param query The JavaScript source code for the query. + * @param userCredentials Credentials for a user with permission to create a query. + * @returns {Promise} + */ +ProjectionsManager.prototype.createOneTime = function(query, userCredentials) { + return this._client.createOneTime(this._httpEndPoint, query, userCredentials); +}; + +/** + * Creates a one-time query. + * @param name A name for the query. + * @param query The JavaScript source code for the query. + * @param userCredentials Credentials for a user with permission to create a query. + * @returns {Promise} + */ +ProjectionsManager.prototype.createTransient = function(name, query, userCredentials) { + return this._client.createTransient(this._httpEndPoint, query, userCredentials); +}; + +/** + * Creates a one-time query. + * @param name The name of the projection. + * @param query The JavaScript source code for the query. + * @param trackEmittedStreams Whether the streams emitted by this projection should be tracked. + * @param userCredentials Credentials for a user with permission to create a query. + * @returns {Promise} + */ +ProjectionsManager.prototype.createContinuous = function(name, query, trackEmittedStreams, userCredentials) { + return this._client.createContinuous(this._httpEndPoint, name, query, trackEmittedStreams, userCredentials); +}; + +/** + * Lists the status of all projections. + * @param userCredentials Credentials for the operation. + * @returns {Promise} + */ +ProjectionsManager.prototype.listAll = function(userCredentials) { + return this._client.listAll(this._httpEndPoint, userCredentials); +}; + +/** + * Lists the status of all one-time projections. + * @param userCredentials Credentials for the operation. + * @returns {Promise} + */ +ProjectionsManager.prototype.listOneTime = function(userCredentials) { + return this._client.listOneTime(this._httpEndPoint, userCredentials); +}; + +/** + * Lists the status of all continuous projections. + * @param userCredentials Credentials for the operation. + * @returns {Promise} + */ +ProjectionsManager.prototype.listContinuous = function(userCredentials) { + return this._client.listContinuous(this._httpEndPoint, userCredentials); +}; + +/** + * Gets the status of a projection. + * @param name The name of the projection. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing projection status. + */ +ProjectionsManager.prototype.getStatus = function(name, userCredentials) { + return this._client.getStatus(this._httpEndPoint, name, userCredentials); +}; + +/** + * Gets the state of a projection. + * @param name The name of the projection. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing projection state. + */ +ProjectionsManager.prototype.getState = function(name, userCredentials) { + return this._client.getState(this._httpEndPoint, name, userCredentials); +}; + +/** + * Gets the state of a projection for a specified partition. + * @param name The name of the projection. + * @param partitionId The id of the partition. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing projection state. + */ +ProjectionsManager.prototype.getPartitionState = function(name, partitionId, userCredentials) { + return this._client.getPartitionState(this._httpEndPoint, name, partitionId, userCredentials); +}; + +/** + * Gets the state of a projection. + * @param name The name of the projection. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing projection state. + */ +ProjectionsManager.prototype.getResult = function(name, userCredentials) { + return this._client.getResult(this._httpEndPoint, name, userCredentials); +}; + +/** + * Gets the state of a projection for a specified partition. + * @param name The name of the projection. + * @param partitionId The id of the partition. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing projection state. + */ +ProjectionsManager.prototype.getPartitionResult = function(name, partitionId, userCredentials) { + return this._client.getPartitionResult(this._httpEndPoint, name, partitionId, userCredentials); +}; + +/** + * Gets the statistics of a projection. + * @param name The name of the projection. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing projection statistics. + */ +ProjectionsManager.prototype.getStatistics = function(name, userCredentials) { + return this._client.getStatistics(this._httpEndPoint, name, userCredentials); +}; + +/** + * Gets the status of a query. + * @param name The name of the query. + * @param userCredentials Credentials for the operation. + * @returns {Promise} String of JSON containing query status. + */ +ProjectionsManager.prototype.getQuery = function(name, userCredentials) { + return this._client.getQuery(this._httpEndPoint, name, userCredentials); +}; + +/** + * Updates the definition of a query. + * @param name The name of the query. + * @param query The JavaScript source code for the query. + * @param userCredentials Credentials for the operation. + * @returns {Promise} + */ +ProjectionsManager.prototype.updateQuery = function(name, query, userCredentials) { + return this._client.updateQuery(this._httpEndPoint, name, query, userCredentials); +}; + +/** + * Updates the definition of a query. + * @param name The name of the projection. + * @param deleteEmittedStreams Whether to delete the streams that were emitted by this projection. + * @param userCredentials Credentials for a user with permission to delete a projection. + * @returns {Promise} + */ +ProjectionsManager.prototype.delete = function(name, deleteEmittedStreams, userCredentials) { + return this._client.delete(this._httpEndPoint, name, deleteEmittedStreams, userCredentials); +}; + +module.exports = ProjectionsManager; \ No newline at end of file diff --git a/test/projections_test.js b/test/projections_test.js new file mode 100644 index 0000000..059a12c --- /dev/null +++ b/test/projections_test.js @@ -0,0 +1,47 @@ +const client = require('../src/client'); +const userCredentials = new client.UserCredentials('admin', 'changeit'); + +const log = new client.NoopLogger(); +const httpEndpoint = 'http://127.0.0.1:2113'; +const operationTimeout = 5000; + +const simpleProjection = "\ +fromStream('$stats-127.0.0.1:2113')\ + .when({\ + $init: function(){\ + return {\ + count: 0\ + }\ + },\ + $any: function(s,e){\ + s.count += 1;\ + }\ + })\ +"; + +module.exports = { + setUp: function(cb) { + this.projectionsManager = new client.ProjectionsManager(log, httpEndpoint, operationTimeout); + cb(); + }, + 'Create One Time Projection Happy Path': function(test) { + test.expect(1); + + this.projectionsManager.createOneTime(simpleProjection, userCredentials) + .then(function (result) { + test.equal(result, undefined); + test.done(); + }) + .catch(test.done); + }, + 'List All Happy Path': function(test) { + test.expect(1); + this.projectionsManager.listAll(userCredentials) + .then(function (projections) { + test.ok(projections.length > 0, "no projections"); + test.done(); + }) + .catch(test.done); + } + //TODO: other tests +}; \ No newline at end of file