diff --git a/CHANGELOG b/CHANGELOG index 2f5673f..9ee0f21 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,8 @@ # Changelog for Hedis +## 0.15.6 +* PR#62. Fixed retry handling for TRYAGAIN error. (Added an optional delay, configured via tryAgainDelay, before the retry and handling of MOVED / ASK errors returned by the retry) + ## 0.15.5 * PR#49. Changed connectAuth type to sum type to support dynamic auths * PR#49. Added requestTimeout in connectInfo for timeout of redis command diff --git a/src/Database/Redis/Cluster.hs b/src/Database/Redis/Cluster.hs index d213e50..0311da9 100644 --- a/src/Database/Redis/Cluster.hs +++ b/src/Database/Redis/Cluster.hs @@ -36,8 +36,9 @@ import Data.Map(fromListWith, assocs) import Data.Function(on) import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException) import Data.Pool(Pool, createPool, withResource, destroyAllResources) +import Control.Concurrent (threadDelay) import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar) -import Control.Monad(zipWithM, replicateM) +import Control.Monad(zipWithM, replicateM, unless) import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot) import qualified Database.Redis.ConnectionContext as CC import qualified Data.HashMap.Strict as HM @@ -48,7 +49,7 @@ import qualified Scanner import System.Environment (lookupEnv) import System.IO.Unsafe(unsafeInterleaveIO) import Text.Read (readMaybe) -import Control.Monad.Extra (loopM, fromMaybeM) +import Control.Monad.Extra (loopM, fromMaybeM, whenJust) import Database.Redis.Protocol(Reply(Error), renderRequest, reply) import qualified Database.Redis.Cluster.Command as CMD import System.Timeout (timeout) @@ -124,8 +125,8 @@ type NodeConnectionMap = HM.HashMap NodeID NodeConnection -- Object for storing connection Info which will be used when cluster is refreshed data ClusterConfig = ClusterConfig - { - requestTimeout :: Int -- in microseconds + { requestTimeout :: Int -- in microseconds + , tryAgainDelay :: Maybe Int -- in
microseconds } deriving Show newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Show, Typeable) @@ -143,13 +144,15 @@ instance Exception NoNodeException data TimeoutException = TimeoutException String deriving (Show, Typeable) instance Exception TimeoutException -createClusterConnectionPools :: (Host -> CC.PortID -> IO CC.ConnectionContext) -> Int -> Time.NominalDiffTime -> [CMD.CommandInfo] -> ShardMap -> Maybe Double -> IO Connection -createClusterConnectionPools withAuth maxResources idleTime commandInfos shardMap requestTimeoutSeconds = do +createClusterConnectionPools :: (Host -> CC.PortID -> IO CC.ConnectionContext) -> Int -> Time.NominalDiffTime -> [CMD.CommandInfo] -> ShardMap -> Maybe Double -> Maybe Double -> IO Connection +createClusterConnectionPools withAuth maxResources idleTime commandInfos shardMap requestTimeoutSeconds tryAgainDelaySeconds = do nodeConns <- nodeConnections shardNodeVar <- newMVar (shardMap, nodeConns) nodeRequestTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (5 :: Double) . (requestTimeoutSeconds <|> ). (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT" + let tryAgainDelayTime = round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac <$> tryAgainDelaySeconds let clusterConfig = ClusterConfig { requestTimeout = nodeRequestTimeout + , tryAgainDelay = tryAgainDelayTime } return $ Connection shardNodeVar (CMD.newInfoMap commandInfos) clusterConfig where nodeConnections :: IO (HM.HashMap NodeID NodeConnection) @@ -245,7 +248,7 @@ rawResponse (CompletedRequest _ _ r) = r -- acceptable in most cases as these errors should only occur in the case of -- cluster reconfiguration events, which should be rare. 
evaluatePipeline :: (Maybe NodeConnection -> IO (ShardMap, NodeConnectionMap)) -> Connection -> [[B.ByteString]] -> IO [Reply] -evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (ClusterConfig reqTimeout)) requests = do +evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (ClusterConfig reqTimeout tryAgainDelayTime)) requests = do (shardMap, nodesConn) <- hasLocked $ readMVar shardNodeVar erequestsByNode <- try $ getRequestsByNode shardMap nodesConn requestsByNode <- case erequestsByNode of @@ -273,18 +276,8 @@ evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (Cl Just (er :: TimeoutException) -> hasLocked $ refreshShardmapAction Nothing >> throwIO er _ -> getRandomConnection nc conn >>= (`executeRequests` r) refreshedShardMapAndNodeConnsIORef <- IOR.newIORef Nothing - mapM (\completedRequest@(CompletedRequest index request response) -> - case response of - (Error errString) | (B.isPrefixOf "MOVED" errString || B.isPrefixOf "TRYAGAIN" errString) -> CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef (hasLocked $ refreshShardmapAction (Just nc)) request - (askingRedirection -> Just (host, port)) -> do - refreshedShardMapAndNodeConns <- fromMaybeM (hasLocked $ refreshShardmapAction (Just nc)) $ IOR.readIORef refreshedShardMapAndNodeConnsIORef - maybeAskNode <- nodeConnWithHostAndPort refreshedShardMapAndNodeConns host port - case maybeAskNode of - Just askNode -> CompletedRequest index request <$> (head <$> tail <$> requestNode reqTimeout askNode (["ASKING"] : [request])) - Nothing -> do - CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef (hasLocked $ refreshShardmapAction (Just nc)) request - _ -> return completedRequest - ) responses + tryAgainDelayIORef <- IOR.newIORef False + mapM (responseHandler refreshedShardMapAndNodeConnsIORef tryAgainDelayIORef nc 1) responses ) (zip eresps 
requestsByNode) return $ map rawResponse $ sortBy (on compare responseIndex) resps where @@ -300,12 +293,38 @@ evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (Cl executeRequests nodeConn nodeRequests = do replies <- requestNode reqTimeout nodeConn $ map rawRequest nodeRequests return $ zipWith (curry (\(PendingRequest i r, rep) -> CompletedRequest i r rep)) nodeRequests replies - refreshShardMapAndRetryRequest :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> IO (ShardMap, NodeConnectionMap) -> [B.ByteString] -> IO Reply - refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef refreshShardmap request = do - (newShardMap, newNodeConn) <- fromMaybeM (hasLocked refreshShardmap >>= (\new -> IOR.writeIORef refreshedShardMapAndNodeConnsIORef (Just new) >> return new )) $ - IOR.readIORef refreshedShardMapAndNodeConnsIORef + refreshShardMapIORef :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> NodeConnection -> IO (ShardMap, NodeConnectionMap) + refreshShardMapIORef refreshedShardMapAndNodeConnsIORef nodeConnection = + fromMaybeM + (hasLocked (refreshShardmapAction (Just nodeConnection)) >>= (\new -> IOR.writeIORef refreshedShardMapAndNodeConnsIORef (Just new) >> return new)) + (IOR.readIORef refreshedShardMapAndNodeConnsIORef) + refreshShardMapAndRetryRequest :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> NodeConnection -> [B.ByteString] -> IO Reply + refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef nodeConnection request = do + (newShardMap, newNodeConn) <- refreshShardMapIORef refreshedShardMapAndNodeConnsIORef nodeConnection nodeConns <- nodeConnectionForCommand newShardMap newNodeConn infoMap request head <$> requestNode reqTimeout (head nodeConns) [request] + responseHandler :: IOR.IORef (Maybe (ShardMap, NodeConnectionMap)) -> IOR.IORef Bool -> NodeConnection -> Int -> CompletedRequest -> IO CompletedRequest + responseHandler refreshedShardMapAndNodeConnsIORef tryAgainDelayIORef nc retryCount 
completedRequest@(CompletedRequest index request response) = + case response of + (Error errString) | (B.isPrefixOf "MOVED" errString) -> CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef nc request + (Error errString) | (B.isPrefixOf "TRYAGAIN" errString) -> + if retryCount > 0 + then do + whenJust tryAgainDelayTime $ \_tryAgainDelay -> do + tryAgainDelayed <- IOR.readIORef tryAgainDelayIORef + unless tryAgainDelayed $ do + threadDelay _tryAgainDelay + IOR.writeIORef tryAgainDelayIORef True + newReply <- head <$> requestNode reqTimeout nc [request] + responseHandler refreshedShardMapAndNodeConnsIORef tryAgainDelayIORef nc (retryCount - 1) (CompletedRequest index request newReply) + else return completedRequest + (askingRedirection -> Just (host, port)) -> do + refreshedShardMapAndNodeConns <- refreshShardMapIORef refreshedShardMapAndNodeConnsIORef nc + maybeAskNode <- nodeConnWithHostAndPort refreshedShardMapAndNodeConns host port + case maybeAskNode of + Just askNode -> CompletedRequest index request <$> (head <$> tail <$> requestNode reqTimeout askNode (["ASKING"] : [request])) + Nothing -> CompletedRequest index request <$> refreshShardMapAndRetryRequest refreshedShardMapAndNodeConnsIORef nc request + _ -> return completedRequest --fix multi exec -- Like `evaluateOnPipeline`, except we expect to be able to run all commands @@ -313,7 +332,7 @@ evaluatePipeline refreshShardmapAction conn@(Connection shardNodeVar infoMap (Cl evaluateTransactionPipeline :: (Maybe NodeConnection -> IO (ShardMap, NodeConnectionMap)) -> Connection -> [[B.ByteString]] -> IO [Reply] evaluateTransactionPipeline refreshShardmapAction conn requests' = do let requests = reverse requests' - let (Connection shardNodeVar infoMap (ClusterConfig reqTimeout)) = conn + let (Connection shardNodeVar infoMap (ClusterConfig reqTimeout _)) = conn keys <- mconcat <$> mapM (requestKeys infoMap) requests -- In cluster mode Redis expects commands in 
transactions to all work on the -- same hashslot. We find that hashslot here. @@ -508,7 +527,7 @@ hasLocked action = requestMasterNodes :: Connection -> [B.ByteString] -> IO [Reply] -requestMasterNodes conn@(Connection _ _ (ClusterConfig reqTimeout)) req = do +requestMasterNodes conn@(Connection _ _ (ClusterConfig reqTimeout _)) req = do masterNodeConns <- masterNodes conn concat <$> mapM (flip (requestNode reqTimeout) [req]) masterNodeConns diff --git a/src/Database/Redis/Connection.hs b/src/Database/Redis/Connection.hs index 1bd0494..b744102 100644 --- a/src/Database/Redis/Connection.hs +++ b/src/Database/Redis/Connection.hs @@ -96,6 +96,10 @@ data ConnectInfo = ConnInfo -- ^ timeout for a redis command request in seconds example: 0.5 seconds (500 milliseconds) -- post requestTimeout, TimeoutException will be thrown. This is now only applicable to cluster redis. -- TODO add for non cluster redis also + , tryAgainDelay :: Maybe Double + -- ^ delay in seconds (example: 0.1 seconds = 100 milliseconds) to wait before retrying a redis command when TRYAGAIN error is received during cluster slot migration + -- Nothing (the default) retries immediately, without any delay + -- } deriving Show data ConnectAuth = Static B.ByteString | Dynamic ShowableIORefByteString @@ -137,6 +141,7 @@ defaultConnectInfo = ConnInfo , connectTimeout = Nothing , connectTLSParams = Nothing , requestTimeout = Nothing + , tryAgainDelay = Nothing } defaultClusterConnectInfo :: ConnectInfo @@ -151,6 +156,7 @@ defaultClusterConnectInfo = ConnInfo , connectTimeout = Nothing , connectTLSParams = Nothing , requestTimeout = Nothing + , tryAgainDelay = Nothing } createConnection :: ConnectInfo -> IO PP.Connection @@ -236,7 +242,7 @@ instance Exception ClusterConnectError -- - MOVE, SELECT -- - PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, RESET connectCluster :: ConnectInfo -> IO Connection -connectCluster bootstrapConnInfo@ConnInfo{connectMaxConnections,connectMaxIdleTime,requestTimeout} = do +connectCluster
bootstrapConnInfo@ConnInfo{connectMaxConnections,connectMaxIdleTime,requestTimeout,tryAgainDelay} = do conn <- createConnection bootstrapConnInfo slotsResponse <- runRedisInternal conn clusterSlots shardMap <- case slotsResponse of @@ -247,7 +253,7 @@ connectCluster bootstrapConnInfo@ConnInfo{connectMaxConnections,connectMaxIdleTi Left e -> throwIO $ ClusterConnectError e Right infos -> do let withAuth = connectWithAuth bootstrapConnInfo - clusterConnection <- Cluster.createClusterConnectionPools withAuth connectMaxConnections connectMaxIdleTime infos shardMap requestTimeout + clusterConnection <- Cluster.createClusterConnectionPools withAuth connectMaxConnections connectMaxIdleTime infos shardMap requestTimeout tryAgainDelay return $ ClusteredConnection bootstrapConnInfo clusterConnection connectWithAuth :: ConnectInfo -> Cluster.Host -> CC.PortID -> IO CC.ConnectionContext