-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_psi.py
1319 lines (1108 loc) · 48.6 KB
/
db_psi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import collections, random, sys, traceback
from dbsim import *
# Raised by tx_commit when the predecessor of a version written by the
# committing transaction already has its `clobber` set ("raced with another
# writer and lost").
class UncommittedClobber(AbortTransaction):
    '''WW conflict with uncommitted write'''
# Raised by tx_write when the newest version of the record is D-committed but
# the writer is in that version owner's depstar, i.e. cannot see it.
class InvisibleClobber(AbortTransaction):
    '''WW conflict with committed-but-invisible write'''
# Raised by tx_commit's T2 Dangerous Structure check when check_for_cycles
# confirms a dependency cycle.
class DangerousStruct2(AbortTransaction):
    '''T2 in a Dangerous Structure'''
# Raised by tx_commit's T3 Dangerous Structure check (the non-poison path)
# when check_for_cycles confirms a dependency cycle.
class DangerousStruct3(AbortTransaction):
    '''T3 in a Dangerous Structure'''
# Raised by tx_commit's Dangerous Skew check (minwar < maxdep) when
# check_for_cycles confirms a dependency cycle.
class DangerousSkew(AbortTransaction):
    '''Dangerous Skew detected'''
# Raised by tx_commit's T2 Dangerous Structure check when the structure test
# fires but check_for_cycles finds no cycle (a false positive).
class FalseDangerousStruct2(AbortTransaction):
    '''T2 in a Dangerous Structure, but no cycle exists'''
# Raised by tx_commit's T3 Dangerous Structure check (the non-poison path)
# when check_for_cycles finds no cycle (a false positive).
class FalseDangerousStruct3(AbortTransaction):
    '''T3 in a Dangerous Structure, but no cycle exists'''
# Raised by tx_commit's Dangerous Skew check when the conservative test fires
# but check_for_cycles finds no cycle (a false positive).
class FalseDangerousSkew(AbortTransaction):
    '''Dangerous Skew detected, but no cycle exists'''
# Raised by tx_write (ww_blocks mode) when the record's current writer is
# itself blocked, or when another writer is already parked on the record.
class WaitDepth(AbortTransaction):
    '''Blocking on WW would exceed wait depth limit'''
# Raised by fail_if_wedged when every member of the detected cycle has
# already D-committed.
class DBWedged(Exception):
    '''A cycle of D-committed transactions has been detected.
    NOT a type of AbortTransaction; none of the victims can abort.
    '''
def make_psi_db(tracker, stats, nrec, tid_watch=(), rid_watch=(),
danger_check=2, ww_blocks=False, verbose=False, **extra_kwargs):
'''A database record is comprised of one or more versions. New readers
always use the most recently committed version, even if a write is
in progress, and so RAW and WAR conflicts are nonblocking; writers
do block writers when WAW conflicts arise.
Because we allow WAR hazards, we risk serialization failures:
T1 T2 T3
read A
write A'
read A
write B'
commit
read B'
read B'
We can enforce a serial schedule by forbidding T2 to commit A'
until all readers using A have finished, with A' and B' remaining
invisible during the wait:
T1 T2 T3
read A
write A'
read A
write B'
D-commit
read B
DI-commit
read B
DI-commit
I-commit
Note that T2 is not in doubt during the wait. The transaction has
committed and can release results to the user once its commit
record is durable. We merely prevent the changes from becoming
visible to other transactions until all dependent transactions
have completed. We therefore start to distinguish between D-commit
(writing a commit log that signals the transaction as no longer in
doubt) and I-commit (making changes visible to other transactions).
Unfortunately, the above policy allows readers to block writers
indirectly: a reader who forces a committed transaction's changes
to remain invisible also prevents the next writer from
proceeding. To avoid that bad outcome, we allow T2's results to
become mostly-visible between D-commit and I-commit: everyone can
see them *excepting* those transactions which delay T2's I-commit
(T1 and T3 in this case). Note that this policy is *not* snapshot
isolation, as the following schedule demonstrates:
T1 T2 T3 T4 T5
read A
write A' (T1)
D-commit (T1)
read B
write B' (T3)
write C'
D-commit (T3)
write C'' (T2)
D-commit (T2, T3)
read C'' (T5)
read C (**)
write D'
DI-commit
I-commit
I-commit
write D''
DI-commit
I-commit
Under our scheme, no transaction blocks and all five commit. C'
and C'' are invisible to T3 (in spite of T2 and T5 having
committed), and so T3 reads C instead. The final serial order is
{T3 T2 T5 T1 T4}.
Under SI, T1's read of A would force it to also read B, placing it
before T2 and T3 in the serial order. The attempt to write D''
would then trigger an abort in conflict with T3's write to D',
which would have already become visible by then.
Note that, just as with SI, a transaction's chances of aborting on
write increase with the time elapsed since its first read. We can
always serve reads from an older version to preserve isolation,
but we must not allow mutually invisible writes to coexist. For
example, T3 would abort if it attempted to write C, because its
read of B forces us to isolate it from T2.
Returning to the classic read-only anomaly for SI, we preserve
serial behavior, without blocking, by aborting T2:
T1 T2 T3
read A
read B
write B' (T2)
D-commit (T2)
read A
read B' (T3)
D-commit (T2 T3)
write A' (T1)
!! abort !!
I-commit
I-commit
2PL would preserve serializability without aborting (T2 < T3 <
T1), but would allow T2 to hold up the works. This is highly
unfortunate, because T2 is read-only at the time it prevents T1
(also read-only) from proceeding. That's why 2PL blows up under
contention: it allows readers to block other readers.
Our scheme also catches a similar, more subtle, SI anomaly:
T1 T2 T3
read A
read B
write B' (T2)
D-commit (T2)
read A
read B' (T3)
read C
D-commit (T2 T3)
write C' (T1)
!! abort !!
I-commit
I-commit
Here, we should have T1 < T2 < T3, because T2 does not see B'
(from T3) and T1 does not see C' (from T2). However, T1 cannot
precede T3 because it has seen B' (which T3 wrote).
Two nice properties of this setup:
1. Transactions acquire dependencies on other transactions, not on
other versions. Versions are merely dependency carriers. There are
vastly fewer active transactions than records in the system, which
is helpful if a long-running reader forces us to store
dependencies for a long time.
2. Transaction Ti can only acquire a WAW or RAW dependency on
transactions Tj *after* Tj has committed and finalized its
dependency set. By construction, any transaction that aborts could
only have conferred WAR dependencies, which aborts and commits
resolve equally well. Ti can therefore compute an accurate
dependency set, at commit time, as the union of its own direct
dependencies and their respective dependency sets. Even better,
some or all of those direct dependencies may have already cleared
by the time Ti commits, allowing us to bypass the indirect ones.
Every record maintains the set of active readers; in the absence
of a writer, readers add themselves to the set on arrival and
remove themselves on commit/abort. When a writer arrives, it
unions the active read set with its own WAR set. From that point
on, all future readers update both sets on arrival and
departure. We keep the record's own read set around in case the
writer aborts, to avoid polluting a future writer with unrelated
readers. When the writer makes a new version available, it creates
a new read set to go with it, and the process repeats. As with SI,
we can drop a version once it has a successor *and* all
transactions that co-existed with it have finished.
Any version with an I-committed successor can be removed once its
read set becomes empty. By way of contradiction, suppose that a
new reader TR could arrive after an I-committed successor (due to
TU) appeared and the read set became empty. TR would only have
ignored the newer version if TU carried a WAR dependency on TR. In
that case, however, TU should not yet have I-committed because TR has
not finished. We conclude that such a TR cannot exist. Further, TU
is guaranteed to I-commit once all readers in its WAR set have
committed. Therefore, the last reader to leave a given read set
can safely delete it if the writer's WAR set was empty afterward
(recall that we keep a pointer to the WAR set in each version). In
case TR read several records written by TU, TU will I-commit after
TR releases the first version in its read set, and TR will find
TU's WAR set empty when it releases the remaining versions.
Each version contains a value, a read count, a TID stamp, and a
pointer to the next newer version. When a writer arrives, it
allocates a new version, stamped with a negated TID, with read=0
and next=NULL. It then follows the chain of next pointers until a
successful compare-and-swap appends it to the chain. If it becomes
the first writer in the chain, it sets its TID to positive and can
make modifications. Otherwise, it leaves its TID negative and
blocks. When an active writer commits, it does the following:
1. Set the active version's TID to zero, signalling new readers
that they may need to use the new version instead.
2. Log the commit record.
3. Mark the transaction as committed in the commit table.
4. Use an atomic CAS to clear the active version's TID, signalling
new readers that they need to use the new version instead. If a
reader arrives after marking the transaction as committed and
before clearing the TID, we squint and say it arrived before the
transaction committed; we can get away with it because we're not
using snapshot isolation.
5. Wake the next writer, if any (to avoid races, always send a
wakeup signal if the next pointer is non-NULL and the successor's
TID is negative).
6. If there was no next pointer, use an atomic CAS to install the
new version (may fail if a writer arrives and races). If there was
a next pointer, the next writer will install the new version using
a normal store.
When a reader arrives, it does the following:
1. If the active version's TID is non-zero, use it; otherwise
continue to next step.
2. If the next version's TID is marked as committed, and the TID
is still non-zero after checking, use that version; otherwise
repeat this step until a valid version has been obtained.
The TID and read count are embedded in the same 64-bit word, and
readers use atomic CAS to increment the read count while verifying
that the TID remains non-zero.
The committing writer checks for a non-NULL next pointer; if found, it
sends a wake-up signal to the next version's owner and does not attempt to activate the just-committed version it created (the new owner can do it non-atomically, and will update the list head pointer to point at its predecessor).
Whenever a new reader arrives, it increments
the read count and accesses the value, decrementing the count
after it completes. When a writer arrives, it uses an atomic CAS
to install a new version containing its TID, locking the
record. Note that readers do not care whether a writer is present,
but continue using the existing version. The writer, meanwhile,
constructs a new version and performs its update out of place. At some
point before it commits, it must make the new version accessible
in a special shadow table; after commit, readers can access the
shadow entry
in the e and
can then update the record. The update is performed out of place,
with the new value stored in a special shadow table for
unfinalized versions. Readers who arrive after the writer claims
the record but before it commits still see the original version,
effectively placing them before the writer in the global serial
order, and the writer is not allowed to complete until all
existing readers have left. New readers that arrive after the
writer begins the commit process will see the writer's update and
in turn cannot commit until it finishes.
The writer begins the commit process atomically by updating its
entry in the transaction completion table; anyone who sees the TID
will check the completion table, see that it has completed, and
attempt to install the shadow version (the completing transaction
is also trying to install the new version).
Readers who re-read a value always use the version they originally
read, even if later versions became available in the
meantime. Unlike with snapshot isolation, where a query's start
time determines which version it sees, here a reader sees the
latest committed version on its first read, and repeated reads
always come back to the same version. Once the last reader of a
version leaves (decrementing the read count to zero), the version
can be deleted. At the time of this deletion there may very well
exist some older readers who would have seen the version, but did
not actually read it during its lifetime.
'''
'''We track several kinds of dependency sets:
t.deps is the set of all RAW and WAW dependencies t has acquired
directly; t cannot I-commit until t.deps is empty, and t's depstar
will include the union of their depstars in t.deps at D-commit.
t.war holds all WAR dependencies t has picked up; it is installed
as v.war at each version v which t clobbers, ensuring that readers
who arrive later are tracked properly. Unlike RAW and WAW, WAR
does not confer any indirect dependencies on t.
v.r holds the current set of readers for version v. These readers
confer WAR dependencies on any writer that clobbers the version.
t.depstar is the transitive closure of t's dependencies, the set
of transactions that must remain isolated from t even after it
D-commits. Allowing any x in t.depstar to see t's updates would
introduce a serialization anomaly. This set is defined as t.war |
t.deps | {x.depstar : x in t.deps}, and its membership is fixed
once t D-commits.
'''
db = collections.defaultdict(collections.deque)
next_stamp = 1
def get_next_stamp():
nonlocal next_stamp
rval,next_stamp = next_stamp,next_stamp+1
return rval
empty_set = frozenset()
resp_histo = collections.defaultdict(lambda:0)
in_flight = {}
class Transaction(object):
    '''Per-transaction bookkeeping, keyed by tid'''
    def __init__(self, tid):
        self.tid = tid
        # dependency sets; depstar stays None until tx_commit fills it in
        self.deps = set()
        self.war = set()
        self.depstar = None
        # rid -> Version read or written; clobbers holds only the written ones
        self.footprint = {}
        self.clobbers = {}
        # commit stamps, 0 meaning "not yet"
        self.dcommit = 0
        self.icommit = 0
        self.durable = 0
        # WW-blocking state used by tx_write/tx_abort
        self.last_write = None
        self.blocked = False
class Version(object):
    '''One version of a record, linked to the version it replaced'''
    def __init__(self, t, prev):
        self.t = t              # owning transaction
        self.r = set()          # readers of this version
        self.clobber = None     # transaction that replaced this version, if any
        self.prev = prev        # older version, or None at the end of the chain
    def __iter__(self):
        '''Yield this version, then each older one, newest first'''
        node = self
        while node:
            yield node
            node = node.prev
# zerotx owns the initial version of every record. It is created fully
# committed (D-commit, I-commit and durable all share one stamp) with an empty
# depstar, so tx_read and tx_write always accept its versions.
zerotx = Transaction(0)
# Fix: the original assigned dcommit twice and left durable at 0; tx_abort
# uses the same dcommit/icommit/durable triple.
zerotx.dcommit = zerotx.icommit = zerotx.durable = get_next_stamp()
zerotx.depstar = empty_set
dummy_version = Version(zerotx, None)
# Record table: rid -> newest installed Version; unseen rids start with a
# fresh zerotx-owned version.
db = collections.defaultdict(lambda: Version(zerotx, None))
# q is the global "quiet" flag; e is the logger used as `q or e(...)`
q,e = not verbose, errlog
def histo_add(h, delay):
    '''Count one sample in the bucket indexed by the bit length of delay'''
    bucket = delay.bit_length()
    h[bucket] += 1
def histo_print(h, title, xtime=True, logx=True, logy=True):
    '''Log histogram h, one line per bucket in ascending key order.

    xtime scales the x value by 1/ONE_TICK and prints it as a float;
    logx treats keys as power-of-two exponents; logy sizes the bar by the
    bit length of the count instead of the count itself.
    '''
    if xtime:
        scale, fmt = 1./ONE_TICK, '\t\t%8.2f: %5d %s'
    else:
        scale, fmt = 1, '\t\t%8d: %5d %s'
    errlog('\n\tLog-log distribution of %s:', title)
    for bucket in sorted(h):
        count = h[bucket]
        if not bucket:
            # bucket 0 is always reported at x=0
            x = 0
        elif logx:
            x = scale*(2.**bucket)
        else:
            x = scale*bucket
        stars = count.bit_length() if logy else count
        errlog(fmt, x, count, '*'*stars)
# tx_read: generator implementing a read of record `rid` by transaction `pid`.
# A record already in the transaction's footprint returns the owning tid of
# the remembered version at once. Otherwise the version chain is walked from
# the newest version and the first usable one is taken: either I-committed,
# or D-committed by an owner whose depstar does not contain this transaction.
# The reader is then registered in that version's read set and, if the
# version already has a clobberer, in the clobberer's WAR set.
# `for_update` is accepted but never referenced in the body.
# NOTE(review): indentation was lost in this copy of the file; in particular
# the `else:` after the "Skipping invisible" log line could belong to the
# `for` loop or to `if x.dcommit:` -- restore from the original before editing.
def tx_read(pid, rid, for_update=False):
# Q: quiet unless this pid or rid is being watched
Q = q and pid not in tid_watch and rid not in rid_watch
t = in_flight[pid]
Q or e('read: pid=%s rid=%s', pid, rid)
try:
# have I seen this record before?
v = t.footprint[rid]
return v.t.tid
except KeyError:
pass # keep going...
q or e('\tNew record, check versions')
# the newest installed version must not have been clobbered yet
assert not db[rid].clobber
# iterating a Version walks newest -> oldest via .prev
for v in db[rid]:
x = v.t
if x.icommit:
# definitely safe to use
'''NOTE: I-commit occurs the instant t.icommit becomes
True. Afterward, t can set v.t = zerotx for all v it
created, allowing t to be deleted.
'''
Q or e('\tUsing I-committed version %s of rid %s', x.tid, rid)
break
if x.dcommit:
# safe unless we're in the tid's WAR-set
if t not in x.depstar:
Q or e('\tUsing visible D-committed version %s of rid %s', x.tid, rid)
break
q or e('\tSkipping invisible D-committed version: pid=%s', x.tid)
else:
assert not 'reachable'
# register as a reader of the chosen version
if v.r is not None:
v.r.add(t)
q or e('\tNew v.r: %s', ' '.join(map(str, (d.tid for d in v.r))))
# a version that is already clobbered gains a WAR dependency on us
if v.clobber is not None:
X = Q and v.clobber.tid not in tid_watch
v.clobber.war.add(t)
X or e('\tNew WAR for %s via rid %d: %s', v.clobber.tid, rid,
' '.join(map(str, (d.tid for d in v.clobber.war))))
assert x is v.t
# remember the version so repeated reads reuse it
t.footprint[rid] = v
val = tracker.on_access(pid, rid, x.tid, True)
# simulated read cost of one to two ticks
yield from sys_busy(random.randint(ONE_TICK, 2*ONE_TICK),
color='green', title='%s=db[%s]' % (val, rid))
return val
# rid -> (current writer, parked waiter); only consulted when ww_blocks is set
clobbering = {}
no_clobber = (None,None)
def tx_write(pid, rid):
    '''Generator implementing a write of record rid by transaction pid.

    A record this transaction already wrote is a no-op. With ww_blocks set,
    the writer first claims the record's clobber slot, parking behind the
    current holder if there is one. The new Version is stacked on the newest
    installed version and recorded in the transaction's footprint and
    clobbers; it is not installed in db here (tx_commit does that).

    Raises WaitDepth if the slot holder is itself blocked or already has a
    waiter, and InvisibleClobber if the newest version is D-committed by a
    transaction whose depstar contains this writer.
    '''
    Q = q and pid not in tid_watch and rid not in rid_watch
    t = in_flight[pid]
    Q or e('write: pid=%s rid=%d', pid, rid)
    # have I written this before?
    if rid in t.clobbers:
        q or e('\talready wrote to this record')
        return
    if ww_blocks:
        c,w = clobbering.get(rid,no_clobber)
        if c:
            if c.blocked or w:
                raise WaitDepth
            clobbering[rid] = (c,t)
            Q or e('\tpid=%d blocked on WW conflict with pid=%d', pid, c.tid)
            t.blocked = True
            yield from sys_park()
        else:
            Q or e('\tpid=%d taking empty clobber slot for rid=%d', pid, rid)
            clobbering[rid] = (t,None)
        # either we took the slot, or whoever unparked us handed it over
        assert clobbering[rid][0] is t
        # remember the slot so tx_abort can release it if we fail below;
        # tx_abort only looks at last_write when ww_blocks is set
        t.last_write = rid
    # writes demand latest version or bust (unlike reads)
    # TODO: attempt to block on write?
    v = db[rid]
    assert not v.clobber
    x = v.t
    assert x is not t # can't clobber a version we created
    if x.icommit:
        # definitely safe to clobber
        n = len(v.r) if v.r else 0
        q or e('\tClobbering I-committed version with pid=%s and %d reader(s)', x.tid, n)
    elif x.dcommit:
        # only clobber versions we can actually see
        if t in x.depstar:
            q or e('\tAbort: cannot see latest version from pid=%s', x.tid)
            raise InvisibleClobber
        # fix: this branch handles a D-committed (not yet I-committed) version
        q or e('\tClobbering D-committed version: pid=%s', x.tid)
    # tie in my WAR and replace the current version
    assert v.r is not None
    v.r.discard(t) # in case we previously read it
    v = Version(t, v)
    t.last_write = None
    t.footprint[rid] = t.clobbers[rid] = v
    val = tracker.on_access(pid, rid, x.tid, False)
    # simulated write cost of one to two ticks
    yield from sys_busy(random.randint(ONE_TICK, 2*ONE_TICK),
                        color='blue', title='%s=db[%s]' % (val, rid))
def fail_if_wedged():
    '''Raise DBWedged if the oldest in-flight transaction sits in a dependency
    cycle whose members have all D-committed.

    Returns quietly when nothing is in flight: tx_create calls this before
    registering its own transaction, so the table can legitimately be empty,
    and min() of an empty dict raises ValueError.
    '''
    if not in_flight:
        return
    earliest = min(in_flight)
    scc = check_for_cycles(earliest, 'Cycle check')
    # a cycle is only fatal when no member is still able to abort
    if scc and all(in_flight[tid].dcommit for tid in scc):
        errlog('\n!!! Database wedged !!!')
        errlog('Transactions: %s\n!!! !!!\n', ' '.join(map(str, sorted(scc))))
        raise DBWedged
# Watched tids that have not yet reached tx_commit (which discards them).
# With no tid_watch given, the set holds the single entry 0.
tid_watched = set(tid_watch or [0])
def tx_create(pid):
    '''Generator that registers a new in-flight transaction for pid.

    Every pid divisible by 300 first runs the wedge check. Once every watched
    tid has been discarded from tid_watched, the simulation is asked to exit.
    '''
    if pid % 300 == 0:
        fail_if_wedged()
    if len(tid_watched) == 0:
        yield from sys_exit(0)
    t = Transaction(pid)
    in_flight[pid] = t
    # stamp the start time
    t.begin = yield from sys_now()
    tracker.on_start(pid)
def finish(pid):
    '''Generator that records how long a transaction ran in resp_histo.'''
    # NOTE(review): `t` is not bound inside this function and `pid` is unused;
    # confirm where `t` is meant to come from (in_flight[pid]?) before calling.
    then = t.begin
    now = yield from sys_now()
    # fix: elapsed time is now-then; the original passed then-now, a negative
    # number that only landed in the right bucket because bit_length ignores sign
    histo_add(resp_histo, now-then)
# i_commit: make transaction t's writes visible to everyone (I-commit).
# Requires t.deps and t.war to be empty. Stamps t.icommit, then walks t's
# footprint. For versions t wrote, it unlinks the predecessor, hands the
# version to zerotx and removes t from the deps of the clobberer and of every
# reader. For versions t only read, it leaves the read set and removes t from
# the clobberer's WAR set. Any D-committed transaction left with no deps and
# no WAR is collected in `commits` and I-committed recursively, after t has
# been removed from in_flight. If t is already durable the tracker is told it
# finished.
# NOTE(review): indentation was lost in this copy of the file; the nesting of
# the "=> remaining" log lines and of the `else:` arm must be restored from
# the original before the logic is edited.
def i_commit(t):
Q = q and t.tid not in tid_watch
Q or e('\tI-commit %d', t.tid)
assert not t.deps
assert not t.war
t.icommit = get_next_stamp()
# clear dependencies
# transactions that become ready to I-commit because of this one
commits = set()
for rid,v in t.footprint.items():
R = Q and rid not in rid_watch
if v.t is t:
# I created this version; delete its predecessor
R or e('\tFinalize version %d of rid=%d', t.tid, rid)
assert v.prev.clobber is t
if v.prev.r:
Q or e('\t--> unlinking previous version pid=%d with readers: %s',
v.prev.t.tid, ' '.join(map(str, (x.tid for x in v.prev.r))))
#bad assertion: v.prev.clobber can D-commit with readers
#assert not v.prev.r
v.prev = None
# ownership passes to zerotx
v.t = zerotx
# whoever clobbered my version no longer depends on me
x = v.clobber
if x and x.deps:
X = Q and x.tid not in tid_watch
X or e('\tpid %d I-commit no longer blocked on %d', x.tid, t.tid)
x.deps.discard(t)
if not x.deps and not x.war and x.dcommit:
assert not x.icommit
assert x not in commits
commits.add(x)
X or e('\t=> remaining deps={%s} war={%s}',
' '.join(map(str, (d.tid for d in (x.deps or ())))),
' '.join(map(str, (d.tid for d in x.war))))
# neither do the readers of my version
for x in v.r:
X = Q and x.tid not in tid_watch
# if x accessed a version I created, I ended up in
# x.deps. If x then committed, I also ended up in
# x.war; I may also be in x.war due to it
# clobbering some version I read, but there's no
# harm removing myself now (the read will be
# removed soon enough).
X or e('\tpid %d I-commit no longer blocked on %d', x.tid, t.tid)
if x.deps:
x.deps.discard(t)
if not x.deps and not x.war and x.dcommit:
assert not x.icommit
assert x not in commits
commits.add(x)
X or e('\t=> remaining deps={%s} war={%s}',
' '.join(map(str, (d.tid for d in (x.deps or ())))),
' '.join(map(str, (d.tid for d in x.war))))
else:
# remove myself from the version's read set
Q or e('\tRemove %d from read set of rid=%d', t.tid, rid)
v.r.remove(t)
x = v.clobber
if x and x.war:
X = Q and x.tid not in tid_watch
# my read no longer prevents x from I-committing its clobber of v
x.war.discard(t)
X or e('\tpid %d I-commit no longer blocked on WAR %d', x.tid, t.tid)
X or e('\t=> remaining WAR deps: %s',
' '.join(map(str, (d.tid for d in x.war))))
if not x.war and not x.deps and x.dcommit:
assert not x.icommit
#bad assertion: new versions could arrive after I D-commit
#assert v is db[rid].prev
assert x not in commits
commits.add(x)
X or e('\trid=%s pid=%d dcommit=%s WAR={%s} deps={%s}',
rid, x.tid, x.dcommit,
' '.join(map(str, (d.tid for d in x.war))),
' '.join(map(str, (d.tid for d in x.deps))))
# t is finished as far as dependency tracking goes
del in_flight[t.tid]
# cascade to everything this I-commit unblocked
for x in commits:
i_commit(x)
# tx_commit reports the finish itself if t was not yet durable here
if t.durable:
tracker.on_finish(t.tid)
def check_for_cycles(pid, name, verbose=False):
    '''Build the dependency graph of all in-flight transactions and ask
    tarjan_incycle about pid.

    Edges run from a transaction to the owner of each version it read ('wr'),
    and, for each version it wrote, to the owner ('ww') and the readers ('rw')
    of the version underneath. Only the first edge recorded for a given pair
    is kept. Returns whatever falsy value tarjan_incycle returned, or the
    result converted to a set; with verbose set, the edges inside the cycle
    are logged under the label name.
    '''
    deps = collections.defaultdict(dict)
    for x in in_flight.values():
        for rid,v in x.footprint.items():
            out_edges = deps[x.tid]
            if v.t is not x:
                # x read somebody else's version
                out_edges.setdefault(v.t.tid, ('wr', rid))
                continue
            # x wrote this version on top of v.prev
            out_edges.setdefault(v.prev.t.tid, ('ww', rid))
            for reader in v.prev.r:
                out_edges.setdefault(reader.tid, ('rw', rid))

    def dstring(dcommit):
        # '@<stamp>' suffix for D-committed transactions, nothing otherwise
        return '@%s' % dcommit if dcommit else ''

    scc = tarjan_incycle(deps, pid)
    if not scc:
        return scc
    scc = set(scc)
    if verbose and scc:
        errlog('\t=> %s dependency cycle contains:', name)
        lines = []
        for tid in scc:
            for dep,(dtype,rid) in deps[tid].items():
                if dep not in scc:
                    continue
                lines.append('%s%s %s(%s) %s%s' % (
                    tid, dstring(in_flight[tid].dcommit),
                    dtype, rid, dep,
                    dstring(in_flight[dep].dcommit)))
        errlog('\t\t%s' % '\n\t\t'.join(sorted(lines)))
    return scc
# tx_commit: generator implementing D-commit (and, when nothing blocks it,
# I-commit) of transaction `pid`.
# Phases, in the order the code runs them:
#   1. simulated commit cost, then discard pid from tid_watched;
#   2. build t.deps and t.war from the footprint; raises UncommittedClobber
#      if a version t wrote sits on an already-clobbered predecessor;
#   3. danger_check >= 1 and t.war non-empty: Dangerous Structure checks;
#   4. danger_check >= 2, t.deps non-empty and at least one pure read:
#      Dangerous Skew check;
#   5. install t's versions in db, compute depstar, stamp t.dcommit;
#   6. I-commit at once if t has neither WAR nor deps, notify the tracker,
#      release WW clobber slots, simulate the log write, mark durable.
# NOTE(review): indentation was lost in this copy of the file; several
# `else:` arms (e.g. the two `raise AbortTransaction` lines marked GROT)
# cannot be attributed with certainty -- restore from the original.
def tx_commit(pid):
Q = q and pid not in tid_watch
t = in_flight[pid]
yield from sys_busy(random.randint(ONE_TICK, 2*ONE_TICK), color='yellow')
Q or e('Commit %s', pid)
# tx_create requests simulator exit once this set is empty
tid_watched.discard(pid)
# /// BEGIN CRITICAL SECTION ///
# check for Dangerous Skew
'''Dangerous Skew arises when transaction Ti imposes a WAR dependency
on Tj, and Tj committed before at least one of Ti's
dependencies.
'''
# construct WAR set (as of D-commit); we'll install it at all
# clobbered versions after the commit succeeds.
assert not t.war
for v in t.footprint.values():
if v.t is t:
if v.prev.clobber:
# raced with another writer and lost
raise UncommittedClobber
if not v.prev.t.icommit:
# still not I-committed
t.deps.add(v.prev.t)
# readers of the version I replace become my WAR dependencies
t.war |= v.prev.r
else:
assert t in v.r
if not v.t.icommit:
t.deps.add(v.t)
# ds accumulates what becomes t.depstar; it starts as a copy of t.war
ds = set(t.war)
# check for Dangerous Structures
if danger_check >= 1 and t.war:
q or e('\t%d WAR dependencies --> check for Dangerous Structures', len(t.war))
'''T2 in some Dangerous Structure, with T3 already committed?
T3 could have already polluted T1 by now, so our only
option for breaking the link is to either abort T2 or
shoot down T1. Given that long running queries are
*always* T1 (T2 and T3 have at least one write), we prefer
to abort T2 (plus, shooting down someone else is
hard). Oh, and some of the time T1 has already committed,
but after T3, so we have no choice but to abort T2.
The forced abort is very annoying, because this is the
most commonly-occurring Deadly Structures we test for, and
it has a woefully high false positive rate (~90%).
'''
# NOTE(review): only one arbitrary member of t.war is examined as T1
t1 = next(iter(t.war))
for v in t.footprint.values():
t3 = v.clobber
if t3:
assert t3.dcommit # else clobber wouldn't be visible
if not t1.dcommit or t3.dcommit < t1.dcommit:
q or e('\tAbort: T2 in a Dangerous Structure: %s --RW--> %s --RW--> %s',
t1.tid, pid, t3.tid)
if check_for_cycles(pid, 'Deadly Struct #2'):
raise DangerousStruct2
raise FalseDangerousStruct2
else:
raise AbortTransaction # GROT, just for debugging/stats
'''T3 in some Dangerous Structure, with T2 already committed?
This case is (unfortunately) much less common than the
previous, but the good news is that we still have time to
avoid a bad outcome: we know with certainty that T1 has
not yet seen any of our writes, and there is no dependency
cycle as long as this remains true. If T1 already
committed, we're done. Otherwise, we have to add T1 to our
own depstar so that our committed writes remain invisible
to it, but we are otherwise free to go our merry way.
'''
# NOTE(review): use_poison is hard-coded, so the `elif`/`else` arms below
# look unreachable -- confirm against the original indentation
use_poison = True
for t2 in t.war:
if t2.war:
assert t2.dcommit
t1 = next(iter(t2.war))
if use_poison:
if not t1.dcommit:
q or e('\tDe-fanged a Dangerous Structure: %s --RW--> %s --RW--> %s',
t1.tid, t2.tid, pid)
ds.add(t1)
elif not t1.dcommit:
q or e('\tAbort: T3 in a Dangerous Structure: %s --RW--> %s --RW--> %s',
t1.tid, t2.tid, pid)
if check_for_cycles(pid, 'Deadly Struct #3'):
raise DangerousStruct3
raise FalseDangerousStruct3
else:
raise AbortTransaction # GROT
# only transactions with at least one pure read (footprint larger than
# clobbers) and at least one direct dependency are checked for skew
if danger_check >= 2 and t.deps and len(t.footprint) > len(t.clobbers):
'''Like all Deadly Structures, Deadly Skew is a dependency cycle
containing at least two RW edges; however, in this case
they need not be adjacent. A canonical example is:
T1 T2 T3 T4
read A
write A'
write C'
D-commit
read B
write B'
write D'
D-commit
read D'
D-commit
read C'
D-commit
Very nasty, that, and only possible because we allow T1 to
see writes that committed after its first read (SI forbids
this). A conservative test forbids any reader from
committing if any value it reads was clobbered before a
member of its depstar (leaving the possibility of a
cycle). Unfortunately, the test comes too late to avoid
aborting the reader. Fortunately, Deadly Skew is quite
rare, and our conservative test is responsible for fewer
than 2% of all aborts, even under extreme contention.
'''
q or e('\t%d direct dependencies --> check for Dangerous Skew', len(t.deps))
# maxdep: latest D-commit stamp among my dependencies and the not yet
# I-committed members of their depstars
maxdep = 0
for x in t.deps:
for d in x.depstar:
if not d.icommit:
maxdep = max(maxdep, d.dcommit)
maxdep = max(maxdep, x.dcommit or next_stamp)
# minwar: earliest D-commit stamp among clobberers of versions in my footprint
minwar = next_stamp
for v in t.footprint.values():
x = v.clobber
if x and x.dcommit:
minwar = min(minwar, x.dcommit)
if minwar < maxdep:
q or e('\tAbort: Dangerous Skew detected')
if check_for_cycles(pid, 'Deadly Skew'):
raise DangerousSkew
raise FalseDangerousSkew
# install clobbered versions
for rid,v in t.clobbers.items():
v.prev.clobber = t
db[rid] = v
for x in t.deps:
X = Q and x.tid not in tid_watch
X or e('\tpid %s has I-commit dep on %s (d-commit:%s i-commit:%s)',
pid, x.tid, x.dcommit, x.icommit)
try:
assert not x.icommit
except AssertionError:
errlog('pid %s has stale I-commit dep on %s', pid, x.tid)
raise
# depstar gets each direct dependency plus the live part of its depstar
ds.add(x)
ds.update(d for d in x.depstar if not d.icommit)
if ds:
Q or e('\tdepstar for %d at D-commit: %s', pid,
' '.join(map(str, (d.tid for d in ds))))
t.dcommit = get_next_stamp() # writes now visible to non-dependents
t.depstar = ds
# /// END CRITICAL SECTION ///
# save space: clobbered versions no longer need read set
#for rid,v in t.clobbers.items():
# v.prev.r = None
# sanity: everything I still wait on must be live and not I-committed
for x in t.war:
assert not x.icommit
assert x.tid in in_flight
for x in t.deps:
assert not x.icommit
assert x.tid in in_flight
# nothing to wait for: become fully visible right away
if not t.war and not t.deps:
i_commit(t)
tracker.on_commit(pid)
# hand each clobber slot I hold to its parked waiter, if any
if ww_blocks:
for rid in t.clobbers:
c,w = clobbering.pop(rid)
assert c is t
if w:
X = Q and w.tid not in tid_watch
X or e('\tUnblocking pid=%s from rid=%s', w.tid, rid)
clobbering[rid] = (w,None)
w.blocked = False
yield from sys_unpark(w.tid)
else:
Q or e('\tNobody waiting on rid=%d', rid)
# NOTE(review): presumably models the commit-log write latency -- confirm
yield from sys_sleep(random.randint(5*ONE_TICK, 10*ONE_TICK))
yield from sys_busy(random.randint(ONE_TICK, 2*ONE_TICK), color='orange')
t.durable = True
# if I-commit already happened, i_commit saw durable unset and did not report
if t.icommit:
tracker.on_finish(pid)
# tx_abort: generator that rolls back transaction `pid`.
# Versions the transaction wrote are skipped after sanity checks; nothing is
# unlinked here (this function never installs or removes entries in db).
# For versions it read, the transaction leaves the read set and the
# clobberer's WAR set. Any D-committed clobberer left with no WAR and no deps
# is I-committed once the aborting transaction has been removed from
# in_flight. With ww_blocks set, clobber slots held by the transaction are
# released or handed to the parked waiter. Finally a simulated cost is paid
# and the tracker is told the transaction finished.
# NOTE(review): indentation was lost in this copy of the file; restore the
# nesting from the original before editing the logic.
def tx_abort(pid):
Q = q and pid not in tid_watch
t = in_flight[pid]
Q or e('Abort %d', pid)
# clobberers that become ready to I-commit because of this abort
commits = set()
for rid,v in t.footprint.items():
R = Q and rid not in rid_watch
if v.t is t:
# I created this version, delete it
R or e('\tRoll back update of rid=%d', rid)
# nobody else can see this version
assert not v.clobber and not v.r
assert t not in v.r
continue
if v.r:
R or e('\tRemove %d from read set of rid=%d', t.tid, rid)
v.r.remove(t)
else:
q or e('\tUh-oh! rid=%d was neither read nor written by me (v.r=%s)', rid, v.r)
assert not 'reachable'
x = v.clobber
if x and x.war:
X = Q and x.tid not in tid_watch
x.war.discard(t)
X or e('\tpid %d I-commit no longer blocked on WAR %d', x.tid, pid)
X or e('\t=> remaining WAR deps: %s',
' '.join(map(str, (d.tid for d in x.war))))
if not x.war and not x.deps and x.dcommit:
assert not x.icommit
assert x not in commits
commits.add(x)
else:
q or e('\trid=%s still has readers waiting to I-commit: %s',
rid, ' '.join(map(str, (d.tid for d in x.war))))
elif x:
q or e('\tskipping pid=%d with empty WAR', x.tid)
# mark the aborted transaction finished in every sense, with one stamp
t.dcommit = t.icommit = t.durable = get_next_stamp()
del in_flight[t.tid]
for x in commits:
i_commit(x)
if ww_blocks:
# last_write is a slot claimed in tx_write whose version was never
# created; add it to clobbers so the loop below releases it too
if t.last_write is not None:
t.clobbers[t.last_write] = None
for rid in t.clobbers:
c,w = clobbering.pop(rid, no_clobber)
if c is t:
if w:
X = Q and w.tid not in tid_watch
X or e('\tUnblocking pid=%s', w.tid)
clobbering[rid] = (w,None)
w.blocked = False
yield from sys_unpark(w.tid)
else:
Q or e('\tReleasing clobber slot on rid=%d', rid)
else:
# happens if we aborted due to WaitDepth
Q or e('\tLeaving pid=%s blocked on rid=%s, WW pid=%s',
w and w.tid or None, rid, c.tid)
# not my slot: put the entry back untouched
clobbering[rid] = (c,w)
# invariant: nobody stays parked behind a writer that has D-committed
for rid,(c,w) in clobbering.items():
try:
assert not c or not w or not c.dcommit
except AssertionError:
errlog('pid=%d blocked on D-committed rid=%d pid=%d!', w.tid, rid, c.tid)
raise
yield from sys_busy(random.randint(ONE_TICK, 2*ONE_TICK), color='red')
tracker.on_finish(pid)
def fini():
    '''Final report: complain about transactions still in flight (after a
    wedge check), then log the stats counters, the failure causes and the
    response-time histogram.
    '''
    if in_flight:
        fail_if_wedged()
        oldest = min(t.begin for t in in_flight.values())
        errlog('\nFound %d live transactions at exit (oldest from tick %.2f):',
               len(in_flight), oldest/float(ONE_TICK))
        if not q:
            for t in in_flight.values():
                war_tids = ' '.join(map(str, (x.tid for x in t.war)))
                errlog('\tpid=%s deps={%s}', t.tid, war_tids)
    errlog('''
Stats:
Total transactions: %d (%d failures, %d serialization failures)
Total accesses: %d (%d isolation failures)''',
           stats.tx_count, stats.tx_aborts, stats.ser_failures,
           stats.acc_count, stats.iso_failures)
    print_failure_causes(stats)
    histo_print(resp_histo, 'transaction response times')
# Result of make_psi_db: the closures defined above bundled under the names
# callers use (note tx_begin maps to tx_create), plus nrec and the tracker's
# begin/end hooks passed straight through.
# NOTE(review): NamedTuple is called with keyword arguments only, so it is
# presumably a helper from dbsim rather than typing.NamedTuple -- confirm.
return NamedTuple(nrec=nrec, tx_begin=tx_create,
tx_read=tx_read, tx_write=tx_write,
tx_commit=tx_commit, tx_abort=tx_abort,
fini=fini,
begin_tracking=tracker.begin_tracking,
end_tracking=tracker.end_tracking)
def test_psi_db():
R,U,X = 1,2,3
def test_fini(db):
done = False
def callback():
nonlocal done
done = True
def nop():
pid = yield from sys_getpid()
yield from db.tx_begin(pid)
yield from db.tx_abort(pid)
yield from sys_sleep(1000*ONE_TICK)
db.end_tracking(callback)
yield from sys_spawn(nop())
yield from sys_sleep(1000*ONE_TICK)
yield from sys_spawn(nop())
yield from sys_sleep(10000*ONE_TICK)
assert done
db.fini()
yield from sys_exit()
def access(db, pid, rid, mode, delay):
yield from sys_sleep(delay*ONE_TICK)
yield from db.tx_write(pid, rid) if mode == X else db.tx_read(pid, rid, mode == U)
def commit(db, pid, delay):
if not isinstance(delay, int):
errlog('bad delay: %s', delay)
yield from sys_sleep(delay*ONE_TICK)
yield from db.tx_commit(pid)
def tx_one(db, rid, mode, delay1, delay2):
def thunk():
pid = yield from sys_getpid()
try: