How to do PostgreSQL bulk insert with node-pg-copy-streams
Background
This article is intended for Node.js / JavaScript based applications that use PostgreSQL as the persistence layer.
The best way to do a bulk insert into PostgreSQL is to use the COPY command. High-level syntax:
COPY table_name [ ( column_name [, ...] ) ]
FROM { 'filename' | PROGRAM 'command' | STDIN }
[ [ WITH ] ( option [, ...] ) ]
From the JavaScript application layer, we can send the data through STDIN in different formats (CSV, TSV, etc.).
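For example, with the employee table used later in this article, the statement issued from the application would be:

COPY employee (name, age, salary) FROM STDIN

The default text format of COPY FROM STDIN expects one tab-separated row per line, which is the TSV format used in the example below.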
pg-copy-streams is an awesome library from the author of the PostgreSQL client for Node.js (https://node-postgres.com). It is lightweight and stream based, and provides a layer on top of the PostgreSQL COPY command.
Before jumping in, it helps to have a good understanding of streams and how they work in Node.js. A good place to start is the stream-handbook.
Implementing bulk insert of a dynamic array of objects
Most of the time, we construct objects dynamically and want to persist them in the database. In such cases, a few areas play a key role.
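A minimal sketch of such a bulkInsert method is shown below, using pg together with pg-copy-streams. The employee (name, age, salary) table comes from the COPY statement later in this article; the connection and error handling details are assumptions for illustration, not a definitive implementation.

const { Pool } = require('pg')
const { from: copyFrom } = require('pg-copy-streams')
const { Readable } = require('stream')

const pool = new Pool() // connection settings are read from the usual PG* environment variables

// employees: an array of { name, age, salary } objects built dynamically
function bulkInsert (employees, callback) {
  pool.connect((err, client, done) => {
    if (err) return callback(err)

    // Writable side: the COPY command, fed through STDIN in TSV (default text) format
    const copyStream = client.query(copyFrom('COPY employee (name, age, salary) FROM STDIN'))

    // Readable side: rows are produced lazily inside _read,
    // i.e. only when the copy stream asks for more data
    const rs = new Readable()
    let index = 0
    rs._read = () => {
      if (index === employees.length) {
        rs.push(null) // marks the end of the data stream
        return
      }
      const { name, age, salary } = employees[index++]
      // assumes values do not contain tabs, newlines or backslashes
      rs.push(`${name}\t${age}\t${salary}\n`)
    }

    copyStream.on('error', (err) => {
      done()
      callback(err)
    })

    // Release the connection only after the copy stream has ended;
    // releasing earlier means the data will not be persisted.
    // (Newer versions of pg-copy-streams emit 'finish' instead of 'end'.)
    copyStream.on('end', () => {
      done()
      callback(null)
    })

    rs.pipe(copyStream)
  })
}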
Explanation:
- Instead of writing all the objects to the readable stream at once, put the write logic inside rs._read (Readable._read). _read will be invoked only when someone reads the stream.
- To mark the end of our data stream to the database (in our case, the Readable stream), push null to the readable stream: rs.push(null)
- Don't close or release the database connection before the copy stream ends. If we do, the data will not be persisted. The right way to handle this is to attach a listener to the stream's 'end' event (stream.on('end')).
- In the example here, data is sent in TSV format. You could use CSV as well. For that, the COPY statement needs the CSV option:
COPY employee (name,age,salary) FROM STDIN CSV
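On the JavaScript side, the only other change is the row formatting. A hypothetical rowToCsv helper (the name is not part of any library) could replace the tab-separated formatting in the sketch above:

// Builds one CSV line per employee; assumes plain values without commas,
// quotes or newlines, which would otherwise need proper CSV quoting
function rowToCsv ({ name, age, salary }) {
  return `${name},${age},${salary}\n`
}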
Further optimization
For brevity, the data sent to this bulkInsert method is a straightforward array of objects. A better approach would be to pass a stream and avoid building the array entirely. That way the complete flow, from where the data is created to the bulk insertion, becomes stream based. The approach above may not be optimal when the array is very large.
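A minimal sketch of that idea, assuming the rows already arrive as a Node.js Readable stream of TSV lines (where that stream comes from is outside the scope of this sketch) and a recent version of pg-copy-streams:

const { Pool } = require('pg')
const { from: copyFrom } = require('pg-copy-streams')
const { pipeline } = require('stream')

const pool = new Pool()

// sourceStream: a Readable that already emits tab-separated rows for the employee table
function bulkInsertFromStream (sourceStream, callback) {
  pool.connect((err, client, done) => {
    if (err) return callback(err)

    const copyStream = client.query(copyFrom('COPY employee (name, age, salary) FROM STDIN'))

    // pipeline wires the source directly into the COPY stream and propagates
    // errors from either side, so no intermediate array is ever built in memory
    pipeline(sourceStream, copyStream, (err) => {
      done()
      callback(err || null)
    })
  })
}

With this shape, memory usage stays flat no matter how many rows flow through, since backpressure is handled by the stream machinery itself.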