Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adjust prune compression mode to reduce calls to redis. #2

Open
wants to merge 3 commits into
base: redis-garbage-collector-5-4
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 114 additions & 17 deletions Adapter/RedisTagAwareAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,23 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableI
* @var string|null detected eviction policy used on Redis server
*/
private $redisEvictionPolicy;
/**
* @var string|null detected redis version of Redis server
*/
private $redisVersion;
/**
* @var bool|null Indicate whether this "namespace" has been pruned and what the result was.
*/
private $pruneResult;
private $namespace;

/**
* @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy $redis The redis client
* @param string $namespace The default namespace
* @param int $defaultLifetime The default lifetime
* @param bool $pruneWithCompression Enable compressed prune. Way more resource intensive.
*/
public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null)
public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = false)
{
if ($redis instanceof \Predis\ClientInterface && $redis->getConnection() instanceof ClusterInterface && !$redis->getConnection() instanceof PredisCluster) {
throw new InvalidArgumentException(sprintf('Unsupported Predis cluster connection: only "%s" is, "%s" given.', PredisCluster::class, get_debug_type($redis->getConnection())));
Expand All @@ -85,6 +94,7 @@ public function __construct($redis, string $namespace = '', int $defaultLifetime

$this->init($redis, $namespace, $defaultLifetime, new TagAwareMarshaller($marshaller));
$this->namespace = $namespace;
$this->pruneWithCompression = $pruneWithCompression;
}

/**
Expand Down Expand Up @@ -296,6 +306,36 @@ protected function doInvalidate(array $tagIds): bool
return $success;
}

/**
* @TODO Move to RedisTrait? It already has a version check - this would be handy.
*
* @return string
*/
private function getRedisVersion(): string
{
if (null !== $this->redisVersion) {
return $this->redisVersion;
}

$hosts = $this->getHosts();
$host = reset($hosts);
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
// Predis supports info command only on the master in replication environments
$hosts = [$host->getClientFor('master')];
}

foreach ($hosts as $host) {
$info = $host->info('Server');

if ($info instanceof ErrorInterface) {
continue;
}
return $this->redisVersion = $info['redis_version'];
}
// Fallback to 2.0 like RedisTrait does.
return $this->redisVersion = '2.0';
}

private function getRedisEvictionPolicy(): string
{
if (null !== $this->redisEvictionPolicy) {
Expand Down Expand Up @@ -362,9 +402,15 @@ protected function getAllTagKeys(): array
});

$setKeys = $results->valid() ? iterator_to_array($results) : [];
[$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null];
// merge the fetched ids together
$tagKeys = array_merge($tagKeys, $ids);
// $setKeys[$tagsPrefix] might be an RedisException object -
// check before just using it.
if (is_array($setKeys[$tagsPrefix])) {
[$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null];
// merge the fetched ids together
$tagKeys = array_merge($tagKeys, $ids);
} elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) {
$this->logger->error($setKeys[$tagsPrefix]->getMessage());
}
} while ($cursor = (int) $cursor);

return $tagKeys;
Expand Down Expand Up @@ -425,15 +471,9 @@ private function getOrphanedTagsStats(bool $compressMode = false): array
// referenced and existing cache keys differs collect the
// missing references.
if ($compressMode && \count($referencedCacheKeys) > $existingCacheKeysCount) {
// In order to create the delta each single reference
// has to be checked.
foreach ($referencedCacheKeys as $cacheKey) {
$existingCacheKeyResult = $this->pipeline(function () use ($cacheKey) {
yield 'exists' => [$cacheKey];
});
if ($existingCacheKeyResult->valid() && !$existingCacheKeyResult->current()) {
$orphanedTagReferenceKeys[$tagKey][] = $cacheKey;
}
$orphanedTagReferenceKeysInHash = $this->getOrphanedCacheKeys($referencedCacheKeys);
if (!empty($orphanedTagReferenceKeysInHash)) {
$orphanedTagReferenceKeys[$tagKey] = $orphanedTagReferenceKeysInHash;
}
}
// Stop processing cursors in case compression mode is
Expand All @@ -456,6 +496,57 @@ private function getOrphanedTagsStats(bool $compressMode = false): array
return $stats;
}

/**
* Accepts a list of cache keys and returns a list with orphaned keys.
*
* The method attempts to reduced optimize the testing of the keys by
* batching the key tests and reduce the amount of redis calls.
*
* @param array $cacheKeys
* @param int $chunks Number of chunks to create when processing cacheKeys.
*
* @return array
*/
private function getOrphanedCacheKeys(array $cacheKeys, int $chunks = 2)
{
$orphanedCacheKeys = [];
if (version_compare($this->getRedisVersion(), '2.0.3', '>=')) {
// If we can check multiple keys at once divide and conquer to have
// faster execution.
$cacheKeysChunks = array_chunk($cacheKeys, floor(count($cacheKeys) / $chunks), true);
foreach ($cacheKeysChunks as $cacheKeysChunk) {
$result = $this->pipeline(function () use ($cacheKeysChunk) {
yield 'exists' => [$cacheKeysChunk];
});
if ($result->valid()) {
$existingKeys = $result->current();
if ($existingKeys === 0) {
// None of the chunk exists - register all.
$orphanedCacheKeys = array_merge($orphanedCacheKeys, $cacheKeysChunk);
} elseif ($existingKeys !== count($cacheKeysChunk)) {
// Some exists some don't - trigger another batch of chunks.
// @TODO At what chunk size is a single item comparison more efficient?
// @TODO The call could set an optimized number of chunks. At this point the number of existing keys and the number
// of keys to check is known - this could allow to guesstimate the optimal fragmentation.
$orphanedCacheKeys = array_merge($orphanedCacheKeys, $this->getOrphanedCacheKeys($cacheKeysChunk));
}
}
}
} else {
// Without multi-key support in exists each single reference
// has to be checked individually to create the delta.
foreach ($cacheKeys as $cacheKey) {
$result = $this->pipeline(function () use ($cacheKey) {
yield 'exists' => [$cacheKey];
});
if ($result->valid() && !$result->current()) {
$orphanedCacheKeys[] = $cacheKey;
}
}
}
return $orphanedCacheKeys;
}

/**
* @TODO Verify the LUA scripts are redis-cluster safe.
*/
Expand Down Expand Up @@ -497,11 +588,17 @@ private function pruneOrphanedTags(bool $compressMode = false): bool
return $success;
}

/**
* @TODO Make compression mode flag configurable.
*/
public function prune(): bool
{
return $this->pruneOrphanedTags(true);
// Only prune once per prune run.
if (!isset($this->pruneResult)) {
// First run without compression enabled to reduce data that is
// processed by the compression handling.
$this->pruneResult = $this->pruneOrphanedTags();
if ($this->pruneResult && $this->pruneWithCompression) {
$this->pruneResult = $this->pruneOrphanedTags(true);
}
}
return $this->pruneResult;
}
}