Conversation
Cluster Support
…ForRefreshShardMap Added retry logic for refreshShardMap
MissingNodeException bugfix
throwing TimeoutException for timeouts
…Connection out of the Pool
…lusterConnect function
…ly new HashMap on refresh
src/Database/Redis/Cluster.hs
Outdated
"TimeoutException" -> throwIO TimeoutException
_ -> executeRequests (getRandomConnection cc conn) r
Left (err :: SomeException) ->
    if isPrefixOf "TimeoutException" (displayException err)
Do we need to match on a string in case of an exception? Can't we have a dedicated exception type and pattern match on it?
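A minimal sketch of the typed-exception approach being suggested; `TimeoutException` is declared locally here as a stand-in for the one in the hedis codebase, and `handleTimeout` is a hypothetical helper name:

```haskell
import Control.Exception (Exception, catch, throwIO)

-- Stand-in declaration; the real TimeoutException lives in hedis.
data TimeoutException = TimeoutException
  deriving Show

instance Exception TimeoutException

-- A typed handler: the constructor pattern fixes the exception type,
-- so there is no fragile isPrefixOf on the rendered message.
handleTimeout :: IO a -> IO a -> IO a
handleTimeout fallback action =
  action `catch` \TimeoutException -> fallback
```

Matching on the type also survives changes to the `Show`/`displayException` output, which string matching does not.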
src/Database/Redis/Connection.hs
Outdated
let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx
pipelineConn <- PP.fromCtx ctx
raceResult <- race (threadDelay (10^(3 :: Int))) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of 1 ms
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
Should we put these timeout defaults in a single Config file within Hedis? Having them scattered around looks untidy.
requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode (NodeConnection ctx lastRecvRef _) requests = do
requestNode (NodeConnection pool lastRecvRef _) requests = do
envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"
Let's move these default values and environment variables into a single Config file. @shashitnak
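One possible shape for that consolidation, as a sketch. The module layout and helper name are assumptions; the defaults and variable names are copied from the diff hunks above:

```haskell
import Data.Maybe (fromMaybe)
import System.Environment (lookupEnv)
import Text.Read (readMaybe)

-- Hypothetical Config module: one place that owns every timeout
-- default together with its environment-variable override.
envWithDefault :: Read a => a -> String -> IO a
envWithDefault def name = fromMaybe def . (>>= readMaybe) <$> lookupEnv name

-- Defaults mirrored from the hunks above.
clusterSlotsTimeout :: IO Int       -- microseconds
clusterSlotsTimeout = envWithDefault (10 ^ (3 :: Int)) "REDIS_CLUSTER_SLOTS_TIMEOUT"

requestNodeTimeout :: IO Double     -- seconds
requestNodeTimeout = envWithDefault 0.5 "REDIS_REQUEST_NODE_TIMEOUT"
```

Call sites then just use `clusterSlotsTimeout` instead of repeating the `lookupEnv`/`readMaybe`/`fromMaybe` pipeline inline.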
let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx
withResource pool $ \ctx -> do
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
Same comment: let's combine all such configs in a single place to be more readable and trackable.
src/Database/Redis/Cluster.hs
Outdated
type IsReadOnly = Bool

data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly
data Connection = Connection (MVar NodeConnectionMap) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly TcpInfo
@shashitnak @aravindgopall shouldn't we have this as MVar (ShardMap, NodeConnectionMap)? That would keep the two coupled, which ideally should be the case. Would it be too many changes if we do so?
{ connectAuth :: Maybe B.ByteString
, connectTLSParams :: Maybe ClientParams
, idleTime :: Time.NominalDiffTime
, maxResources :: Int
Should it be renamed as TcpPoolInfo?
src/Database/Redis/Cluster.hs
Outdated
type IsReadOnly = Bool

data Connection = Connection (MVar NodeConnectionMap) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly TcpInfo
data Connection = Connection (TMVar NodeConnectionMap) (MVar Pipeline) (TMVar (ShardMap, Time.UTCTime)) CMD.InfoMap IsReadOnly TcpInfo
Let's create a type alias here, name it LastUpdatedAt, and use it here.
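The suggested alias could look like the sketch below; the commented-out `Connection` line mirrors the diff above and uses identifiers from the hedis codebase, so it is shown only for context:

```haskell
import qualified Data.Time as Time

-- Alias per the suggestion above, so the tuple documents itself.
type LastUpdatedAt = Time.UTCTime

-- data Connection = Connection (TMVar NodeConnectionMap) (MVar Pipeline)
--                              (TMVar (ShardMap, LastUpdatedAt))
--                              CMD.InfoMap IsReadOnly TcpInfo

stamp :: IO LastUpdatedAt
stamp = Time.getCurrentTime
```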
src/Database/Redis/Cluster.hs
Outdated
type IsReadOnly = Bool

data Connection = Connection (MVar NodeConnectionMap) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly TcpInfo
data Connection = Connection (TMVar NodeConnectionMap) (MVar Pipeline) (TMVar (ShardMap, Time.UTCTime)) CMD.InfoMap IsReadOnly TcpInfo
Let's use a type alias for Time.UTCTime here, maybe LastUpdatedAt?
src/Database/Redis/Cluster.hs
Outdated
connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> TMVar (ShardMap, Time.UTCTime) -> Bool -> ([NodeConnection] -> IO ShardMap) -> TcpInfo -> IO Connection
connect withAuth commandInfos shardMapVar isReadOnly refreshShardMap (tcpInfo@TcpInfo{ timeoutOpt, maxResources, idleTime }) = do
shardMap <- readMVar shardMapVar
shardMap <- atomically $ readTMVar shardMapVar
Ideally we should avoid blocking all threads with atomically here. We need a lock-free read of the shardMap, otherwise this can become a perf bottleneck.
I am not sure this understanding is right, but @shashitnak you can use the IO versions of these functions, like readTMVarIO, rather than wrapping atomically to convert from STM to IO.
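For illustration, the two spellings side by side. Both reads are atomic; `readTMVarIO` (stm >= 2.5) just skips the explicit transaction at the call site. The `TVar` variant is included because `readTVarIO` really is a direct read with no transaction at all, which is the cheapest option if the slot can never be empty:

```haskell
import Control.Concurrent.STM

-- What the diff above does today.
readViaAtomically :: TMVar a -> IO a
readViaAtomically var = atomically (readTMVar var)

-- The suggested IO-level variant (stm >= 2.5).
readViaIO :: TMVar a -> IO a
readViaIO = readTMVarIO

-- For a plain TVar, readTVarIO is transaction-free entirely.
readTVarDirect :: TVar a -> IO a
readTVarDirect = readTVarIO
```

Note that either TMVar read still blocks while the TMVar is empty, so this changes ergonomics and per-call overhead, not the blocking behaviour itself.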
src/Database/Redis/Cluster.hs
Outdated
allMasterNodes :: Connection -> ShardMap -> IO (Maybe [NodeConnection])
allMasterNodes (Connection nodeConnsVar _ _ _ _ _) (ShardMap shardMap) = do
nodeConns <- readMVar nodeConnsVar
nodeConns <- atomically $ readTMVar nodeConnsVar
The idea should be: all reads are lock-free, and only during an update do we acquire a lock (making other threads wait) and update the MVar.
"All reads should be lock free"
With (T)MVar this might never be true.
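As a sketch of the "lock-free reads, coordinated writes" idea being debated here, assuming a hypothetical association-list node map held in an `IORef` rather than the actual `NodeConnectionMap`:

```haskell
import Data.IORef

-- Readers never block: readIORef is a plain memory read.
lookupNode :: IORef [(String, Int)] -> String -> IO (Maybe Int)
lookupNode ref nodeId = lookup nodeId <$> readIORef ref

-- Writers publish a whole replacement map in one step, so readers
-- observe either the old map or the new one, never a mix.
replaceMap :: IORef [(String, Int)] -> [(String, Int)] -> IO ()
replaceMap = atomicWriteIORef
```

Concurrent writers would still need their own coordination (for example, an MVar used purely as a write lock), but readers never wait on it.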
@shashitnak can we also add the time column in the sheet?
src/Database/Redis/Connection.hs
Outdated
putStrLn "Refresh called."
(oldShardMap, lastUpdated) <- atomically $ readTMVar shardMapVar
now <- Time.getCurrentTime
if Time.diffUTCTime now lastUpdated <= shardMapRefreshDelta
When will this be the case? We can make the MVar empty to ensure multiple threads don't invoke refreshShardMap at the same time, rather than maintaining this complex logic with timestamps.
@aravindgopall the problem here is that we want reads to be lock-free. So when a thread reads the MVar and, because of a topology change, eventually gets an exception and tries to update the MVar, other threads might also come along and try to update it.
This is done to avoid multiple threads updating the shardMap at the same time. There should be a cleaner way of doing it, but I felt this is okay to unblock this commit merge?
Anyway, the timestamp check only happens on exceptions caused by topology changes, so there is not much of a perf difference either.
@ishan-juspay @aravindgopall sorry for not replying, I hadn't visited this PR in a few days. I moved to TMVar because I wasn't completely aware of how MVars worked, so I thought it wouldn't be possible to modify two MVars without causing a race condition or deadlock. But I managed to fix it using the method @aravindgopall mentioned: emptying the MVar as soon as the first thread gets to it, while the others just call readMVar, which blocks until the first one updates the shardMap and puts it back. I did chaos testing for this change and the tests are passing.
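The pattern described here can be sketched as follows; `refreshCoordinated` is a hypothetical name, and in the PR the value would be the shard map and `refreshAction` the call that fetches cluster topology:

```haskell
import Control.Concurrent.MVar

-- The first thread to arrive empties the MVar and performs the
-- refresh; any thread arriving while the slot is empty falls through
-- to readMVar, which blocks until the refresher puts the new value
-- back. The refresh action is therefore never run concurrently.
refreshCoordinated :: MVar a -> IO a -> IO a
refreshCoordinated var refreshAction = do
  won <- tryTakeMVar var
  case won of
    Just _stale -> do
      newVal <- refreshAction   -- we hold the (now empty) slot
      putMVar var newVal        -- publish and wake the waiters
      pure newVal
    Nothing -> readMVar var     -- a refresh is in flight; wait for it
```

One caveat worth noting: if `refreshAction` throws, the MVar stays empty and the waiters block forever, so production code would want a bracket that restores the old value on failure.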
aravindgopall left a comment
I feel the changes are good to go.
refreshShardMap (Cluster.Connection nodeConns _ _ _ _) =
    refreshShardMapWithNodeConn (head $ HM.elems nodeConns)
refreshShardMap (Cluster.Connection nodeConnsVar _ shardMapVar _ _ Cluster.TcpInfo { idleTime, maxResources, timeoutOpt, connectAuth, connectTLSParams }) = do
    putStrLn "ShardMap Refreshed."
requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply
requestPipelined refreshAction conn@(Connection _ pipelineVar shardMapVar _ _) nextRequest = modifyMVar pipelineVar $ \(Pipeline stateVar) -> do
requestPipelined refreshShardmapAction conn@(Connection _ pipelineVar shardMapVar _ _ _) nextRequest = modifyMVar pipelineVar $ \(Pipeline stateVar) -> do
(newStateVar, repliesIndex) <- hasLocked $ modifyMVar stateVar $ \case
@shashitnak This global pipelineVar will cause an issue during MULTI/EXEC.
To replicate this, add a threadDelay of about 1 minute after MULTI in the multiExec function, then:
- Run a MULTI/EXEC command.
- In parallel, run some other command that does not fall into the same hash slot (if possible, a different node in the cluster) as the MULTI/EXEC commands above.
cc. @aravindgopall
@ishan-juspay @neeraj97 @aravindgopall moved pipelineVar inside the NodeConnection object and that IORef inside the Pool. Since every node now has its own pipeline, there are some issues in how the replies are sent back to the caller. I think the problem is in cases where a request has to go to all or multiple nodes: I get a list of replies inside requestPipelined but have to return only one. For now I am returning the first one, which I think is causing many of the test cases to fail.
src/Database/Redis/Cluster.hs
Outdated
-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool SingleNodeConnection) (MVar Pipeline) NodeID
Pipeline MVar needs to be part of Pool @shashitnak
73654c1 to 7427b2f
src/Database/Redis/Cluster.hs
Outdated
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool NodeResource) NodeID

data NodeResource = NodeResource CC.ConnectionContext (IOR.IORef (Maybe B.ByteString))
@shashitnak let's use a newtype here; data declarations are less performant than newtype, since a newtype is transparent at runtime.
@ishan-juspay converted NodeResource to a newtype. Since a newtype can only have one field, I made the two parameters into a tuple.
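The change as a standalone sketch, with `CC.ConnectionContext` replaced by a placeholder type so the snippet compiles on its own:

```haskell
import qualified Data.ByteString as B
import Data.IORef (IORef, newIORef, readIORef)

-- Stand-in for CC.ConnectionContext from the hedis codebase.
data ConnectionContext = ConnectionContext

-- Before: data NodeResource = NodeResource ConnectionContext
--                                          (IORef (Maybe B.ByteString))
-- After: a newtype wrapping a tuple; the outer constructor is erased
-- at runtime, so matching on NodeResource costs nothing.
newtype NodeResource
  = NodeResource (ConnectionContext, IORef (Maybe B.ByteString))

lastRecv :: NodeResource -> IO (Maybe B.ByteString)
lastRecv (NodeResource (_ctx, ref)) = readIORef ref
```

Worth noting: the tuple inside the newtype is itself a heap object, so this mainly erases the outer constructor rather than removing all indirection.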
src/Database/Redis/Connection.hs
Outdated
shardMapRefreshDelta :: Time.NominalDiffTime
shardMapRefreshDelta = Time.secondsToNominalDiffTime 60
@neeraj97 this was used for a change I briefly added to fix the deadlock caused by the refreshShardMap function: whenever a refresh was triggered by some thread, no other call to refreshShardMap would perform a refresh until shardMapRefreshDelta had passed. I switched to a combination of tryTakeMVar and the blocking nature of readMVar to achieve this, but forgot to remove it. I'm not sure why the Haskell compiler didn't emit an unused-binding warning even though this isn't used anywhere.
src/Database/Redis/Connection.hs
Outdated
    Nothing -> return Nothing
    ) nodeConnsPair
let newMap = HM.fromList $ catMaybes workingNodes
_ <- forkIO $ modifyMVar_ nodeMapVar (\_ -> return newMap)
@neeraj97 I don't remember exactly, but I think this was among the changes I was trying for fixing that deadlock in refreshShardMap. I've removed it now.
src/Database/Redis/Connection.hs
Outdated
workingNodes <- mapM (\(maybeConn, nodeConn) -> case maybeConn of
    Just conn -> do
        when readOnlyConnection $ do
            PP.beginReceiving conn
@shashitnak Is this readOnly flag used for reading from slaves?
If so, should we remove this logic, or build the correct logic to support it properly?
@ishan-juspay
@neeraj97 @ishan-juspay removed clusterConnect function entirely
src/Database/Redis/Cluster.hs
Outdated
evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
    shardMap <- hasLocked $ readMVar shardMapVar
    requestsByNode <- getRequestsByNode shardMap
    shardMap <- readMVar shardMapVar
Why has hasLocked been removed?
It would allow us to throw an exception if things are blocked indefinitely on the MVar.
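For reference, a guard in the spirit of hasLocked might look like this; it is a sketch of the idea (surface indefinite MVar blocking as a visible error), not the actual hedis implementation:

```haskell
import Control.Exception (BlockedIndefinitelyOnMVar (..), catch, throwIO)

-- Run an action; if the RTS detects the thread is blocked forever on
-- an MVar, log it and rethrow instead of dying silently.
hasLocked :: IO a -> IO a
hasLocked action =
  action `catch` \BlockedIndefinitelyOnMVar -> do
    putStrLn "thread blocked indefinitely on an MVar"
    throwIO BlockedIndefinitelyOnMVar
```

GHC only raises BlockedIndefinitelyOnMVar when no other thread can ever fill the MVar, so this catches true deadlocks, not ordinary waiting.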
This PR contains the latest changes made to fix the errors discovered during chaos testing, following this doc: https://docs.google.com/spreadsheets/d/1u02O5RDdAGgQOBDsNjEsFflWNfPKkP8dZvCVXm4jZC4/edit#gid=0