src/Pipeline/Pipe-api.test.js
import {Validator} from "./Validator";
import {Pipeline} from "./Pipeline";
import {basicCollection} from "../../fixtures/PropertiesModel.schemas";
import {default as data} from "../../fixtures/pipes-test.data";
import {default as _pipesOrSchemas} from "../../fixtures/pipes-or-schema";
const _schemaDef = {
"schemas": [
{
"$id": "http://api-hero.webfreshener.com/v1/namespace.json#",
"type": "object",
"required": [
"basePath"
],
"properties": {
"basePath": {
"type": "string",
},
"servers": {
"type": "object"
},
"options": {
"type": "object"
},
"operations": {
"$ref": "#/definitions/Operations"
}
},
"definitions": {
"Operations": {
"type": "array",
"items": {
"$ref": "#/definitions/OperationItem"
}
},
"OperationItem": {
"type": "object",
"required": [
"operationId",
"method",
"path"
],
"additionalProperties": false,
"properties": {
"operationId": {
"type": "string"
},
"method": {
"type": "string",
"enum": [
"delete",
"get",
"head",
"options",
"patch",
"post",
"put"
]
},
"path": {
"type": "string"
},
"request": {
"type": "object"
},
"responses": {
"type": "object"
}
}
}
}
},
{
"$id": "http://api-hero.webfreshener.com/v1/namespace/operations.json#",
// "$schema": "http://api-hero.webfreshener.com/v1/namespace.json#",
"allOf": [
{
"required": [
"elementClass",
"target"
],
"properties": {
"elementClass": {
"type": "object"
},
"target": {
"type": "object"
}
}
},
{
"$ref": "http://api-hero.webfreshener.com/v1/namespace.json#/definitions/Operations"
}
]
}
],
"use": "http://api-hero.webfreshener.com/v1/namespace/operations.json#"
};
describe("Pipeline API Tests", () => {
let _p;
beforeEach(() => {
_p = new Pipeline(..._pipesOrSchemas);
});
describe("Pipeline Schema", () => {
it("should accept schema", () => {
expect(
() => (new Pipeline(basicCollection))
).not.toThrow();
});
it("should accept schema config", () => {
expect(
() => (new Pipeline(_schemaDef))
).not.toThrow();
});
});
describe("Pipeline exec", () => {
it("should not be an observable", (done) => {
const _p = new Pipeline(..._pipesOrSchemas);
_p.subscribe({
next: () => {
done("should not be observable if called with exec");
},
error: () => {
done("should not be observable if called with exec");
}
});
expect(_p.exec(data).length).toEqual(3);
expect(Object.keys(_p.tap()).length).toEqual(0);
setTimeout(() => done(), 10);
});
it("should not be a promise", () => {
expect(_p.exec(data).then).toBeUndefined();
});
it("should throw", () => {
const _p = new Pipeline(..._pipesOrSchemas);
try {
_p.exec("invalid value");
} catch (e) {
expect(e.error[0].message).toEqual("must be array");
}
});
});
it("should send error notification if a pipeline returns false", (done) => {
const _p = new Pipeline(
...[
..._pipesOrSchemas,
{
exec: () => false,
}
]
);
const _sub = _p.subscribe({
next: () => {
_sub.unsubscribe();
done("pipeline should not have sent next notification");
},
error: (e) => {
_sub.unsubscribe();
expect(e.error[0].message).toEqual("must be array");
expect(JSON.stringify(e.data)).toEqual(JSON.stringify(data[0]));
done();
},
});
_p.write(data[0]);
});
it("should provide errors", (done) => {
const _p = new Pipeline(
{type: "boolean"});
const _sub = _p.subscribe({
next: () => {
_sub.unsubscribe();
done("pipeline should have errored");
},
error: (e) => {
_sub.unsubscribe();
expect(e.error !== void 0).toBe(true);
done();
}
});
_p.write(data[0]);
});
it("should split pipeline", (done) => {
const _config = [
{
exec: (d) => {
return d.map((m) => Object.assign(m, {name: `${m.name} RENAMED`}))
},
},
{
exec: (d) => d.map((m) => Object.assign(m, {age: 99})),
},
];
// const _cb = jest.fn();
let _cnt = 0;
_p = new Pipeline({
exec: (d) => d
});
_p.subscribe({
error: done,
});
const _split = _p.split(_config);
expect(_split.length).toEqual(2);
_split.forEach((pipe) => {
const _sub = pipe.subscribe({
next: () => {
_cnt++;
_sub.unsubscribe();
},
error: (e) => {
_sub.unsubscribe();
throw e;
}
});
});
_p.write(data);
setTimeout(() => {
// expect(_cb).toHaveBeenCalledTimes(2);
expect(_cnt).toEqual(2);
expect(_split[0].tap()[0].name.match(/.*\sRENAMED+$/)).toBeTruthy();
expect(_split[1].tap()[0].age).toEqual(99);
done();
}, 50);
});
it("should exec multiple pipes inline", (done) => {
const _p1 = new Pipeline(
basicCollection,
(d) => d.map((m) => Object.assign(m, {name: `${m.name} RENAMED`}))
);
const _p2 = new Pipeline(
basicCollection,
(d) => d.map((m) => Object.assign(m, {age: 99}))
);
const _inline = _p.pipe(_p1, _p2);
_inline.subscribe({
next: (d) => {
expect(d.length).toEqual(data.length);
expect(d[data.length - 1].name.match(/.*\sRENAMED+$/)).toBeTruthy();
expect(d[data.length - 1].age).toEqual(99);
expect(1).toBeTruthy();
},
error: done,
});
_inline.write(data);
setTimeout(() => {
expect(_inline.tap().length).toEqual(data.length);
expect(_inline.tap()[data.length - 1].name.match(/.*\sRENAMED+$/)).toBeTruthy();
expect(_inline.tap()[data.length - 1].age).toEqual(99);
expect(1).toBeTruthy();
done();
_inline.close();
}, 50);
});
test("pipe", (done) => {
const _pipe = new Pipeline(
basicCollection,
(d) => d.map((m) => Object.assign(m, {name: `${m.name} RENAMED`}))
);
_pipe.pipe((d) => d).subscribe({
next: () => done(),
error: done,
});
_pipe.write(data);
});
test("yield", () => {
const _pOS = [
{
exec: () => "foo",
},
new Pipeline({
exec: () => "bar",
}),
{
exec: () => "baz",
},
];
const _ = (new Pipeline(..._pOS)).yield(data);
expect(_.next().value).toBe("foo");
expect(_.next().value).toBe("bar");
expect(_.next().value).toBe("baz");
expect(_.next().done).toBe(true);
});
it("pipeline should pipeline", (done) => {
const _tx = new Pipeline();
_tx.subscribe({
next: (d) => {
expect(d).toEqual(data);
done();
},
error: (e) => {
done(e);
}
});
_tx.write(data);
});
describe("throttle/unthrottle", () => {
test("throttle", (done) => {
let _cnt = 0;
const rate = 150;
const _sub = _p.throttle(rate).subscribe(() => _cnt++);
data.forEach((d) => {
_p.write([d]);
});
expect(_p.errors).toEqual(null);
const _ivl = setInterval(() => {
clearInterval(_ivl);
if (_cnt !== data.length) {
done(`Throttled Pipeline did not send all data.\nExpected: ${data.length}\nReceived: ${_cnt}`);
}
expect(_p.tap().length).toEqual(1);
expect(_p.tap()[0].hasOwnProperty("name")).toBeTruthy();
expect(_p.tap()[0].name).toEqual(data[data.length - 1].name);
_sub.unsubscribe();
_p.unthrottle();
done();
}, (data.length * rate) + 20);
});
test("throttle reset", (done) => {
let _cnt = 0;
const rate = 150;
const _sub = _p.throttle(rate).subscribe(() => {
_p.throttle(0);
_cnt++
});
data.forEach((d) => {
_p.write([d]);
});
expect(_p.errors).toEqual(null);
const _ivl = setInterval(() => {
clearInterval(_ivl);
if (_cnt !== data.length) {
done(`Pipeline did not release all data after throttle reset.\nExpected: ${data.length}\nReceived: ${_cnt}`);
}
_sub.unsubscribe();
done();
}, rate + 20);
});
test("throttle reset discard cache", (done) => {
let _cnt = 0;
const rate = 150;
const _sub = _p.throttle(rate).subscribe(() => {
_p.throttle(-1);
_cnt++;
});
data.forEach((d) => {
_p.write([d]);
});
expect(_p.errors).toEqual(null);
const _ivl = setInterval(() => {
clearInterval(_ivl);
if (_cnt !== 1) {
done(`Pipeline did not discard cached data after throttle reset.\nExpected: 1\nReceived: ${_cnt}`);
}
_sub.unsubscribe();
done();
}, rate * data.length);
});
test("unthrottle", (done) => {
const _pipe = new Pipeline(..._pipesOrSchemas);
let _cnt = 0;
const rate = 150;
const _sub = _pipe.throttle(rate).subscribe({
next: (d) => {
_pipe.unthrottle();
_cnt++;
},
error: done,
});
data.forEach((d) => {
_pipe.write([d]);
});
const _ivl = setInterval(() => {
clearInterval(_ivl);
expect(_cnt).toEqual(6);
done();
}, rate + 10);
});
test("unthrottle discard cache", (done) => {
const _pipe = new Pipeline(..._pipesOrSchemas);
let _cnt = 0;
const rate = 150;
const _sub = _pipe.throttle(rate).subscribe({
next: () => {
_pipe.unthrottle(true);
_cnt++;
},
error: done,
});
data.forEach((d) => {
_pipe.write([d]);
});
const _ivl = setInterval(() => {
clearInterval(_ivl);
expect(_cnt).toEqual(1);
_sub.unsubscribe();
done();
}, rate + 10);
});
});
test("sample", () => {
const _cb = jest.fn();
_p.sample(3).subscribe({
next: () => _cb(),
error: console.log,
});
data.slice(0, 4).forEach((m) => {
_p.write([m]);
});
expect(_cb).toHaveBeenCalledTimes(1);
});
test("clone", () => {
let _cnt = 0;
const _h = () => _cnt++;
const _sub1 = _p.subscribe(_h);
const _clone = _p.clone();
const _sub2 = _clone.subscribe(_h);
_clone.write(data);
expect(_cnt).toEqual(2);
_cnt = 0;
_sub1.unsubscribe();
_sub2.unsubscribe();
});
describe("link / unlink", () => {
beforeAll(() => {
});
it("should link pipeline", () => {
});
it("should unlink pipeline", () => {
});
});
test("should link and unlink pipes", () => {
const _cb = jest.fn();
const _TxValidator = new Validator({schemas: [basicCollection]});
const _link = new Pipeline(_TxValidator, {schemas: [basicCollection]});
_p.link(_link, (d) => {
_cb();
return d;
});
_p.write(data);
expect(_p.errors).toEqual(null);
expect(_cb).toHaveBeenCalledTimes(1);
// we capture state for comparison
const _state = `${_p}`;
expect(`${_link}`).toEqual(`${_state}`);
expect(`${_link}` === `${_p}`).toBe(true);
_p.unlink(_link);
// this will add an item to _p but not to _link
_p.write([
..._p.tap(),
{
name: "Added Item",
active: true,
age: 100,
}
]);
expect(_p.errors).toEqual(null);
// we expect to discover no further executions and the state to be unchanged
expect(_cb).toHaveBeenCalledTimes(1);
expect(`${_link}`).toEqual(`${_state}`);
expect(`${_link}` === `${_p}`).toBe(false);
});
it("should close", () => {
const _cb = jest.fn();
const _sub = _p.subscribe({
next: () => {
// will close on first invocation
_p.close();
_cb();
},
error: (e) => {
_sub.unsubscribe();
throw e;
},
});
data.forEach((d) => {
_p.write(Array.isArray(_p.tap()) ? [..._p.tap(), d] : [d]);
});
expect(_p.errors).toEqual(null);
expect(_p.tap().length).toEqual(1);
expect(_cb).toHaveBeenCalledTimes(1);
expect(_p.writable).toBe(false);
_sub.unsubscribe();
});
it("should merge multiple pipes into a single output", (done) => {
const _vo = {
schema: _pipesOrSchemas[0].schema,
};
const _p1 = new Pipeline({
schema: _vo.schema,
exec: (d) => d.map((m) => Object.assign(m, {name: `${m.name} RENAMED`})),
});
const _p2 = new Pipeline({
schema: _vo.schema,
exec: (d) => d.map((m) => Object.assign(m, {age: 99}))
}
);
const _merged = _p.merge([_p1, _p2], {
schema: _vo.schema,
exec: (d) => d.map(
(m) => Object.assign(m, {active: false})
)
});
let _cnt = 0;
_merged.subscribe({
next: (d) => {
if (!_cnt) {
_cnt++;
expect(_merged.tap()[0].name.match(/.*\sRENAMED+$/)).toBeTruthy();
expect(_merged.tap()[data.length - 1].name.match(/.*\sRENAMED+$/)).toBeTruthy();
_p2.write(data);
} else {
expect(_merged.tap().length).toEqual(data.length);
expect(_merged.tap()[0].age).toEqual(99);
expect(_merged.tap()[data.length - 1].age).toEqual(99);
done();
}
}
});
_p1.write(data);
});
});