Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog for Hedis

## 0.15.6
* PR#62. Fixed retry handling for TRYAGAIN error. (Added delay before retry and handling of MOVED / ASK error after retry)

## 0.15.5
* PR#49. Changed connectAuth type to sum type to support dynamic auths
* PR#49. Added requestTimeout in connectInfo for timeout of redis command
Expand Down
69 changes: 44 additions & 25 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException)
import Data.Pool(Pool, createPool, withResource, destroyAllResources)
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar)
import Control.Monad(zipWithM, replicateM)
import Control.Monad(zipWithM, replicateM, unless)
import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot)
import qualified Database.Redis.ConnectionContext as CC
import qualified Data.HashMap.Strict as HM
Expand All @@ -48,7 +49,7 @@ import qualified Scanner
import System.Environment (lookupEnv)
import System.IO.Unsafe(unsafeInterleaveIO)
import Text.Read (readMaybe)
import Control.Monad.Extra (loopM, fromMaybeM)
import Control.Monad.Extra (loopM, fromMaybeM, whenJust)
import Database.Redis.Protocol(Reply(Error), renderRequest, reply)
import qualified Database.Redis.Cluster.Command as CMD
import System.Timeout (timeout)
Expand Down Expand Up @@ -124,8 +125,8 @@ type NodeConnectionMap = HM.HashMap NodeID NodeConnection

-- Object for storing connection Info which will be used when cluster is refreshed
data ClusterConfig = ClusterConfig
{
requestTimeout :: Int -- in microseconds
{ requestTimeout :: Int -- in microseconds
, tryAgainDelay :: Maybe Int -- in microseconds
} deriving Show

newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Show, Typeable)
Expand All @@ -143,13 +144,15 @@ instance Exception NoNodeException
data TimeoutException = TimeoutException String deriving (Show, Typeable)
instance Exception TimeoutException

createClusterConnectionPools :: (Host -> CC.PortID -> IO CC.ConnectionContext) -> Int -> Time.NominalDiffTime -> [CMD.CommandInfo] -> ShardMap -> Maybe Double -> IO Connection
createClusterConnectionPools withAuth maxResources idleTime commandInfos shardMap requestTimeoutSeconds = do
createClusterConnectionPools :: (Host -> CC.PortID -> IO CC.ConnectionContext) -> Int -> Time.NominalDiffTime -> [CMD.CommandInfo] -> ShardMap -> Maybe Double -> Maybe Double -> IO Connection
createClusterConnectionPools withAuth maxResources idleTime commandInfos shardMap requestTimeoutSeconds tryAgainDelaySeconds = do
nodeConns <- nodeConnections
shardNodeVar <- newMVar (shardMap, nodeConns)
nodeRequestTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (5 :: Double) . (requestTimeoutSeconds <|> ). (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"
let tryAgainDelayTime = round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac <$> tryAgainDelaySeconds
let clusterConfig = ClusterConfig {
requestTimeout = nodeRequestTimeout
, tryAgainDelay = tryAgainDelayTime
}
return $ Connection shardNodeVar (CMD.newInfoMap commandInfos) clusterConfig where
nodeConnections :: IO (HM.HashMap NodeID NodeConnection)
Expand Down Expand Up @@ -245,7 +248,7 @@ rawResponse (CompletedRequest _ _ r) = r
-- acceptable in most cases as these errors should only occur in the case of
-- cluster reconfiguration events, which should be rare.
evaluatePipeline :: (Maybe NodeConnection -> IO (ShardMap, NodeConnectionMap)) -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (ClusterConfig reqTimeout)) requests = do
evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (ClusterConfig reqTimeout tryAgainDelayTime)) requests = do
(shardMap, nodesConn) <- hasLocked $ readMVar shardNodeVar
erequestsByNode <- try $ getRequestsByNode shardMap nodesConn
requestsByNode <- case erequestsByNode of
Expand Down Expand Up @@ -273,18 +276,8 @@ evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (Cl
Just (er :: TimeoutException) -> hasLocked $ refreshShardmapAction Nothing >> throwIO er
_ -> getRandomConnection nc conn >>= (`executeRequests` r)
refreshedShardMapAndNodeConnsIORef <- IOR.newIORef Nothing
mapM (\completedRequest@(CompletedRequest index request response) ->
case response of
(Error errString) | (B.isPrefixOf "MOVED" errString || B.isPrefixOf "TRYAGAIN" errString) -> CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef (hasLocked $ refreshShardmapAction (Just nc)) request
(askingRedirection -> Just (host, port)) -> do
refreshedShardMapAndNodeConns <- fromMaybeM (hasLocked $ refreshShardmapAction (Just nc)) $ IOR.readIORef refreshedShardMapAndNodeConnsIORef
maybeAskNode <- nodeConnWithHostAndPort refreshedShardMapAndNodeConns host port
case maybeAskNode of
Just askNode -> CompletedRequest index request <$> (head <$> tail <$> requestNode reqTimeout askNode (["ASKING"] : [request]))
Nothing -> do
CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef (hasLocked $ refreshShardmapAction (Just nc)) request
_ -> return completedRequest
) responses
tryAgainDelayIORef <- IOR.newIORef False
mapM (responseHandler refreshedShardMapAndNodeConnsIORef tryAgainDelayIORef nc 1) responses
) (zip eresps requestsByNode)
return $ map rawResponse $ sortBy (on compare responseIndex) resps
where
Expand All @@ -300,20 +293,46 @@ evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (Cl
executeRequests nodeConn nodeRequests = do
replies <- requestNode reqTimeout nodeConn $ map rawRequest nodeRequests
return $ zipWith (curry (\(PendingRequest i r, rep) -> CompletedRequest i r rep)) nodeRequests replies
refreshShardMapAndRetryRequest :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> IO (ShardMap, NodeConnectionMap) -> [B.ByteString] -> IO Reply
refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef refreshShardmap request = do
(newShardMap, newNodeConn) <- fromMaybeM (hasLocked refreshShardmap >>= (\new -> IOR.writeIORef refreshedShardMapAndNodeConnsIORef (Just new) >> return new )) $
IOR.readIORef refreshedShardMapAndNodeConnsIORef
refreshShardMapIORef :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> NodeConnection -> IO (ShardMap, NodeConnectionMap)
refreshShardMapIORef refreshedShardMapAndNodeConnsIORef nodeConnection =
fromMaybeM
(hasLocked (refreshShardmapAction (Just nodeConnection)) >>= (\new -> IOR.writeIORef refreshedShardMapAndNodeConnsIORef (Just new) >> return new))
(IOR.readIORef refreshedShardMapAndNodeConnsIORef)
refreshShardMapAndRetryRequest :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> NodeConnection -> [B.ByteString] -> IO Reply
refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef nodeConnection request = do
(newShardMap, newNodeConn) <- refreshShardMapIORef refreshedShardMapAndNodeConnsIORef nodeConnection
nodeConns <- nodeConnectionForCommand newShardMap newNodeConn infoMap request
head <$> requestNode reqTimeout (head nodeConns) [request]
responseHandler :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> IOR.IORef Bool -> NodeConnection -> Int -> CompletedRequest -> IO CompletedRequest
responseHandler refreshedShardMapAndNodeConnsIORef tryAgainDelayIORef nc retryCount completedRequest@(CompletedRequest index request response) =
case response of
(Error errString) | (B.isPrefixOf "MOVED" errString) -> CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef nc request
(Error errString) | (B.isPrefixOf "TRYAGAIN" errString) ->
if retryCount > 0
then do
whenJust tryAgainDelayTime $ \_tryAgainDelay -> do
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When tryAgainDelay is not set, should we put a very small delay by default, say of 50ms or something?

tryAgainDelayed <- IOR.readIORef tryAgainDelayIORef
unless tryAgainDelayed $ do
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets discuss this, on what exactly this is solving. @neeraj97 @Candyman770

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked to Neeraj, this works.

threadDelay _tryAgainDelay
IOR.writeIORef tryAgainDelayIORef True
newReply <- head <$> requestNode reqTimeout nc [request]
responseHandler refreshedShardMapAndNodeConnsIORef tryAgainDelayIORef nc (retryCount - 1) (CompletedRequest index request newReply)
else return completedRequest
(askingRedirection -> Just (host, port)) -> do
refreshedShardMapAndNodeConns <- refreshShardMapIORef refreshedShardMapAndNodeConnsIORef nc
maybeAskNode <- nodeConnWithHostAndPort refreshedShardMapAndNodeConns host port
case maybeAskNode of
Just askNode -> CompletedRequest index request <$> (head <$> tail <$> requestNode reqTimeout askNode (["ASKING"] : [request]))
Nothing -> CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef nc request
_ -> return completedRequest

--fix multi exec
-- Like `evaluateOnPipeline`, except we expect to be able to run all commands
-- on a single shard. Failing to meet this expectation is an error.
evaluateTransactionPipeline :: (Maybe NodeConnection -> IO (ShardMap, NodeConnectionMap)) -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluateTransactionPipeline refreshShardmapAction conn requests' = do
let requests = reverse requests'
let (Connection shardNodeVar infoMap (ClusterConfig reqTimeout)) = conn
let (Connection shardNodeVar infoMap (ClusterConfig reqTimeout _)) = conn
keys <- mconcat <$> mapM (requestKeys infoMap) requests
-- In cluster mode Redis expects commands in transactions to all work on the
-- same hashslot. We find that hashslot here.
Expand Down Expand Up @@ -508,7 +527,7 @@ hasLocked action =


requestMasterNodes :: Connection -> [B.ByteString] -> IO [Reply]
requestMasterNodes conn@(Connection _ _ (ClusterConfig reqTimeout)) req = do
requestMasterNodes conn@(Connection _ _ (ClusterConfig reqTimeout _)) req = do
masterNodeConns <- masterNodes conn
concat <$> mapM (flip (requestNode reqTimeout) [req]) masterNodeConns

Expand Down
10 changes: 8 additions & 2 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ data ConnectInfo = ConnInfo
-- ^ timeout for a redis command request in seconds example: 0.5 seconds (500 milliseconds)
-- post requestTimeout, TimeoutException will be thrown. This is now only applicable to cluster redis.
-- TODO add for non cluster redis also
, tryAgainDelay :: Maybe Double
-- ^ retry delay for a redis command request when TRYAGAIN error is received during cluster slot migration
-- default value is 100 ms
Copy link
Copy Markdown

@neeraj97 neeraj97 Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets remove this comment -- default value is 100 ms

--
} deriving Show

data ConnectAuth = Static B.ByteString | Dynamic ShowableIORefByteString
Expand Down Expand Up @@ -137,6 +141,7 @@ defaultConnectInfo = ConnInfo
, connectTimeout = Nothing
, connectTLSParams = Nothing
, requestTimeout = Nothing
, tryAgainDelay = Nothing
}

defaultClusterConnectInfo :: ConnectInfo
Expand All @@ -151,6 +156,7 @@ defaultClusterConnectInfo = ConnInfo
, connectTimeout = Nothing
, connectTLSParams = Nothing
, requestTimeout = Nothing
, tryAgainDelay = Nothing
}

createConnection :: ConnectInfo -> IO PP.Connection
Expand Down Expand Up @@ -236,7 +242,7 @@ instance Exception ClusterConnectError
-- - MOVE, SELECT
-- - PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, RESET
connectCluster :: ConnectInfo -> IO Connection
connectCluster bootstrapConnInfo@ConnInfo{connectMaxConnections,connectMaxIdleTime,requestTimeout} = do
connectCluster bootstrapConnInfo@ConnInfo{connectMaxConnections,connectMaxIdleTime,requestTimeout,tryAgainDelay} = do
conn <- createConnection bootstrapConnInfo
slotsResponse <- runRedisInternal conn clusterSlots
shardMap <- case slotsResponse of
Expand All @@ -247,7 +253,7 @@ connectCluster bootstrapConnInfo@ConnInfo{connectMaxConnections,connectMaxIdleTi
Left e -> throwIO $ ClusterConnectError e
Right infos -> do
let withAuth = connectWithAuth bootstrapConnInfo
clusterConnection <- Cluster.createClusterConnectionPools withAuth connectMaxConnections connectMaxIdleTime infos shardMap requestTimeout
clusterConnection <- Cluster.createClusterConnectionPools withAuth connectMaxConnections connectMaxIdleTime infos shardMap requestTimeout tryAgainDelay
return $ ClusteredConnection bootstrapConnInfo clusterConnection

connectWithAuth :: ConnectInfo -> Cluster.Host -> CC.PortID -> IO CC.ConnectionContext
Expand Down