Commit 0524774: added elasticsearch support
glynnbird committed Feb 28, 2019
1 parent 06c6b4c
Showing 7 changed files with 294 additions and 80 deletions.
39 changes: 29 additions & 10 deletions README.md
@@ -1,15 +1,17 @@
# couchwarehouse

*couchwarehouse* is a command-line tool that turns your [Apache CouchDB](http://couchdb.apache.org/) database(s) into a local data warehouse. It works by:
*couchwarehouse* is a command-line tool that turns your [Apache CouchDB](http://couchdb.apache.org/) database(s) into a local data warehouse. The target database can be [SQLite](https://www.sqlite.org/index.html), [PostgreSQL](https://www.postgresql.org/), [MySQL](https://www.mysql.com/) or [Elasticsearch](https://www.elastic.co/products/elasticsearch).

- discovering the "schema" of your CouchDB database.
- creating a new [SQLite](https://www.sqlite.org/index.html), [PostgreSQL](https://www.postgresql.org/) or [MySQL](https://www.mysql.com/) table to match the schema.
- downloading all the documents (except design documents) and inserting one row per document into the SQL database.
It works by:

- discovering the "schema" of your CouchDB database (for the relational databases).
- creating a new [SQLite](https://www.sqlite.org/index.html), [PostgreSQL](https://www.postgresql.org/) or [MySQL](https://www.mysql.com/) table to match the schema, or, in the case of Elasticsearch, simply moving the JSON documents over unchanged.
- downloading all the documents (except design documents) and inserting one row per document into the target database.
- continuously monitoring CouchDB for new documents, updates to existing documents and deletions.

![](img/couchwarehouse.png)

Once downloaded your database can be queried using SQL.
Once downloaded, your database can be queried using SQL or the target database's API.

## Installation

@@ -19,7 +21,7 @@ Once downloaded your database can be queried using SQL.
npm install -g couchwarehouse
```

## Usage
## Usage with SQLite

By default, your CouchDB installation is expected to be on "http://localhost:5984". Override this with the `--url`/`-u` parameter and specify the database name with `--database`/`--db`:
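For example, to warehouse a `mydb` database (a sketch; the URL and credentials are placeholders, in the same style as the Elasticsearch example later in this README):

```sh
$ couchwarehouse --url https://U:[email protected] --db mydb
```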

@@ -42,7 +44,7 @@ After downloading is complete, *couchwarehouse* will continuously poll the sourc

Press "Ctrl-C" to exit.

## Accessing the data warehouse
## Accessing the SQLite data warehouse

In another terminal, simply run the `sqlite3` command-line tool (which may be pre-installed on your computer, otherwise [download here](https://www.sqlite.org/download.html)).
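A minimal session might look like this (a sketch, assuming the default `couchwarehouse.sqlite` database file and a table named after the `mydb` CouchDB database):

```sh
$ sqlite3 couchwarehouse.sqlite
sqlite> SELECT name, population FROM mydb LIMIT 5;
```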

@@ -67,7 +69,7 @@ SQLite has an [extensive query language](https://www.sqlite.org/lang.html) inclu

N.B. if your database name contains a `-` character, it is removed from the resulting SQL table name, e.g. "month-54" becomes "month54".

## Using with PostgreSQL instead of SQLite
## Using with PostgreSQL as the target database

The PostgreSQL connection details are gleaned from [environment variables](https://www.postgresql.org/docs/9.3/libpq-envars.html). If you're [running PostgreSQL locally](https://www.postgresql.org/docs/11/tutorial-install.html) without password protection, you need only worry about the `PGDATABASE` environment variable, which defines the name of the database in which the `couchwarehouse` tables will be created. If left undefined, a database matching your current username is assumed (e.g. `glynnb`). I had to create this database first:
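A sketch, assuming a local, passwordless PostgreSQL installation (the CouchDB URL is a placeholder):

```sh
$ createdb glynnb
$ couchwarehouse --url https://U:[email protected] --db mydb --databaseType postgresql
```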

@@ -96,7 +98,7 @@ glynnb=# select * from mydb limit 5;
(5 rows)
```

## Using with MySQL instead of SQLite
## Using with MySQL as the target database

The MySQL connection string is taken from the `MYSQLCONFIG` environment variable; if it is absent, `mysql://root:@localhost:3306/couchwarehouse` is used. You will need to create the `couchwarehouse` database first:
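A sketch, assuming a local MySQL with a passwordless `root` user (the CouchDB URL is a placeholder):

```sh
$ mysql -u root -e 'CREATE DATABASE couchwarehouse'
$ couchwarehouse --url https://U:[email protected] --db mydb --databaseType mysql
```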

@@ -129,11 +131,28 @@ mysql> select * from mydb limit 5;
5 rows in set (0.00 sec)
```

## Using with Elasticsearch as the target database

The Elasticsearch connection string is taken from the `ESCONFIG` environment variable; if it is absent, `http://localhost:9200` is used.
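For example (the host is hypothetical):

```sh
$ export ESCONFIG='http://myeshost:9200'
```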

Run `couchwarehouse`, specifying the `--databaseType` parameter:

```sh
$ couchwarehouse --url https://U:[email protected] --db mydb --databaseType elasticsearch
```

You can then access your data warehouse using the Elasticsearch API:

```sh
$ curl 'http://localhost:9200/couchwarehouse/_search?q=name:"York"'
{"took":3,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":6,"max_score":7.998925,"hits":[{"_index":"couchwarehouse","_type":"default","_id":"4562407","_score":7.998925,"_source":{"name":"York","latitude":39.9626,"longitude":-76.72774,"country":"US","population":43718,"timezone":"America/New_York"}},{"_index":"couchwarehouse","_type":"default","_id":"2633352","_score":7.998925,"_source":{"name":"York","latitude":53.95763,"longitude":-1.08271,"country":"GB","population":144202,"timezone":"Europe/London"}},{"_index":"couchwarehouse","_type":"default","_id":"6091104","_score":5.9267497,"_source":{"name":"North York","latitude":43.76681,"longitude":-79.4163,"country":"CA","population":636000,"timezone":"America/Toronto"}},{"_index":"couchwarehouse","_type":"default","_id":"7870925","_score":5.9267497,"_source":{"name":"East York","latitude":43.69053,"longitude":-79.32794,"country":"CA","population":115365,"timezone":"America/Toronto"}},{"_index":"couchwarehouse","_type":"default","_id":"5128581","_score":5.283532,"_source":{"name":"New York City","latitude":40.71427,"longitude":-74.00597,"country":"US","population":8175133,"timezone":"America/New_York"}},{"_index":"couchwarehouse","_type":"default","_id":"5106292","_score":4.734778,"_source":{"name":"West New York","latitude":40.78788,"longitude":-74.01431,"country":"US","population":49708,"timezone":"America/New_York"}}]}}
```
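Progress through the CouchDB changes feed is checkpointed in a separate `couchwarehousemeta` index (see `lib/elasticsearch.js` below). A sketch of inspecting the checkpoint for the `mydb` database:

```sh
$ curl 'http://localhost:9200/couchwarehousemeta/mydb/checkpoint'
```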

## Command-line parameter reference

- `--url`/`-u` - the URL of the CouchDB instance e.g. `http://localhost:5984`
- `--database`/`--db`/`-d` - the name of the CouchDB database to work with
- `--databaseType`/`-dt` - the type of database - `sqlite` or `postgresql` (default: sqlite)
- `--databaseType`/`-dt` - the type of database - `sqlite`, `mysql`, `postgresql` or `elasticsearch` (default: sqlite)
- `--verbose` - whether to show progress on the terminal (default: true)
- `--reset`/`-r` - reset the data. Delete existing data and start from scratch (default: false)
- `--transform`/`-t` - transform each document with a supplied JavaScript function (default: null)
2 changes: 1 addition & 1 deletion bin/couchwarehouse.bin.js
@@ -6,7 +6,7 @@ const db = process.env.COUCH_DATABASE
const args = require('yargs')
.option('url', { alias: 'u', describe: 'CouchDB URL', default: url })
.option('database', { alias: ['db', 'd'], describe: 'CouchDB database name', demandOption: !db, default: db })
.option('databaseType', { alias: ['dt'], describe: 'SQL database type', default: 'sqlite' })
.option('databaseType', { alias: ['dt'], describe: 'Target database type (postgresql, mysql, sqlite, elasticsearch)', default: 'sqlite' })
.option('verbose', { describe: 'Show instructions and progress in the output', default: true })
.option('reset', { alias: 'r', describe: 'Ignore previously downloaded data and start again', default: false })
.option('transform', { alias: 't', describe: 'Path to a JavaScript transformation function', default: process.env.COUCH_TRANSFORM ? process.env.COUCH_TRANSFORM : null })
Binary file modified img/couchwarehouse.png
7 changes: 6 additions & 1 deletion index.js
@@ -17,6 +17,9 @@ const loadDatabaseDriver = (opts) => {
case 'mysql':
sqldb = require('./lib/mysql.js')
break
case 'elasticsearch':
sqldb = require('./lib/elasticsearch.js')
break
case 'sqlite':
default:
sqldb = require('./lib/sqlite.js')
@@ -110,6 +113,9 @@ const spoolChanges = async (opts, theSchema, maxChange) => {
bar.tick(b.length)
}

// write checkpoint
await sqldb.writeCheckpoint(opts.database, lastSeq)

// call the done callback if provided
if (typeof done === 'function') {
done()
@@ -218,7 +224,6 @@ const start = async (opts) => {
debug('Spooling changes')
if (opts.verbose) {
opts.usableDbName = util.calculateUsableDbName(opts, opts.database, null)
console.log(opts.database, opts.usableDbName)
sqldb.message(opts)
}
const lastSeq = await spoolChanges(opts, theSchema, maxChange)
92 changes: 92 additions & 0 deletions lib/elasticsearch.js
@@ -0,0 +1,92 @@
const elasticsearch = require('elasticsearch')
let db

const initialise = async (reset) => {
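// connect to a single Elasticsearch node; the reset flag is not needed here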
const config = process.env.ESCONFIG || 'http://localhost:9200'
db = new elasticsearch.Client({
hosts: [config]
})
}

const generateCreateTableSQL = (opts, docType, dbName, schema, reset) => {
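// no-op: Elasticsearch needs no up-front table or schema creation, so there is no SQL to generate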
return []
}

// insert an array of changes into the database
const insertBulk = async (opts, createSQL, dbName, theSchema, batch) => {
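// build a list of operations for the Elasticsearch bulk API: each operation is
// an action line ({ index: ... } or { delete: ... }) followed, for index
// actions, by the document body itself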
const actions = []

batch.forEach(b => {
// ignore design docs
if (!b.id.match(/^_design/)) {
// get the schema we're working with
const docType = opts.split ? b.doc[opts.split] : 'default'

// if this is a deletion
if (b.deleted) {
// delete from Elasticsearch
actions.push({ delete: { _index: 'couchwarehouse', _type: docType, _id: b.doc._id } })
} else {
actions.push({ index: { _index: 'couchwarehouse', _type: docType, _id: b.doc._id } })
delete b.doc._id
delete b.doc._rev
delete b.doc._attachments
actions.push(b.doc)
}
}
})

if (actions.length > 0) {
return db.bulk({ body: actions })
} else {
return null
}
}

const query = async (sql, substitutions) => {
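// no-op: data is queried via the Elasticsearch search API rather than SQL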
return null
}

// write a checkpoint to keep track of where we got to
// with each table
const writeCheckpoint = async (tablename, seq) => {
const actions = []
actions.push({
index: {
_index: 'couchwarehousemeta',
_type: tablename,
_id: 'checkpoint'
}
})
actions.push({ seq: seq })
return db.bulk({ body: actions })
}

const getCheckpoint = async (tablename) => {
try {
const doc = await db.get({
index: 'couchwarehousemeta',
type: tablename,
id: 'checkpoint'
})
return (doc && doc._source && doc._source.seq) ? doc._source.seq : null
} catch (e) {
return null
}
}

const message = (opts) => {
console.log('Spooling data into Elasticsearch')
console.log('Use the API to access the data: https://www.elastic.co/guide/en/elasticsearch/reference/5.5/_the_search_api.html')
console.log('p.s. Press Ctrl-C to stop monitoring for further changes')
}

module.exports = {
initialise,
generateCreateTableSQL,
insertBulk,
query,
writeCheckpoint,
getCheckpoint,
message
}