Raft.Shutdown() with InmemTransport hangs after leader changed
Closed this issue · 1 comments
babbageclunk commented
We sometimes see the Juju raft tests time out because Raft.Shutdown has hung.
I've got a standalone reproduction (not that small, though) that hangs reliably. Digging into it, the InmemTransport applies its timeout only when receiving the response back from the peer, not when sending the request to it.
package main
import (
"fmt"
"io"
"log"
"os"
"sync"
"time"
"net/http"
_ "net/http/pprof"
"github.com/hashicorp/raft"
)
// main reproduces the Raft.Shutdown hang described in this issue. It
// bootstraps a single-node cluster on node-0, adds node-1 and node-2 as
// voters, shuts node-0 down, waits for one of the remaining nodes to become
// leader, then disconnects all transports and shuts that new leader down.
// Every unexpected error or timeout along the way panics.
func main() {
// Serve the default HTTP mux on localhost:6060 in the background. The blank
// import of net/http/pprof registers its handlers there, so goroutine dumps
// can be fetched while the program is hung.
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// All nodes log to stderr through one mutex-guarded writer.
syncStderr := &syncWriter{target: os.Stderr}
r0, t0, err := newRaft("node-0", syncStderr)
if err != nil {
panic(err)
}
// Bootstrap a cluster whose only server is node-0.
f := r0.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{{
ID: "node-0",
Address: t0.LocalAddr(),
}},
})
if err := f.Error(); err != nil {
panic(err)
}
// Wait for r0 to be leader.
select {
case isLeader := <-r0.LeaderCh():
if !isLeader {
panic("r0 wasn't leader")
}
case <-time.After(time.Second):
panic("timed out waiting for r0 to be leader")
}
// Make two more and add them to the cluster.
r1, t1, err := newRaft("node-1", syncStderr)
if err != nil {
panic(err)
}
r2, t2, err := newRaft("node-2", syncStderr)
if err != nil {
panic(err)
}
// The transports must be wired together before the new voters are added.
connectTransports(t0, t1, t2)
// Both AddVoter requests are issued before either future is waited on.
f1 := r0.AddVoter("node-1", t1.LocalAddr(), 0, 0)
f2 := r0.AddVoter("node-2", t2.LocalAddr(), 0, 0)
if err := f1.Error(); err != nil {
panic(err)
}
if err := f2.Error(); err != nil {
panic(err)
}
// Shut the leader down.
if err := r0.Shutdown().Error(); err != nil {
panic(err)
}
if err := t0.Close(); err != nil {
panic(err)
}
// Wait until one of the other nodes is leader.
var newLeader *raft.Raft
var name string
// The labelled loop keeps polling both leadership channels; a false value
// (leadership lost) is ignored and the 2-second timeout restarts on each
// iteration.
loop:
for {
select {
case value := <-r1.LeaderCh():
if value {
newLeader = r1
name = "node-1"
break loop
}
case value := <-r2.LeaderCh():
if value {
newLeader = r2
name = "node-2"
break loop
}
case <-time.After(2 * time.Second):
panic("timed out waiting for new leader")
}
}
// If this sleep is omitted the hang doesn't happen.
time.Sleep(2 * time.Second)
// Try to shut the new leader down.
fmt.Printf("**** shutting down new leader %s\n", name)
// Sever every connection first; the Shutdown call below is the one that
// hangs according to the report.
disconnectTransports(t0, t1, t2)
if err := newLeader.Shutdown().Error(); err != nil {
panic(err)
}
fmt.Printf("**** new leader %s shut down successfully\n", name)
}
// newRaft builds a raft node named id that runs entirely in memory: an
// in-memory transport, a single in-memory store passed for both store
// arguments of raft.NewRaft, an in-memory snapshot store and a nullFSM.
// Heartbeat, election and leader-lease timeouts are all set to 100ms and
// ShutdownOnRemove is disabled. Log lines are written to output, prefixed
// with the node id.
//
// It returns the node together with its transport. If the configuration
// fails validation or the node cannot be created, an error is returned; in
// the latter case the transport is closed first.
func newRaft(id raft.ServerID, output io.Writer) (*raft.Raft, *raft.InmemTransport, error) {
	// One short timeout is shared by every timing knob.
	const timeout = 100 * time.Millisecond

	_, trans := raft.NewInmemTransport("")
	logs := raft.NewInmemStore()
	snaps := raft.NewInmemSnapshotStore()

	cfg := raft.DefaultConfig()
	cfg.ShutdownOnRemove = false
	cfg.LocalID = id
	cfg.HeartbeatTimeout = timeout
	cfg.ElectionTimeout = timeout
	cfg.LeaderLeaseTimeout = timeout
	cfg.Logger = log.New(output, fmt.Sprintf("%s ", id), log.LstdFlags)

	err := raft.ValidateConfig(cfg)
	if err != nil {
		return nil, nil, err
	}

	node, err := raft.NewRaft(cfg, &nullFSM{}, logs, logs, snaps, trans)
	if err != nil {
		trans.Close()
		return nil, nil, err
	}
	return node, trans, nil
}
// connectTransports connects every provided transport to every other one, so
// that each pair ends up linked in both directions. A transport is never
// connected to itself.
func connectTransports(transports ...raft.LoopbackTransport) {
	for _, local := range transports {
		for _, remote := range transports {
			if local != remote {
				local.Connect(remote.LocalAddr(), remote)
			}
		}
	}
}
// disconnectTransports is the inverse of connectTransports: every provided
// transport is disconnected from the address of every other one.
func disconnectTransports(transports ...raft.LoopbackTransport) {
	for _, local := range transports {
		for _, remote := range transports {
			if local != remote {
				local.Disconnect(remote.LocalAddr())
			}
		}
	}
}
// nullFSM is a state machine that keeps no state: applied entries are
// dropped, snapshots are empty and restores discard their input.
type nullFSM struct{}

// Apply ignores the log entry and always returns nil.
func (*nullFSM) Apply(log *raft.Log) interface{} {
	return nil
}

// Snapshot returns a fresh nullSnapshot and never fails.
func (*nullFSM) Snapshot() (raft.FSMSnapshot, error) {
	return new(nullSnapshot), nil
}

// Restore closes rc without reading it and always reports success.
func (*nullFSM) Restore(rc io.ReadCloser) error {
	// The close error is deliberately discarded.
	_ = rc.Close()
	return nil
}
// nullSnapshot is the empty snapshot produced by nullFSM.
type nullSnapshot struct{}

// Persist writes nothing: it closes the sink straight away and always
// reports success.
func (*nullSnapshot) Persist(sink raft.SnapshotSink) error {
	// The close error is deliberately discarded.
	_ = sink.Close()
	return nil
}

// Release is a no-op; there is nothing to free.
func (*nullSnapshot) Release() {}
// syncWriter wraps an io.Writer so that only one Write call reaches the
// underlying target at a time.
type syncWriter struct {
	mu     sync.Mutex // held for the duration of each Write
	target io.Writer  // destination that receives the data
}

// Write forwards data to the target while holding the mutex and returns
// whatever the target's Write returned.
func (w *syncWriter) Write(data []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	written, err := w.target.Write(data)
	return written, err
}
schristoff commented
Thank you so much for bringing this to our attention. I'm going to close this issue since the changes have been merged! :)