Streaming CouchDB data
I'm a self-confessed fan of CouchDB, stream programming and the official CouchDB NodeJS library, nano. Nano can return data as a NodeJS stream, so you can pipe it away. Most examples use file streams or process.stdout, while my goal was to process the individual documents that are part of the stream.
You can't walk into the same stream a second time
This old Buddhist saying holds true for NodeJS streams too, so any processing needs to happen in the chain of the stream. Let's start with the simple example of reading all documents from a CouchDB database:
const Nano = require("nano");
const nano = Nano(couchDBURL);
nano.listAsStream({ include_docs: true }).pipe(process.stdout);
This little snippet reads out all documents in your CouchDB database. You need to supply the couchDBURL value, e.g. http://localhost:5984/test. On a closer look, we see that the data arrives in continuous buffers that don't match JSON document boundaries, so processing one document after the other needs extra work.
A blog entry in the StrongLoop blog provides the first clue what to do. To process CouchDB stream data we need both a Transform stream to chop the incoming data into lines and a Writable stream for our results.
Our code will finally look like this:
const Nano = require("nano");
const { Writable, Transform } = require("stream");
const streamOneDb = (couchDBURL, resultCallback) => {
  const nano = Nano(couchDBURL);
  nano
    .listAsStream({ include_docs: true })
    .on("error", (e) => console.error("error", e))
    .pipe(lineSplitter())
    .pipe(jsonMaker())
    .pipe(documentWriter(resultCallback));
};
Let's have a closer look at the new functions. The first two implement Transform, the last one Writable:
lineSplitter, as the name implies, cuts the buffer into separate lines for processing. As far as I could tell, CouchDB documents are always returned on one line
jsonMaker extracts the documents and discards the wrapper with the document count that surrounds them
documentWriter writes out the JSON object using a callback
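One thing the chain in streamOneDb does not cover: the .on("error", ...) handler only sees errors raised by the source stream, since .pipe() does not forward errors to later stages, and nothing tells the caller when the last document has been written. Node's built-in stream.pipeline handles both. A minimal sketch, assuming a hypothetical runChain helper and stand-in stages, with Readable.from replacing the CouchDB stream so it runs without a database:

```javascript
const { pipeline, Readable, Transform, Writable } = require("stream");

// Hypothetical helper: wires the source and all stages with pipeline()
// and calls done(err) when the chain finishes or when any stage fails.
const runChain = (source, stages, done) => pipeline(source, ...stages, done);

// Stand-in stages (assumptions for this demo, not the functions of this post)
const upperCase = () =>
  new Transform({
    objectMode: true,
    transform(line, _encoding, callback) {
      callback(null, String(line).toUpperCase());
    },
  });

const collector = (sink) =>
  new Writable({
    objectMode: true,
    write(item, _encoding, callback) {
      sink.push(item);
      callback();
    },
  });

const seen = [];
runChain(Readable.from(["a", "b"]), [upperCase(), collector(seen)], (err) => {
  if (err) {
    console.error("chain failed", err);
  } else {
    console.log("done", seen);
  }
});
```

In streamOneDb the same idea would mean passing the nano stream as the source and lineSplitter(), jsonMaker() and documentWriter(resultCallback) as the stages.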
Splitting lines
The one special case: a chunk might end in the middle of a line, so we keep the last line around for the next iteration until it gets flushed out. Calling callback() triggers the next step in the pipe.
const lineSplitter = () =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      let raw = Buffer.from(chunk, encoding).toString();
      // Prepend the incomplete line kept from the previous chunk
      if (this._leftOver) {
        raw = this._leftOver + raw;
      }
      const lines = raw.split("\n");
      // The last piece might be incomplete, keep it for the next chunk
      this._leftOver = lines.pop();
      for (const line of lines) {
        this.push(line);
      }
      callback();
    },
    flush(callback) {
      // End of stream: emit whatever is still held back
      if (this._leftOver) {
        this.push(this._leftOver);
      }
      this._leftOver = null;
      callback();
    },
  });
Nota bene: calling lineSplitter() returns a new instance rather than executing the split, since each instance can only be used for a single stream. I first tried a single shared instance and found NodeJS terminating without a trace when the same instance was used on a second stream.
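The carry-over logic can be tried without a database. One caveat when converting each chunk with toString(): a chunk boundary can also fall inside a multi-byte UTF-8 character, and decoding the two halves separately garbles it. A minimal sketch, assuming a hypothetical makeLineCutter helper and Node's built-in StringDecoder, which holds back incomplete characters until the rest arrives:

```javascript
const { StringDecoder } = require("string_decoder");

// Hypothetical helper (not part of this post's code): returns an object
// whose feed() takes one chunk and returns the lines completed by it.
const makeLineCutter = () => {
  const decoder = new StringDecoder("utf8");
  let leftOver = "";
  return {
    feed(chunk) {
      const lines = (leftOver + decoder.write(chunk)).split("\n");
      leftOver = lines.pop(); // the last piece may be an incomplete line
      return lines;
    },
    end() {
      // Emit whatever is still held back at the end of the stream
      const rest = leftOver + decoder.end();
      leftOver = "";
      return rest ? [rest] : [];
    },
  };
};

// Made-up sample data, split inside the two-byte character "é"
const cutter = makeLineCutter();
const data = Buffer.from('{"name":"José"}\n{"name":"Ana"}\n');
const cut = data.indexOf(0xc3) + 1;
console.log(cutter.feed(data.subarray(0, cut)));
console.log(cutter.feed(data.subarray(cut)));
console.log(cutter.end());
```

Inside a Transform, feed() would map to transform() and end() to flush().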
Extracting documents
You might want to adjust this function when you are interested in other data.
const jsonMaker = () =>
  new Transform({
    objectMode: true,
    transform(rawLine, encoding, callback) {
      // remove the comma at the end of the line - CouchDB sent an array
      const line = rawLine.toString().replace(/,$/m, "").trim();
      if (line.startsWith('{"id":') && line.endsWith("}")) {
        try {
          const j = JSON.parse(line);
          // We only want the document
          if (j.doc) {
            this.push(JSON.stringify(j.doc));
          }
        } catch (e) {
          console.error(e.message);
        }
      }
      callback();
    },
  });
Nota bene: by default, streams process buffers or strings, not JSON objects. The writable in the last step is not in object mode, so we need to stringify/parse between the modules.
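To see which lines survive the filter, the same checks can be run as a plain function on a few sample lines. A minimal sketch, assuming a hypothetical extractDoc helper and made-up sample rows shaped like an all-documents response with include_docs: true:

```javascript
// Hypothetical helper mirroring the checks in jsonMaker above:
// returns the embedded document, or null for lines that carry none.
const extractDoc = (rawLine) => {
  const line = String(rawLine).replace(/,$/m, "").trim();
  if (!(line.startsWith('{"id":') && line.endsWith("}"))) {
    return null; // wrapper lines such as the header or the closing "]}"
  }
  try {
    const row = JSON.parse(line);
    return row.doc || null;
  } catch (e) {
    return null; // not valid JSON after all
  }
};

// Made-up sample lines (assumption: illustrative data only)
const sample = [
  '{"total_rows":2,"offset":0,"rows":[',
  '{"id":"a","key":"a","value":{"rev":"1-x"},"doc":{"_id":"a","n":1}},',
  '{"id":"b","key":"b","value":{"rev":"1-y"},"doc":{"_id":"b","n":2}}',
  "]}",
];

const docs = sample.map(extractDoc).filter((doc) => doc !== null);
console.log(docs);
```

Only the two row lines yield a document; the wrapper lines are dropped.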
Writing out
In the last step we take the document and process it with whatever we need to do.
const documentWriter = (resultCallback) =>
  new Writable({
    write(chunk, encoding, callback) {
      const json = JSON.parse(Buffer.from(chunk, encoding).toString());
      // Process the document
      resultCallback(json);
      // Tell that we are done
      callback();
    },
  });
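Since the transforms already run with objectMode: true, the stringify/parse round trip between the last two stages is optional. A minimal sketch, assuming a hypothetical objectWriter variant whose upstream stage pushes the parsed document itself (this.push(j.doc)) instead of a string:

```javascript
const { Writable } = require("stream");

// Hypothetical variant of documentWriter: in object mode the chunk
// is the document object itself, so no JSON.parse is needed.
const objectWriter = (resultCallback) =>
  new Writable({
    objectMode: true,
    write(doc, _encoding, callback) {
      resultCallback(doc);
      callback();
    },
  });

// Usage: feed two made-up documents by hand and collect their ids
const ids = [];
const writer = objectWriter((doc) => ids.push(doc._id));
writer.write({ _id: "a" });
writer.write({ _id: "b" });
writer.end();
console.log(ids);
```

Skipping the round trip saves one serialization per document, which might matter for the larger documents mentioned below.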
So far we have tried it on databases with 500k small documents and on databases with documents exceeding 1MB of JSON. It works like a charm.
I wonder if that would make a good addition to the nano library? The source is available as a gist.
As usual YMMV
Posted by Stephan H Wissel on 16 October 2021 | categories: CouchDB NodeJS