Skip to content

Commit

Permalink
Apply grow target adjustment to reduce arbitration count (facebookinc…
Browse files Browse the repository at this point in the history
…ubator#10170)

Summary:
Add grow target bytes adjustment so that when growing capacity we grow more than requested if requested target is small. This can help to give allocation headrooms after grow and hence reduce arbitration frequency. Consequently it can help to reduce workload in arbitrator and in turn improve overall performance of the system.

Pull Request resolved: facebookincubator#10170

Reviewed By: xiaoxmeng

Differential Revision: D58653856

Pulled By: tanjialiang

fbshipit-source-id: d15791340ba73c6bdc7fccc02a7530ecad8ee6ff
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Aug 16, 2024
1 parent e7ffae9 commit d805d31
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 45 deletions.
5 changes: 5 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
folly::to<std::string>(options.memoryPoolReservedCapacity) + "B";
extraArbitratorConfigs["memory-pool-transfer-capacity"] =
folly::to<std::string>(options.memoryPoolTransferCapacity) + "B";
extraArbitratorConfigs["fast-exponential-growth-capacity-limit"] =
folly::to<std::string>(options.fastExponentialGrowthCapacityLimit) +
"B";
extraArbitratorConfigs["slow-capacity-grow-pct"] =
folly::to<std::string>(options.slowCapacityGrowPct);
extraArbitratorConfigs["memory-reclaim-max-wait-time"] =
folly::to<std::string>(options.memoryReclaimWaitMs) + "ms";
extraArbitratorConfigs["global-arbitration-enabled"] =
Expand Down
23 changes: 22 additions & 1 deletion velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct MemoryManagerOptions {
/// std::malloc.
bool useMmapAllocator{false};

// Number of pages in the largest size class in MmapAllocator.
/// Number of pages in the largest size class in MmapAllocator.
int32_t largestSizeClassPages{256};

/// If true, allocations larger than largest size class size will be delegated
Expand Down Expand Up @@ -175,6 +175,27 @@ struct MemoryManagerOptions {
/// during the memory arbitration.
uint64_t memoryPoolTransferCapacity{128 << 20};

/// When growing capacity, the growth bytes will be adjusted in the
/// following way:
/// - If 2 * current capacity is less than or equal to
/// 'fastExponentialGrowthCapacityLimit', grow through fast path by at
/// least doubling the current capacity, when conditions allow (see below
/// NOTE section).
/// - If 2 * current capacity is greater than
/// 'fastExponentialGrowthCapacityLimit', grow through slow path by growing
/// capacity by at least 'slowCapacityGrowPct' * current capacity if
/// allowed (see below NOTE section).
///
/// NOTE: If original requested growth bytes is larger than the adjusted
/// growth bytes or adjusted growth bytes reaches max capacity limit, the
/// adjusted growth bytes will not be respected.
///
/// NOTE: Capacity growth adjust is only enabled if both
/// 'fastExponentialGrowthCapacityLimit' and 'slowCapacityGrowPct' are set,
/// otherwise it is disabled.
uint64_t fastExponentialGrowthCapacityLimit{512 << 20};
double slowCapacityGrowPct{0.25};

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max wait time has exceeded. If it is
/// zero, then there is no timeout. The default is 5 mins.
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class MemoryArbitrator {
/// manager.
int64_t capacity;

/// callback to check if a memory arbitration request is issued from a
/// Callback to check if a memory arbitration request is issued from a
/// driver thread, then the driver should be put in suspended state to avoid
/// the potential deadlock when reclaim memory from the task of the request
/// memory pool.
Expand Down Expand Up @@ -118,7 +118,7 @@ class MemoryArbitrator {
/// up a number of pools to either shrink its memory capacity without actually
/// freeing memory or reclaim its used memory to free up enough memory for
/// 'requestor' to grow.
virtual bool growCapacity(MemoryPool* pool, uint64_t targetBytes) = 0;
virtual bool growCapacity(MemoryPool* pool, uint64_t requestBytes) = 0;

/// Invoked by the memory manager to shrink up to 'targetBytes' free capacity
/// from a memory 'pool', and returns them back to the arbitrator. If
Expand Down
56 changes: 52 additions & 4 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ bool SharedArbitrator::ExtraConfig::getCheckUsageLeak(
return getConfig<bool>(configs, kCheckUsageLeak, kDefaultCheckUsageLeak);
}

uint64_t
SharedArbitrator::ExtraConfig::getFastExponentialGrowthCapacityLimitBytes(
const std::unordered_map<std::string, std::string>& configs) {
return config::toCapacity(
getConfig<std::string>(
configs,
kFastExponentialGrowthCapacityLimit,
std::string(kDefaultFastExponentialGrowthCapacityLimit)),
config::CapacityUnit::BYTE);
}

double SharedArbitrator::ExtraConfig::getSlowCapacityGrowPct(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<double>(
configs, kSlowCapacityGrowPct, kDefaultSlowCapacityGrowPct);
}

SharedArbitrator::SharedArbitrator(const Config& config)
: MemoryArbitrator(config),
reservedCapacity_(ExtraConfig::getReservedCapacity(config.extraConfigs)),
Expand All @@ -154,10 +171,23 @@ SharedArbitrator::SharedArbitrator(const Config& config)
globalArbitrationEnabled_(
ExtraConfig::getGlobalArbitrationEnabled(config.extraConfigs)),
checkUsageLeak_(ExtraConfig::getCheckUsageLeak(config.extraConfigs)),
fastExponentialGrowthCapacityLimit_(
ExtraConfig::getFastExponentialGrowthCapacityLimitBytes(
config.extraConfigs)),
slowCapacityGrowPct_(
ExtraConfig::getSlowCapacityGrowPct(config.extraConfigs)),
freeReservedCapacity_(reservedCapacity_),
freeNonReservedCapacity_(capacity_ - freeReservedCapacity_) {
VELOX_CHECK_EQ(kind_, config.kind);
VELOX_CHECK_LE(reservedCapacity_, capacity_);
VELOX_CHECK_EQ(
fastExponentialGrowthCapacityLimit_ == 0,
slowCapacityGrowPct_ == 0,
"fastExponentialGrowthCapacityLimit_ {} and slowCapacityGrowPct_ {} "
"both need to be set at the same time to enable growth capacity "
"adjustment.",
fastExponentialGrowthCapacityLimit_,
slowCapacityGrowPct_);
}

std::string SharedArbitrator::Candidate::toString() const {
Expand Down Expand Up @@ -450,8 +480,26 @@ uint64_t SharedArbitrator::testingNumRequests() const {
return numRequests_;
}

uint64_t SharedArbitrator::getCapacityGrowthTarget(
const MemoryPool& pool,
uint64_t requestBytes) const {
if (fastExponentialGrowthCapacityLimit_ == 0 && slowCapacityGrowPct_ == 0) {
return std::max(requestBytes, memoryPoolTransferCapacity_);
}
uint64_t targetBytes{0};
const auto capacity = pool.capacity();
if (capacity * 2 <= fastExponentialGrowthCapacityLimit_) {
targetBytes = capacity;
} else {
targetBytes = capacity * slowCapacityGrowPct_;
}
return std::max(
std::max(requestBytes, targetBytes), memoryPoolTransferCapacity_);
}

bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) {
ArbitrationOperation op(pool, requestBytes);
ArbitrationOperation op(
pool, requestBytes, getCapacityGrowthTarget(*pool, requestBytes));
ScopedArbitration scopedArbitration(this, &op);

bool needGlobalArbitration{false};
Expand Down Expand Up @@ -594,9 +642,9 @@ void SharedArbitrator::getGrowTargets(
ArbitrationOperation* op,
uint64_t& maxGrowTarget,
uint64_t& minGrowTarget) {
maxGrowTarget = std::min(
maxGrowCapacity(*op->requestPool),
std::max(memoryPoolTransferCapacity_, op->requestBytes));
VELOX_CHECK(op->targetBytes.has_value());
maxGrowTarget =
std::min(maxGrowCapacity(*op->requestPool), op->targetBytes.value());
minGrowTarget = minGrowCapacity(*op->requestPool);
}

Expand Down
72 changes: 65 additions & 7 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,37 @@ class SharedArbitrator : public memory::MemoryArbitrator {
static bool getGlobalArbitrationEnabled(
const std::unordered_map<std::string, std::string>& configs);

/// When growing capacity, the growth bytes will be adjusted in the
/// following way:
/// - If 2 * current capacity is less than or equal to
/// 'fastExponentialGrowthCapacityLimit', grow through fast path by at
/// least doubling the current capacity, when conditions allow (see below
/// NOTE section).
/// - If 2 * current capacity is greater than
/// 'fastExponentialGrowthCapacityLimit', grow through slow path by
/// growing capacity by at least 'slowCapacityGrowPct' * current capacity
/// if allowed (see below NOTE section).
///
/// NOTE: If original requested growth bytes is larger than the adjusted
/// growth bytes or adjusted growth bytes reaches max capacity limit, the
/// adjusted growth bytes will not be respected.
///
/// NOTE: Capacity growth adjust is only enabled if both
/// 'fastExponentialGrowthCapacityLimit' and 'slowCapacityGrowPct' are set,
/// otherwise it is disabled.
static constexpr std::string_view kFastExponentialGrowthCapacityLimit{
"fast-exponential-growth-capacity-limit"};
static constexpr std::string_view
kDefaultFastExponentialGrowthCapacityLimit{"512MB"};
static uint64_t getFastExponentialGrowthCapacityLimitBytes(
const std::unordered_map<std::string, std::string>& configs);

static constexpr std::string_view kSlowCapacityGrowPct{
"slow-capacity-grow-pct"};
static constexpr double kDefaultSlowCapacityGrowPct{0.25};
static double getSlowCapacityGrowPct(
const std::unordered_map<std::string, std::string>& configs);

/// If true, do sanity check on the arbitrator state on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
Expand Down Expand Up @@ -168,6 +199,17 @@ class SharedArbitrator : public memory::MemoryArbitrator {
struct ArbitrationOperation {
MemoryPool* const requestPool;
const uint64_t requestBytes;

// The adjusted grow bytes based on 'requestBytes'. This 'targetBytes' is a
// best effort target, and hence will not be guaranteed. The adjustment is
// based on 'SharedArbitrator::fastExponentialGrowthCapacityLimit_'
// 'SharedArbitrator::slowCapacityGrowPct_' and
// 'MemoryArbitrator::memoryPoolTransferCapacity_'.
//
// TODO: deprecate 'MemoryArbitrator::memoryPoolTransferCapacity_' once
// exponential growth works well in production.
const std::optional<uint64_t> targetBytes;

// The start time of this arbitration operation.
const std::chrono::steady_clock::time_point startTime;

Expand All @@ -184,11 +226,15 @@ class SharedArbitrator : public memory::MemoryArbitrator {
uint64_t globalArbitrationLockWaitTimeUs{0};

explicit ArbitrationOperation(uint64_t requestBytes)
: ArbitrationOperation(nullptr, requestBytes) {}
: ArbitrationOperation(nullptr, requestBytes, std::nullopt) {}

ArbitrationOperation(MemoryPool* _requestor, uint64_t _requestBytes)
ArbitrationOperation(
MemoryPool* _requestor,
uint64_t _requestBytes,
std::optional<uint64_t> _targetBytes)
: requestPool(_requestor),
requestBytes(_requestBytes),
targetBytes(_targetBytes),
startTime(std::chrono::steady_clock::now()) {
VELOX_CHECK(requestPool == nullptr || requestPool->isRoot());
}
Expand Down Expand Up @@ -256,10 +302,10 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// Invoked to run local arbitration on the request memory pool. It first
// ensures the memory growth is within both memory pool and arbitrator
// capacity limits. This step might reclaim the used memory from the request
// memory pool itself. Then it tries to allocate free capacity from the
// arbitrator. At last, it tries to reclaim free memory from the other queries
// before it falls back to the global arbitration. The local arbitration run
// is protected by shared lock of 'arbitrationLock_' which can run in parallel
// memory pool itself. Then it tries to obtain free capacity from the
// arbitrator. At last, it tries to reclaim free memory from itself before it
// falls back to the global arbitration. The local arbitration run is
// protected by shared lock of 'arbitrationLock_' which can run in parallel
// for different query pools. The free memory reclamation is protected by
// arbitrator 'mutex_' which is an in-memory fast operation. The function
// returns false on failure. Otherwise, it needs to further check if
Expand All @@ -275,7 +321,9 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// on success, false on failure.
bool runGlobalArbitration(ArbitrationOperation* op);

// Gets the mim/max memory capacity growth targets for 'op'.
// Gets the mim/max memory capacity growth targets for 'op'. The min and max
// targets are calculated based on memoryPoolReservedCapacity_ requirements
// and the pool's max capacity.
void getGrowTargets(
ArbitrationOperation* op,
uint64_t& maxGrowTarget,
Expand Down Expand Up @@ -415,6 +463,13 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// the reserved capacity as specified by 'memoryPoolReservedCapacity_'.
int64_t minGrowCapacity(const MemoryPool& pool) const;

// The capacity growth target is set to have a coarser granularity. It can
// help to reduce the number of future grow calls, and hence reducing the
// number of unnecessary memory arbitration requests.
uint64_t getCapacityGrowthTarget(
const MemoryPool& pool,
uint64_t requestBytes) const;

// Returns true if 'pool' is under memory arbitration.
bool isUnderArbitrationLocked(MemoryPool* pool) const;

Expand All @@ -430,6 +485,9 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const bool globalArbitrationEnabled_;
const bool checkUsageLeak_;

const uint64_t fastExponentialGrowthCapacityLimit_;
const double slowCapacityGrowPct_;

mutable folly::SharedMutex poolLock_;
std::unordered_map<MemoryPool*, std::weak_ptr<MemoryPool>> candidates_;

Expand Down
Loading

0 comments on commit d805d31

Please sign in to comment.