Skip to content

Commit 58a8ab8

Browse files
committed
Introduce postgres transport. This is pretty much a read only version so far.
1 parent 5c2e10e commit 58a8ab8

File tree

12 files changed

+451
-26
lines changed

12 files changed

+451
-26
lines changed

forklift_etl.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ Gem::Specification.new do |s|
2828
s.add_development_dependency 'rake'
2929
s.add_development_dependency 'rspec'
3030
s.add_development_dependency 'email_spec'
31+
s.add_development_dependency 'pg'
3132
end

lib/forklift/transports/pg.rb

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
module Forklift
  module Connection
    # Postgres transport for Forklift. Mostly a read transport so far:
    # lazy_table_create and sql_type raise NotImplementedError, so `write`
    # can only target tables that already exist.
    class Pg < Forklift::Base::Connection
      # Lazily requires the 'pg' gem so forklift works without it installed
      # when the postgres transport is unused.
      def initialize(config, forklift)
        begin
          require 'pg'
        rescue LoadError
          raise "To use the postgres connection you must add 'pg' to your Gemfile"
        end
        super(config, forklift)
      end

      # Opens (and memoizes) the PG connection from the config hash.
      def connect
        @client ||= PG::Connection.new(config)
      end

      def disconnect
        client.close
      end

      # Column consulted by read_since/max_timestamp when no matcher is given.
      def default_matcher
        'updated_at'
      end

      def drop!(table)
        q("DROP TABLE IF EXISTS #{quote_ident(table)}")
      end

      def rename(table, new_table)
        q("ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(new_table)}")
      end

      # Pages through `query` in batches of `limit` rows.
      # Yields each PG::Result when a block is given; otherwise returns the
      # first page. NOTE(review): `offset` is accepted for interface parity
      # with the other transports but pagination always starts at page 1.
      def read(query, database=current_database, looping=true, limit=forklift.config[:batch_size], offset=0)
        page = 1
        loop do
          result = q(paginate_query(query, page, limit))
          block_given? ? yield(result) : (return result)
          return result if result.num_tuples < limit || !looping
          page += 1
        end
      end

      # Writes `rows` (array of hashes) to `table` with delete-then-insert
      # upsert semantics: rows carrying `primary_key` first delete any
      # existing copy, then every row is inserted.
      def write(rows, table, to_update=true, database=current_database, primary_key='id', lazy=true, crash_on_extral_col=false)
        # Nothing to do — also avoids emitting `INSERT ... VALUES` with an
        # empty tuple list, which is a Postgres syntax error.
        return if rows.empty?

        if tables.include? table
          ensure_row_types(rows, table, database)
        elsif lazy
          lazy_table_create(table, rows, database, primary_key)
        end

        # Was a bare `columns` call, which raises ArgumentError (the method
        # requires `table`). Resolve the column list once up front.
        cols = columns(table, database)

        insert_values = []
        delete_keys = []
        rows.each do |row|
          # Collect PKs of rows that replace an existing record; every row,
          # updated or new, still gets inserted below.
          delete_keys << row[primary_key] if to_update && !row[primary_key].nil?
          insert_values << safe_values(cols, row)
        end

        unless delete_keys.empty?
          q(%{DELETE FROM #{quote_ident(table)} WHERE #{quote_ident(primary_key)} IN (#{delete_keys.join(',')})})
        end

        q(%{INSERT INTO #{quote_ident(table)} (#{safe_columns(cols)}) VALUES #{insert_values.join(',')}})
        forklift.logger.log "wrote #{rows.length} rows to `#{database}`.`#{table}`"
      end

      # @todo Create `table` on the fly from the shape of `data`.
      def lazy_table_create(table, data, database=current_database, primary_key='id', matcher=default_matcher)
        raise NotImplementedError.new
      end

      # @todo Map a ruby value to a postgres column type.
      def sql_type(v)
        raise NotImplementedError.new
      end

      # Streams rows whose `matcher` column is >= `since`, oldest first.
      def read_since(table, since, matcher=default_matcher, database=current_database, limit=forklift.config[:batch_size])
        # escape_literal requires a String; `since` is typically a Time.
        query = %{SELECT * FROM #{quote_ident(table)} WHERE #{quote_ident(matcher)} >= #{client.escape_literal(since.to_s)} ORDER BY #{quote_ident(matcher)} ASC}
        self.read(query, database, true, limit) do |rows|
          if block_given?
            yield rows
          else
            return rows
          end
        end
      end

      # Largest value of the `matcher` column, or Time.at(0) for an empty table.
      def max_timestamp(table, matcher=default_matcher)
        # Column aliases take double quotes in Postgres; the previous
        # single-quoted alias ('matcher') is a string literal and a syntax error.
        row = q(%{SELECT max(#{quote_ident(matcher)}) AS "matcher" FROM #{quote_ident(table)}}).first
        (row && row['matcher']) || Time.at(0)
      end

      # All table names in the public schema.
      def tables
        table_list = []
        read(%{SELECT table_name AS "table_name" FROM information_schema.tables WHERE table_schema = 'public'}) do |result|
          table_list << result.map { |r| r['table_name'] }
        end
        table_list.flatten.compact
      end

      def current_database
        client.db
      end

      def count(table)
        q(%{SELECT count(1) AS "count" FROM #{quote_ident(table)}})[0]['count'].to_i
      end

      def truncate!(table)
        q("TRUNCATE TABLE #{quote_ident(table)}")
      end

      # Best-effort truncate: logs and swallows database errors.
      def truncate(table)
        begin
          self.truncate!(table)
        rescue => e # was `rescue Exception`, which also traps signals/exit
          forklift.logger.debug e
        end
      end

      # Column names for `table` (or a {name => type} hash when
      # `return_types` is true).
      def columns(table, database=current_database, return_types=false)
        cols = {}
        # escape_literal guards the interpolated table name against quoting
        # bugs / injection (it was previously inlined raw).
        read(%{SELECT column_name, data_type, character_maximum_length FROM "information_schema"."columns" WHERE table_name=#{client.escape_literal(table.to_s)}}) do |rows|
          rows.each do |row|
            type = case row['data_type']
                   when 'character varying' then "varchar(#{row['character_maximum_length']})"
                   else row['data_type']
                   end
            cols[row['column_name']] = type
          end
        end
        return_types ? cols : cols.keys
      end

      # @todo Not yet supported for postgres.
      def dump(file, options=[])
      end

      # @todo Not yet supported for postgres.
      def exec_script(path)
      end

      # Logs and executes `query`, returning the PG::Result.
      def q(query, options={})
        # pg configs use :dbname (see the spec config files), not :database.
        forklift.logger.debug "\tSQL[#{config[:dbname]}]: #{query}"
        client.exec(query)
      end

      private

      # Adds any columns present in `rows` but missing from `table`.
      # NOTE(review): relies on sql_type, which is still NotImplementedError.
      def ensure_row_types(rows, table, database=current_database)
        cols = columns(table, database) # renamed: no longer shadows #columns
        rows.each do |row|
          row.each do |column, value|
            unless cols.include?(column)
              q(%{ALTER TABLE #{quote_ident(table)} ADD #{quote_ident(column)} #{sql_type(value)} NULL DEFAULT NULL})
              cols = columns(table, database) # refresh after ALTER
            end
          end
        end
      end

      # Appends LIMIT/OFFSET for the 1-indexed `page`.
      def paginate_query(query, page, page_size)
        offset = (page - 1) * page_size
        [query, "LIMIT #{page_size} OFFSET #{offset}"].join(' ')
      end

      def quote_ident(table)
        PG::Connection.quote_ident(table)
      end
    end
  end
end
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
:dbname: forklift_test_destination
2+
:host: 127.0.0.1
3+
:port: 5432
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
:dbname: forklift_test_source_a
2+
:host: 127.0.0.1
3+
:port: 5432
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
:dbname: forklift_test_source_b
2+
:host: 127.0.0.1
3+
:port: 5432
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
:dbname: forklift_test_working
2+
:host: 127.0.0.1
3+
:port: 5432

spec/integration/multi_transport_spec.rb

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
11
require 'spec_helper'
22

33
describe 'multiple trasport types' do
4+
describe 'elasticsearch => mysql' do
5+
before(:each) do
6+
SpecSeeds.setup_mysql
7+
SpecSeeds.setup_elasticsearch
8+
end
49

5-
before(:each) do
6-
SpecSeeds.setup_mysql
7-
SpecSeeds.setup_elasticsearch
8-
end
9-
10-
describe 'elasticsearch => mysql' do
1110
it 'can load in a full query' do
1211
table = 'es_import'
1312
index = 'forklift_test'
1413
query = { query: { match_all: {} } }
15-
plan = SpecPlan.new
16-
plan.do! {
17-
source = plan.connections[:elasticsearch][:forklift_test]
18-
destination = plan.connections[:mysql][:forklift_test_destination]
19-
source.read(index, query) {|data| destination.write(data, table) }
20-
}
21-
plan.disconnect!
22-
23-
destination = SpecClient.mysql('forklift_test_destination')
24-
rows = destination.query("select count(1) as 'count' from es_import").first["count"]
25-
expect(rows).to eql 5
14+
begin
15+
plan = SpecPlan.new
16+
plan.do! {
17+
source = plan.connections[:elasticsearch][:forklift_test]
18+
destination = plan.connections[:mysql][:forklift_test_destination]
19+
source.read(index, query) {|data| destination.write(data, table) }
20+
expect(destination.count('es_import')).to eql(5)
21+
}
22+
ensure
23+
plan.disconnect!
24+
end
2625
end
2726

2827
it 'can load in a partial query' do
@@ -54,13 +53,13 @@
5453
plan.do! {
5554
source = plan.connections[:elasticsearch][:forklift_test]
5655
destination = plan.connections[:mysql][:forklift_test_destination]
57-
source.read(index, query) {|data|
56+
source.read(index, query) {|data|
5857
clean_data = []
5958
data.each do |row|
6059
row[:viewed_at] = Time.at(row[:viewed_at])
6160
clean_data << row
6261
end
63-
destination.write(clean_data, table)
62+
destination.write(clean_data, table)
6463
}
6564
}
6665
plan.disconnect!
@@ -72,7 +71,11 @@
7271

7372
end
7473

75-
describe 'mysql => elasticsearch' do
74+
describe 'mysql => elasticsearch' do
75+
before(:each) do
76+
SpecSeeds.setup_mysql
77+
SpecSeeds.setup_elasticsearch
78+
end
7679

7780
after(:each) do
7881
es = SpecClient.elasticsearch('forklift_test')
@@ -94,15 +97,15 @@
9497
count = destination.count({ index: index })["count"]
9598
expect(count).to eql 5
9699
end
97-
100+
98101
it 'can load in only some rows' do
99102
table = 'users'
100103
index = 'users'
101104
plan = SpecPlan.new
102105
plan.do! {
103106
source = plan.connections[:mysql][:forklift_test_source_a]
104107
destination = plan.connections[:elasticsearch][:forklift_test]
105-
source.read("select * from #{table}", source.current_database, false, 3, 0) {|data|
108+
source.read("select * from #{table}", source.current_database, false, 3, 0) {|data|
106109
destination.write(data, index)
107110
}
108111
}
@@ -114,4 +117,13 @@
114117
end
115118
end
116119

117-
end
120+
describe 'postgres => mysql' do
121+
before do
122+
SpecSeeds.setup_mysql
123+
SpecSeeds.setup_postgres
124+
end
125+
126+
it 'can load in a full table'
127+
it 'can load in only some rows'
128+
end
129+
end
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
SET client_min_messages TO WARNING;
2+
DROP TABLE IF EXISTS "products";
3+
4+
CREATE TABLE "products" (
5+
"id" SERIAL NOT NULL PRIMARY KEY,
6+
"name" varchar(255) NOT NULL DEFAULT '',
7+
"description" text NOT NULL,
8+
"inventory" integer DEFAULT NULL,
9+
"created_at" timestamp NOT NULL,
10+
"updated_at" timestamp NOT NULL
11+
);
12+
13+
INSERT INTO "products" ("id", "name", "description", "inventory", "created_at", "updated_at")
14+
VALUES
15+
(1,'car','a car',10,'2014-04-03 11:45:51','2014-04-03 11:45:51'),
16+
(2,'boat','a boat',3,'2014-04-03 11:45:52','2014-04-03 11:45:52'),
17+
(3,'bus','a bus',5,'2014-04-03 11:45:54','2014-04-03 11:45:54'),
18+
(4,'motorcycle','a motorcycle',23,'2014-04-03 11:45:56','2014-04-03 11:45:56'),
19+
(5,'hang_glider','awesome',2,'2014-04-03 11:46:19','2014-04-03 11:46:19');
20+
21+
DROP TABLE IF EXISTS "sales";
22+
23+
CREATE TABLE "sales" (
24+
"id" SERIAL NOT NULL PRIMARY KEY,
25+
"user_id" integer NOT NULL,
26+
"product_id" integer NOT NULL,
27+
"timestamp" timestamp NOT NULL
28+
);
29+
30+
INSERT INTO "sales" ("id", "user_id", "product_id", "timestamp")
31+
VALUES
32+
(1,1,1,'2014-04-03 11:47:11'),
33+
(2,1,2,'2014-04-03 11:47:11'),
34+
(3,4,5,'2014-04-03 11:47:12'),
35+
(4,4,4,'2014-04-03 11:47:25'),
36+
(5,5,5,'2014-04-03 11:47:26');
37+
38+
DROP TABLE IF EXISTS "users";
39+
40+
CREATE TABLE "users" (
41+
"id" SERIAL NOT NULL PRIMARY KEY,
42+
"email" varchar(255) NOT NULL DEFAULT '',
43+
"first_name" varchar(255) NOT NULL DEFAULT '',
44+
"last_name" varchar(255) NOT NULL DEFAULT '',
45+
"created_at" timestamp NOT NULL,
46+
"updated_at" timestamp NOT NULL
47+
);
48+
49+
INSERT INTO "users" ("id", "email", "first_name", "last_name", "created_at", "updated_at")
50+
VALUES
51+
(1,'[email protected]','Evan','T','2014-04-03 11:40:12','2014-04-03 11:39:28'),
52+
(2,'[email protected]','Pablo ','J','2014-04-03 11:41:08','2014-04-03 11:41:08'),
53+
(3,'[email protected]','Kevin','B','2014-04-03 11:41:10','2014-04-03 11:41:10'),
54+
(4,'[email protected]','Brian','L','2014-04-03 11:41:12','2014-04-03 11:41:12'),
55+
(5,'[email protected]','Aaron','B','2014-04-03 11:41:13','2014-04-03 11:41:13');
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
SET client_min_messages TO WARNING;
2+
DROP TABLE IF EXISTS "admin_notes";
3+
4+
CREATE TABLE "admin_notes" (
5+
"id" SERIAL NOT NULL PRIMARY KEY,
6+
"user_id" integer NOT NULL,
7+
"note" text NOT NULL,
8+
"created_at" timestamp NOT NULL,
9+
"updated_at" timestamp NOT NULL
10+
);
11+
12+
INSERT INTO "admin_notes" ("id", "user_id", "note", "created_at", "updated_at")
13+
VALUES
14+
(1,1,'User 1 called customer support','2014-04-03 11:50:25','2014-04-03 11:50:25'),
15+
(2,2,'User 2 called customer support','2014-04-03 11:50:26','2014-04-03 11:50:26'),
16+
(3,5,'User 5 returned the purchase','2014-04-03 11:50:28','2014-04-03 11:50:28');

0 commit comments

Comments
 (0)