 import codecs
 import copy
-import json
 import os
+import tempfile
+import uuid
 from collections import OrderedDict
 from decimal import Decimal
 from warnings import warn
 
+import BTrees.OOBTree
+import ijson
+import transaction
 import xmltodict
+import zc.zlibstorage
+import ZODB.FileStorage
 
 from flattentool.i18n import _
 from flattentool.input import path_search
 from flattentool.schema import make_sub_sheet_name
-from flattentool.sheet import Sheet
+from flattentool.sheet import PersistentSheet
 
 
 BASIC_TYPES = [str, bool, int, Decimal, type(None)]
@@ -112,9 +118,26 @@ def __init__(
         remove_empty_schema_columns=False,
         rollup=False,
         truncation_length=3,
+        persist=False,
     ):
+        if persist:
+            self.zodb_db_location = (
+                tempfile.gettempdir() + "/flattentool-" + str(uuid.uuid4())
+            )
+            zodb_storage = zc.zlibstorage.ZlibStorage(
+                ZODB.FileStorage.FileStorage(self.zodb_db_location)
+            )
+            self.db = ZODB.DB(zodb_storage)
+        else:
+            # If None, in memory storage is used.
+            self.db = ZODB.DB(None)
+
+        self.connection = self.db.open()
+        root = self.connection.root
+        root.sheet_store = BTrees.OOBTree.BTree()
+
         self.sub_sheets = {}
-        self.main_sheet = Sheet()
+        self.main_sheet = PersistentSheet(connection=self.connection, name="")
         self.root_list_path = root_list_path
         self.root_id = root_id
         self.use_titles = use_titles
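
As a rough, standalone sketch of the storage setup this branch performs (not part of the patch; the helper name and the example key are illustrative): when persist is requested, sheet data goes to a zlib-compressed FileStorage in a temporary location, otherwise ZODB falls back to in-memory storage, and either way an OOBTree at the connection root holds the sheet data.

import tempfile
import uuid

import BTrees.OOBTree
import transaction
import zc.zlibstorage
import ZODB.FileStorage


def open_sheet_store(persist=False):
    # Same pattern as the constructor branch above: compressed on-disk
    # storage when persisting, otherwise ZODB's default in-memory storage.
    if persist:
        location = tempfile.gettempdir() + "/flattentool-" + str(uuid.uuid4())
        storage = zc.zlibstorage.ZlibStorage(ZODB.FileStorage.FileStorage(location))
        db = ZODB.DB(storage)
    else:
        db = ZODB.DB(None)
    connection = db.open()
    connection.root.sheet_store = BTrees.OOBTree.BTree()
    return db, connection


db, connection = open_sheet_store(persist=False)
connection.root.sheet_store["main"] = {"rows": []}  # any picklable value works
transaction.commit()
connection.close()
db.close()
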
@@ -125,9 +148,17 @@ def __init__(
         self.filter_value = filter_value
         self.remove_empty_schema_columns = remove_empty_schema_columns
         self.seen_paths = set()
+        self.persist = persist
 
         if schema_parser:
-            self.main_sheet = copy.deepcopy(schema_parser.main_sheet)
+            self.main_sheet = PersistentSheet.from_sheet(
+                schema_parser.main_sheet, self.connection
+            )
+            for sheet_name, sheet in list(self.sub_sheets.items()):
+                self.sub_sheets[sheet_name] = PersistentSheet.from_sheet(
+                    sheet, self.connection
+                )
+
             self.sub_sheets = copy.deepcopy(schema_parser.sub_sheets)
             if remove_empty_schema_columns:
                 # Don't use columns from the schema parser
@@ -194,18 +225,13 @@ def __init__(
                 _("Only one of json_file or root_json_dict should be supplied")
             )
 
-        if json_filename:
-            with codecs.open(json_filename, encoding="utf-8") as json_file:
-                try:
-                    self.root_json_dict = json.load(
-                        json_file, object_pairs_hook=OrderedDict, parse_float=Decimal
-                    )
-                except UnicodeError as err:
-                    raise BadlyFormedJSONErrorUTF8(*err.args)
-                except ValueError as err:
-                    raise BadlyFormedJSONError(*err.args)
-        else:
-            self.root_json_dict = root_json_dict
+        if not json_filename:
+            if self.root_list_path is None:
+                self.root_json_list = root_json_dict
+            else:
+                self.root_json_list = path_search(
+                    root_json_dict, self.root_list_path.split("/")
+                )
 
         if preserve_fields:
             # Extract fields to be preserved from input file (one path per line)
@@ -240,19 +266,37 @@ def __init__(
             self.preserve_fields = None
             self.preserve_fields_input = None
 
+        if json_filename:
+            if self.root_list_path is None:
+                path = "item"
+            else:
+                path = root_list_path.replace("/", ".") + ".item"
+
+            json_file = codecs.open(json_filename, encoding="utf-8")
+
+            self.root_json_list = ijson.items(json_file, path, map_type=OrderedDict)
+
+        try:
+            self.parse()
+        except ijson.common.IncompleteJSONError as err:
+            raise BadlyFormedJSONError(*err.args)
+        except UnicodeDecodeError as err:
+            raise BadlyFormedJSONErrorUTF8(*err.args)
+        finally:
+            if json_filename:
+                json_file.close()
+
     def parse(self):
-        if self.root_list_path is None:
-            root_json_list = self.root_json_dict
-        else:
-            root_json_list = path_search(
-                self.root_json_dict, self.root_list_path.split("/")
-            )
-        for json_dict in root_json_list:
+        for num, json_dict in enumerate(self.root_json_list):
             if json_dict is None:
                 # This is particularly useful for IATI XML, in order to not
                 # fall over on empty activity, e.g. <iati-activity/>
                 continue
             self.parse_json_dict(json_dict, sheet=self.main_sheet)
+            if num % 2000 == 0 and num != 0:
+                transaction.commit()
+
+        transaction.commit()
 
         if self.remove_empty_schema_columns:
             # Remove sheets with no lines of data
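
For context on the streaming change: ijson.items() yields the objects found under a dotted prefix one at a time, so the input file is never loaded wholesale, and parse() commits the ZODB transaction every 2,000 items (and once more at the end) so rows are written out in batches instead of in one large final transaction. A small standalone sketch of the prefix convention, using a made-up "releases" root list:

import io
from collections import OrderedDict

import ijson

# Equivalent of root_list_path="releases": the prefix "releases.item"
# matches each element of the top-level "releases" array.
data = io.BytesIO(b'{"releases": [{"id": "1"}, {"id": "2"}]}')
for num, release in enumerate(ijson.items(data, "releases.item", map_type=OrderedDict)):
    print(num, release["id"])
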
@@ -501,7 +545,9 @@ def parse_json_dict(
                         parent_name, key, truncation_length=self.truncation_length
                     )
                     if sub_sheet_name not in self.sub_sheets:
-                        self.sub_sheets[sub_sheet_name] = Sheet(name=sub_sheet_name)
+                        self.sub_sheets[sub_sheet_name] = PersistentSheet(
+                            name=sub_sheet_name, connection=self.connection
+                        )
 
                     for json_dict in value:
                         if json_dict is None:
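
PersistentSheet itself lives in flattentool/sheet.py and is not part of this diff; the connection keyword above, from_sheet earlier, and the append_line call in the next hunk suggest rows are kept in the sheet_store BTree under the sheet's name rather than in an in-memory list. The following is only an illustrative stand-in for that idea, with invented class and attribute names, not the real implementation:

import BTrees.IOBTree
import BTrees.OOBTree
import transaction
import ZODB


class BTreeBackedSheet:
    # Illustrative stand-in: rows live in a per-sheet BTree inside the ZODB
    # root's sheet_store instead of a Python list.
    def __init__(self, connection, name):
        self.connection = connection
        self.name = name
        self.index = 0
        connection.root.sheet_store[name] = BTrees.IOBTree.BTree()

    def append_line(self, flattened_dict):
        # Store each flattened row under an increasing integer key.
        self.connection.root.sheet_store[self.name][self.index] = flattened_dict
        self.index += 1

    @property
    def lines(self):
        # Iterate rows in insertion order (integer keys are kept sorted).
        for key, row in self.connection.root.sheet_store[self.name].items():
            yield row


db = ZODB.DB(None)
connection = db.open()
connection.root.sheet_store = BTrees.OOBTree.BTree()
sheet = BTreeBackedSheet(connection, "main")
sheet.append_line({"id": "1"})
transaction.commit()
print(list(sheet.lines))
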
@@ -518,4 +564,16 @@ def parse_json_dict(
                 raise ValueError(_("Unsupported type {}").format(type(value)))
 
         if top:
-            sheet.lines.append(flattened_dict)
+            sheet.append_line(flattened_dict)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        if self.persist:
+            self.connection.close()
+            self.db.close()
+            os.remove(self.zodb_db_location)
+            os.remove(self.zodb_db_location + ".lock")
+            os.remove(self.zodb_db_location + ".index")
+            os.remove(self.zodb_db_location + ".tmp")
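
With __enter__ and __exit__ in place, callers can use the parser as a context manager so that, when persist is enabled, the temporary database files (the base file plus .lock, .index and .tmp) are removed on exit. A usage sketch, assuming this class is flattentool.json_input.JSONParser; the filename and root_list_path are made up, and main_sheet.lines is assumed to be iterable, as implied by the append_line change above:

from flattentool.json_input import JSONParser

with JSONParser(
    json_filename="releases.json",  # hypothetical input file
    root_list_path="releases",      # hypothetical root list path
    persist=True,                   # keep sheet data on disk while parsing
) as parser:
    for line in parser.main_sheet.lines:
        print(line)
# Leaving the block calls __exit__, which closes the connection and deletes
# the temporary ZODB files created in the persist branch of __init__.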