// src/Pipeline/Pipe-exec.js
import {Pipeline} from "./Pipeline";
import {default as data} from "../../fixtures/pipes-test.data";
describe("Pipeline Exec Tests", () => {
// Round-trip check on the shared pipeline `_p` (declared outside this test):
// the value delivered to the subscriber must serialize to the same JSON as
// whatever `_p.tap()` returns at that moment.
it("should intake and output data", (done) => {
  // Success path: release the subscription first, then compare by content.
  const onNext = (received) => {
    subscription.unsubscribe();
    expect(`${JSON.stringify(received)}`).toEqual(JSON.stringify(_p.tap()));
    done();
  };

  // Failure path: release the subscription and hand the error to the runner.
  const onError = (err) => {
    subscription.unsubscribe();
    done(err);
  };

  const subscription = _p.subscribe({ next: onNext, error: onError });

  // Writing after subscribing is what triggers the notification under test.
  _p.write(data);
});
// Checks the shape of the shared pipeline's output: both the payload handed
// to the subscriber and the value reported by `_p.tap()` must have length 3.
// `_p` is declared outside this test.
it("should transform data with callback", (done) => {
  const observer = {
    next(items) {
      subscription.unsubscribe();
      expect(items.length).toEqual(3);
      expect(_p.tap().length).toEqual(3);
      done();
    },
    error(err) {
      subscription.unsubscribe();
      done(err);
    },
  };

  const subscription = _p.subscribe(observer);

  // The write is issued only after the observer is attached.
  _p.write(data);
});
// Writes "ok" to a fresh pass-through pipeline every 10 ms and finishes once
// two `next` notifications have been observed; any `error` notification fails
// the test.
// NOTE(review): despite the title, nothing here asserts that the pipeline
// stays idle before the first write — confirm whether such a check is wanted.
it("should not exec until called upon", (done) => {
  const pipeline = new Pipeline(async (d) => d);
  let notifications = 0;
  let writer;
  let subscription;

  // Single exit path so the interval and the subscription are always released,
  // on failure as well as on success.
  const finish = (err) => {
    clearInterval(writer);
    if (subscription) {
      subscription.unsubscribe();
    }
    done(err);
  };

  writer = setInterval(() => {
    pipeline.write("ok");
  }, 10);

  subscription = pipeline.subscribe({
    next: () => {
      notifications += 1;
      if (notifications === 2) {
        finish();
      }
    },
    error: (e) => {
      finish(e);
    },
  });
});
// Appends a stage whose `exec` returns false to `_pipesOrSchemas` (declared
// outside this test), writes `data[0]`, and then:
//   - fails if a `next` notification arrives,
//   - checks the payload if an `error` notification arrives,
//   - passes if nothing at all arrives within 200 ms.
it("should stop if a pipeline returns false", (done) => {
  const pipeline = new Pipeline(..._pipesOrSchemas, { exec: () => false });
  let subscription;
  let quietTimer;
  let settled = false;

  // Single exit path: whichever outcome happens first cancels the timer,
  // releases the subscription and reports to the runner exactly once.
  const finish = (err) => {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(quietTimer);
    if (subscription) {
      subscription.unsubscribe();
    }
    done(err);
  };

  subscription = pipeline.subscribe({
    next: () => {
      finish("pipeline should not have sent next notification");
    },
    error: (e) => {
      // Route assertion failures through `done`; otherwise they would be thrown
      // inside the observer and the quiet-period timer could still pass the test.
      try {
        expect(e.error[0].message).toEqual("should be array");
        expect(JSON.stringify(e.data)).toEqual(JSON.stringify(data[0]));
      } catch (assertionError) {
        finish(assertionError);
        return;
      }
      finish();
    },
  });

  // Arm the timer before writing so a synchronous notification can cancel it.
  quietTimer = setTimeout(() => finish(), 200);
  pipeline.write(data[0]);
});
// Builds a pipeline from a single callback that returns a string and expects
// the write to surface as an `error` notification carrying a defined `error`
// property. A `next` or `complete` notification fails the test.
it("should provide errors", (done) => {
  const pipeline = new Pipeline(() => "an error message");

  const subscription = pipeline.subscribe({
    next() {
      subscription.unsubscribe();
      done("pipeline should have errored");
    },
    error(failure) {
      subscription.unsubscribe();
      const hasError = failure.error !== undefined;
      expect(hasError).toBe(true);
      done();
    },
    complete() {
      done("should not have completed");
    },
  });

  pipeline.write(data[0]);
});
// Adds a stage whose `exec` returns a string and expects that exact string to
// arrive as the `error` property of an `error` notification; a `next`
// notification fails the test.
// NOTE(review): `_pipesOrSchemas` (declared outside this test) is passed here
// as one argument rather than spread — confirm this is intentional.
it("should send error if a pipeline returns string", (done) => {
  const expectedMessage = "an important error message for you";
  const failingStage = {
    exec: () => expectedMessage,
  };
  const pipeline = new Pipeline(_pipesOrSchemas, failingStage);

  const subscription = pipeline.subscribe({
    next() {
      subscription.unsubscribe();
      done("pipeline should not have sent next notification");
    },
    error(failure) {
      subscription.unsubscribe();
      expect(failure.error).toEqual(expectedMessage);
      done();
    },
  });

  pipeline.write(data[0]);
});
});