@@ -65,12 +65,12 @@ def _split_changeset(changeset, min_max, table_pks):
 
 
     # Custom min/max functions that ignore Nones
-    def _min(a, b):
-        return b if a is None else (a if b is None else min(a, b))
+    def _min(left, right):
+        return right if left is None else (left if right is None else min(left, right))
 
 
-    def _max(a, b):
-        return b if a is None else (a if b is None else max(a, b))
+    def _max(left, right):
+        return right if left is None else (left if right is None else max(left, right))
 
 
 class Digest:
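For reference, the renamed helpers behave like the built-ins but treat None as "no value" rather than letting it win the comparison. A standalone sketch of the same logic (mirroring the hunk above, not imported from splitgraph):

```python
def _min(left, right):
    # None is ignored rather than compared; only if both sides are None is None returned.
    return right if left is None else (left if right is None else min(left, right))

assert _min(None, 5) == 5
assert _min(3, None) == 3
assert _min(None, None) is None
assert _min(3, 5) == 3
```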
@@ -99,6 +99,7 @@ def __init__(self, shorts):
 
     @classmethod
     def empty(cls):
+        """Return an empty Digest instance such that for any Digest D, D + empty == D - empty == D"""
         return cls((0,) * 16)
 
     @classmethod
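The added docstring pins down `empty()` as the identity element of the digest arithmetic. Assuming `Digest` supports `+`, `-` and `==` over its 16 shorts (as the `(0,) * 16` constructor call suggests), the contract would read:

```python
# Hypothetical digest value; operator and equality support are assumed from the docstring.
d = Digest((7,) * 16)
assert d + Digest.empty() == d
assert d - Digest.empty() == d
```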
@@ -297,20 +298,10 @@ def _store_changesets(self, table, changesets, parents):
             # Store the fragment in a temporary location and then find out its hash and rename to the actual target.
             # Optimisation: in the future, we can hash the upserted rows that we need preemptively and possibly
             # avoid storing the object altogether if it's a duplicate.
-            tmp_object_id = get_random_object_id()
-            upserted = [pk for pk, data in sub_changeset.items() if data[0]]
-            deleted = [pk for pk, data in sub_changeset.items() if not data[0]]
-            self.object_engine.store_fragment(upserted, deleted, SPLITGRAPH_META_SCHEMA, tmp_object_id,
-                                              table.repository.to_schema(),
-                                              table.table_name)
-            # Digest the rows.
-            deletion_hash = self._hash_old_changeset_values(sub_changeset, table.table_schema)
-            insertion_hash = self.calculate_fragment_insertion_hash(SPLITGRAPH_META_SCHEMA, tmp_object_id)
-
-            # We currently don't store the insertion/deletion hashes at all.
-            content_hash = (insertion_hash - deletion_hash).hex()
-            schema_hash = sha256(str(table.table_schema).encode('ascii')).hexdigest()
-            object_id = "o" + sha256((content_hash + schema_hash).encode('ascii')).hexdigest()[:-2]
+            tmp_object_id = self._store_changeset(sub_changeset, table)
+
+            deletion_hash, insertion_hash, object_id = self._get_patch_fragment_hashes(sub_changeset, table,
+                                                                                       tmp_object_id)
 
             object_ids.append(object_id)
             if not self.get_new_objects([object_id]):
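The extracted `_store_changeset` helper (defined in the next hunk) keeps the same upsert/delete split. The changeset shape it relies on, inferred from the `data[0]` check, appears to be `{pk: (is_upsert, ...)}`; a hypothetical illustration:

```python
# Hypothetical changeset shape: each primary key maps to a tuple whose first
# element is truthy for an upserted row and falsy for a deleted one.
sub_changeset = {
    (1,): (True, {'value': 'new'}),   # upsert
    (2,): (False, None),              # delete
}
upserted = [pk for pk, data in sub_changeset.items() if data[0]]
deleted = [pk for pk, data in sub_changeset.items() if not data[0]]
assert upserted == [(1,)] and deleted == [(2,)]
```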
@@ -326,6 +317,24 @@ def _store_changesets(self, table, changesets, parents):
                                      insertion_hash=insertion_hash.hex(), deletion_hash=deletion_hash.hex())
         return object_ids
 
+    def _get_patch_fragment_hashes(self, sub_changeset, table, tmp_object_id):
+        # Digest the rows.
+        deletion_hash = self._hash_old_changeset_values(sub_changeset, table.table_schema)
+        insertion_hash = self.calculate_fragment_insertion_hash(SPLITGRAPH_META_SCHEMA, tmp_object_id)
+        content_hash = (insertion_hash - deletion_hash).hex()
+        schema_hash = sha256(str(table.table_schema).encode('ascii')).hexdigest()
+        object_id = "o" + sha256((content_hash + schema_hash).encode('ascii')).hexdigest()[:-2]
+        return deletion_hash, insertion_hash, object_id
+
+    def _store_changeset(self, sub_changeset, table):
+        tmp_object_id = get_random_object_id()
+        upserted = [pk for pk, data in sub_changeset.items() if data[0]]
+        deleted = [pk for pk, data in sub_changeset.items() if not data[0]]
+        self.object_engine.store_fragment(upserted, deleted, SPLITGRAPH_META_SCHEMA, tmp_object_id,
+                                          table.repository.to_schema(),
+                                          table.table_name)
+        return tmp_object_id
+
     def calculate_fragment_insertion_hash(self, schema, table):
         """
         Calculate the homomorphic hash of just the rows that a given fragment inserts
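`content_hash = (insertion_hash - deletion_hash).hex()` relies on the hash being homomorphic: a fragment's digest is built from per-row digests, so subtracting the deletions from the insertions yields the digest of the net change. A toy additive hash (not the library's actual `Digest`, which works over 16 shorts) illustrates why this composes:

```python
from hashlib import sha256

MOD = 2 ** 256

def row_hash(row):
    # Hash a single row to an integer.
    return int.from_bytes(sha256(repr(row).encode('ascii')).digest(), 'big')

def table_hash(rows):
    # Order-independent: the hash of a table is the sum of its row hashes.
    return sum(map(row_hash, rows)) % MOD

before = [('a', 1), ('b', 2)]
after = [('b', 2), ('c', 3)]           # one deletion, one insertion
insertion_hash = row_hash(('c', 3))
deletion_hash = row_hash(('a', 1))
assert (table_hash(before) + insertion_hash - deletion_hash) % MOD == table_hash(after)
```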
@@ -588,9 +597,9 @@ def _conflate_changes(changeset, new_changes):
 
 
 def get_random_object_id():
-    """Assign each table a random ID that it will be stored as. Note that postgres limits table names to 63 characters,
-    so the IDs shall be 248-bit strings, hex-encoded, + a letter prefix since Postgres doesn't seem to support table
-    names starting with a digit."""
+    """Generate a random ID for temporary/staging objects that haven't had their ID calculated yet.
+    Note that Postgres limits table names to 63 characters, so the IDs shall be 248-bit strings, hex-encoded,
+    + a letter prefix since Postgres doesn't seem to support table names starting with a digit."""
     # Make sure we're padded to 62 characters (otherwise if the random number generated is less than 2^247 we'll be
     # dropping characters from the hex format)
     return str.format('o{:062x}', getrandbits(248))
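The padding comment is easy to verify: 248 bits is exactly 62 hex digits, so the zero-padding only kicks in when the random value falls below 16^61 (i.e. 2^244), but it guarantees a fixed 63-character ID either way:

```python
from random import getrandbits

object_id = str.format('o{:062x}', getrandbits(248))
assert len(object_id) == 63       # exactly at Postgres's table name limit
assert object_id.startswith('o')  # avoids a name starting with a digit

# Without the zero-padding, small values would yield shorter, variable-length IDs:
assert len('o{:x}'.format(1)) == 2
```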
@@ -599,7 +608,7 @@ def get_random_object_id():
 def _qual_to_index_clause(qual, ctype):
     """Convert our internal qual format into a WHERE clause that runs against an object's index entry.
     Returns a Postgres clause (as a Composable) and a tuple of arguments to be mogrified into it."""
-    column_name, operator, value = qual
+    column_name, qual_op, value = qual
 
     # Our index is essentially a bloom filter: it returns True if an object _might_ have rows
     # that affect the result of a query with a given qual and False if it definitely doesn't.
@@ -611,14 +620,14 @@ def _qual_to_index_clause(qual, ctype):
 
     # If the column has to be greater than (or equal to) X, it only might exist in objects
     # whose maximum value is greater than (or equal to) X.
-    if operator in ('>', '>='):
-        query += SQL("(index #>> '{{{},1}}')::" + ctype + " " + operator + " %s").format((Identifier(column_name)))
+    if qual_op in ('>', '>='):
+        query += SQL("(index #>> '{{{},1}}')::" + ctype + " " + qual_op + " %s").format((Identifier(column_name)))
         args.append(value)
     # Similar for smaller than, but here we check that the minimum value is smaller than X.
-    elif operator in ('<', '<='):
-        query += SQL("(index #>> '{{{},0}}')::" + ctype + " " + operator + " %s").format((Identifier(column_name)))
+    elif qual_op in ('<', '<='):
+        query += SQL("(index #>> '{{{},0}}')::" + ctype + " " + qual_op + " %s").format((Identifier(column_name)))
         args.append(value)
-    elif operator == '=':
+    elif qual_op == '=':
         query += SQL("%s BETWEEN (index #>> '{{{0},0}}')::" + ctype
                      + " AND (index #>> '{{{0},1}}')::" + ctype).format((Identifier(column_name)))
         args.append(value)
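To make the branches concrete: each index entry appears to store a `[min, max]` pair per column, so `#>> '{col,0}'` extracts the minimum and `#>> '{col,1}'` the maximum. A rough sketch of what the `'>'` branch composes for a hypothetical qual (the value travels separately as a query argument, never interpolated into the SQL):

```python
from psycopg2.sql import SQL, Identifier

qual = ('value', '>', 10)        # hypothetical qual: value > 10
column_name, qual_op, value = qual
clause = SQL("(index #>> '{{{},1}}')::integer > %s").format(Identifier(column_name))
# Rendered against a connection, this becomes roughly:
#   (index #>> '{"value",1}')::integer > %s    -- with (10,) as the argument
# i.e. "this object's maximum for the column might satisfy value > 10".
```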
@@ -636,8 +645,8 @@ def _qual_to_index_clause(qual, ctype):
 
 def _qual_to_sql_clause(qual, ctype):
     """Convert a qual to a normal SQL clause that can be run against the actual object rather than the index."""
-    column_name, operator, value = qual
-    return SQL("{}::" + ctype + " " + operator + " %s").format(Identifier(column_name)), (value,)
+    column_name, qual_op, value = qual
+    return SQL("{}::" + ctype + " " + qual_op + " %s").format(Identifier(column_name)), (value,)
 
 
 def _quals_to_clause(quals, column_types, qual_to_clause=_qual_to_index_clause):
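For comparison, the renamed `_qual_to_sql_clause` emits the same predicate directly against the object's own column rather than its index entry; for the hypothetical qual above it would produce roughly:

```python
clause, args = _qual_to_sql_clause(('value', '>', 10), 'integer')
# clause renders as: "value"::integer > %s, with args == (10,)
```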