Skip to content

Commit 9a3843e

Browse files
committed
use partition
1 parent fefcdd0 commit 9a3843e

File tree

1 file changed

+40
-25
lines changed

1 file changed

+40
-25
lines changed

src/Console/ProcessCacheInvalidationEventsCommand.php

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,28 @@ private function getStoreFromConnectionName(string $connection_name): ?string
5959
return null;
6060
}
6161

62-
protected function getCache_invalidation_eventsPartitionName(int $shardId, int $priority, int $processed, Carbon $processingStartTime): string
62+
protected function getCache_invalidation_eventsPartitionName(int $shardId, int $priorityId): string
6363
{
64-
if ($processed === 0) {
65-
return "p_unprocessed_s{$shardId}_p{$priority}";
64+
// Calcola il valore della partizione
65+
$shards = config('super_cache_invalidate.total_shards', 10);
66+
$priorities = [0, 1];
67+
68+
$partitionStatements = [];
69+
70+
$partitionValueId = ($priorityId * $shards) + $shardId + 1;
71+
72+
// Partitions for unprocessed events
73+
foreach ($priorities as $priority) {
74+
for ($shard = 0; $shard < $shards; $shard++) {
75+
$partitionName = "p_unprocessed_s{$shard}_p{$priority}";
76+
$partitionValue = ($priority * $shards) + $shard + 1;
77+
if ($partitionValueId < $partitionValue) {
78+
return $partitionName;
79+
}
80+
}
6681
}
67-
$year = $processingStartTime->year;
68-
$week = $processingStartTime->weekOfYear;
69-
return "p_s{$shardId}_p{$priority}_{$year}w{$week}";
82+
83+
return '';
7084
}
7185

7286
/**
@@ -86,19 +100,21 @@ protected function processEvents(int $shardId, int $priority, int $limit, int $t
86100

87101
// Fetch a batch of unprocessed events
88102
$partitionCache_invalidation_events = $this->getCache_invalidation_eventsPartitionName($shardId, $priority, 0, $processingStartTime);
103+
89104
$events = DB::table(DB::raw("`cache_invalidation_events` PARTITION ({$partitionCache_invalidation_events})"))
90105
//->from(DB::raw("`{$this->from}` PARTITION ({$partitionsString})"))
91-
->where('processed', '=', 0)
92-
->where('shard', '=', $shardId)
93-
->where('priority', '=', $priority)
94-
->where('event_time', '<', $processingStartTime)
106+
->where('processed', '=', 0)
107+
->where('shard', '=', $shardId)
108+
->where('priority', '=', $priority)
109+
->where('event_time', '<', $processingStartTime)
95110
// Cerco tutte le chiavi/tag da invalidare per questo database redis
96-
->where('connection_name', '=', $connection_name)
97-
->orderBy('event_time')
98-
->limit($limit)
99-
->get()
111+
->where('connection_name', '=', $connection_name)
112+
->orderBy('event_time')
113+
->limit($limit)
114+
->get()
100115
;
101116

117+
//ds($partitionCache_invalidation_events . ' -> Shard (' . $shardId . ') Priority (' . $priority . ') Record = ' . $events->count());
102118
if ($events->isEmpty()) {
103119
// No more events to process
104120
return;
@@ -118,11 +134,10 @@ protected function processEvents(int $shardId, int $priority, int $limit, int $t
118134

119135
//retrive associated identifiers related to fetched event id
120136
// Per le chiavi/tag associati non filtro per connection_name, potrebbero esserci associazioni anche in altri database
121-
$partitionCache_invalidation_event_associations = "p_{$processingStartTime->year}w{$processingStartTime->weekOfYear}";
122-
$associations = DB::table(DB::raw("`cache_invalidation_event_associations` PARTITION ({$partitionCache_invalidation_event_associations})"))
123-
->whereIn('event_id', $eventIds)
124-
->get()
125-
->groupBy('event_id')
137+
$associations = DB::table('cache_invalidation_event_associations')
138+
->whereIn('event_id', $eventIds)
139+
->get()
140+
->groupBy('event_id')
126141
;
127142

128143
// Prepare list of all identifiers to fetch last invalidation times
@@ -306,10 +321,10 @@ protected function updateLastInvalidationTimes(array $identifiers): void
306321
foreach ($identifiers as $key) {
307322
[$type, $identifier] = explode(':', $key, 2);
308323
DB::table('cache_invalidation_timestamps')
309-
->updateOrInsert(
310-
['identifier_type' => $type, 'identifier' => $identifier],
311-
['last_invalidated' => $now]
312-
)
324+
->updateOrInsert(
325+
['identifier_type' => $type, 'identifier' => $identifier],
326+
['last_invalidated' => $now]
327+
)
313328
;
314329
}
315330
}
@@ -376,8 +391,8 @@ protected function processBatch(array $batchIdentifiers, array $eventsToUpdate):
376391

377392
// Mark events as processed
378393
DB::table('cache_invalidation_events')
379-
->whereIn('id', $eventsToUpdate)
380-
->update(['processed' => 1])
394+
->whereIn('id', $eventsToUpdate)
395+
->update(['processed' => 1])
381396
;
382397

383398
// Commit transaction

0 commit comments

Comments
 (0)