hashicorp/raft

Raft.Shutdown() with InmemTransport hangs after leader changed

Closed this issue · 1 comments

We sometimes see the Juju raft tests time out because Raft.Shutdown has hung.

I've got a standalone reproduction (not that small, though) that hangs reliably. Digging into it, the InmemTransport only applies its timeout while waiting for responses back from the peer, not when sending to it.
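Before the full reproduction below, here's a minimal sketch of the shape of the problem (the types and helper names are mine for illustration, not hashicorp/raft's actual code): if the send into the peer's consumer channel isn't bounded by the same timeout as the wait for the response, a caller blocks forever once the peer stops consuming, and that is what a shutdown ends up waiting on.

package sketch

import (
	"errors"
	"time"
)

// rpc is an illustrative stand-in for an RPC handed to a peer's consumer
// channel, carrying a channel for the peer's reply.
type rpc struct {
	command  interface{}
	respChan chan error
}

// makeRPCReceiveTimeoutOnly mirrors the behaviour described above: only the
// wait for the response is bounded, so the initial send blocks forever if
// the peer has stopped consuming.
func makeRPCReceiveTimeoutOnly(consumer chan<- rpc, req rpc, timeout time.Duration) error {
	consumer <- req // can block indefinitely
	select {
	case err := <-req.respChan:
		return err
	case <-time.After(timeout):
		return errors.New("command timed out")
	}
}

// makeRPCBothTimeouts bounds the send as well, so callers (for example a
// leader's replication goroutines during Shutdown) always make progress.
func makeRPCBothTimeouts(consumer chan<- rpc, req rpc, timeout time.Duration) error {
	select {
	case consumer <- req:
	case <-time.After(timeout):
		return errors.New("send timed out")
	}
	select {
	case err := <-req.respChan:
		return err
	case <-time.After(timeout):
		return errors.New("command timed out")
	}
}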

package main

import (
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"

	"net/http"
	_ "net/http/pprof"

	"github.com/hashicorp/raft"
)

func main() {
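	// Expose pprof on localhost:6060 so goroutine stacks can be dumped if the program hangs.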
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	syncStderr := &syncWriter{target: os.Stderr}
	r0, t0, err := newRaft("node-0", syncStderr)
	if err != nil {
		panic(err)
	}

	f := r0.BootstrapCluster(raft.Configuration{
		Servers: []raft.Server{{
			ID:      "node-0",
			Address: t0.LocalAddr(),
		}},
	})
	if err := f.Error(); err != nil {
		panic(err)
	}

	// Wait for r0 to be leader.
	select {
	case isLeader := <-r0.LeaderCh():
		if !isLeader {
			panic("r0 wasn't leader")
		}
	case <-time.After(time.Second):
		panic("timed out waiting for r0 to be leader")
	}

	// Make two more and add them to the cluster.
	r1, t1, err := newRaft("node-1", syncStderr)
	if err != nil {
		panic(err)
	}
	r2, t2, err := newRaft("node-2", syncStderr)
	if err != nil {
		panic(err)
	}
	connectTransports(t0, t1, t2)
	f1 := r0.AddVoter("node-1", t1.LocalAddr(), 0, 0)
	f2 := r0.AddVoter("node-2", t2.LocalAddr(), 0, 0)
	if err := f1.Error(); err != nil {
		panic(err)
	}
	if err := f2.Error(); err != nil {
		panic(err)
	}

	// Shut the leader down.
	if err := r0.Shutdown().Error(); err != nil {
		panic(err)
	}
	if err := t0.Close(); err != nil {
		panic(err)
	}

	// Wait until one of the other nodes is leader.
	var newLeader *raft.Raft
	var name string
loop:
	for {
		select {
		case value := <-r1.LeaderCh():
			if value {
				newLeader = r1
				name = "node-1"
				break loop
			}
		case value := <-r2.LeaderCh():
			if value {
				newLeader = r2
				name = "node-2"
				break loop
			}
		case <-time.After(2 * time.Second):
			panic("timed out waiting for new leader")
		}
	}
	// If this sleep is omitted the hang doesn't happen.
	time.Sleep(2 * time.Second)

	// Try to shut the new leader down.
	fmt.Printf("**** shutting down new leader %s\n", name)
	disconnectTransports(t0, t1, t2)
	if err := newLeader.Shutdown().Error(); err != nil {
		panic(err)
	}
	fmt.Printf("**** new leader %s shut down successfully\n", name)
}

func newRaft(id raft.ServerID, output io.Writer) (*raft.Raft, *raft.InmemTransport, error) {
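	// An empty address makes NewInmemTransport generate a unique one for this node.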
	_, transport := raft.NewInmemTransport("")

	store := raft.NewInmemStore()
	snapshotStore := raft.NewInmemSnapshotStore()
	config := raft.DefaultConfig()
	config.ShutdownOnRemove = false
	config.LocalID = id
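	// Short heartbeat/election/lease timeouts so leadership changes happen quickly.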
	config.HeartbeatTimeout = 100 * time.Millisecond
	config.ElectionTimeout = config.HeartbeatTimeout
	config.LeaderLeaseTimeout = config.HeartbeatTimeout
	config.Logger = log.New(output, fmt.Sprintf("%s ", id), log.LstdFlags)

	if err := raft.ValidateConfig(config); err != nil {
		return nil, nil, err
	}

	r, err := raft.NewRaft(config, &nullFSM{}, store, store, snapshotStore, transport)
	if err != nil {
		transport.Close()
		return nil, nil, err
	}
	return r, transport, nil
}

// Connect the provided transports to each other bidirectionally.
func connectTransports(transports ...raft.LoopbackTransport) {
	for _, t1 := range transports {
		for _, t2 := range transports {
			if t1 == t2 {
				continue
			}
			t1.Connect(t2.LocalAddr(), t2)
		}
	}
}

// Disconnect the provided transports from each other.
func disconnectTransports(transports ...raft.LoopbackTransport) {
	for _, t1 := range transports {
		for _, t2 := range transports {
			if t1 == t2 {
				continue
			}
			t1.Disconnect(t2.LocalAddr())
		}
	}
}

// nullFSM is a no-op FSM; the reproduction never applies any client commands.
type nullFSM struct{}

func (m *nullFSM) Apply(log *raft.Log) interface{} {
	return nil
}

func (m *nullFSM) Snapshot() (raft.FSMSnapshot, error) {
	return &nullSnapshot{}, nil
}

func (m *nullFSM) Restore(rc io.ReadCloser) error {
	rc.Close()
	return nil
}

type nullSnapshot struct{}

func (s *nullSnapshot) Persist(sink raft.SnapshotSink) error {
	sink.Close()
	return nil
}

func (s *nullSnapshot) Release() {}

// syncWriter serializes writes from the three nodes' loggers onto a single writer.
type syncWriter struct {
	mu     sync.Mutex
	target io.Writer
}

func (w *syncWriter) Write(data []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.target.Write(data)
}

Thank you so much for bringing this to our attention. I'm going to close this issue since the changes have been merged! :)