joncrlsn/dque

invalid memory address or nil pointer dereference on dequeue

dllz opened this issue · 10 comments

dllz commented

Hey, just started using this library and I have run into an issue dequeuing a queue.

[signal SIGSEGV: segmentation violation code=0x1 addr=0x58 pc=0x181f8b1]

goroutine 10 [running]:
sync.(*Mutex).Lock(...)
	/usr/local/Cellar/go/1.13.6/libexec/src/sync/mutex.go:74
github.com/joncrlsn/dque.(*DQue).Dequeue(0x0, 0x0, 0x0, 0x0, 0x0)
	/Users/dlotz/go/pkg/mod/github.com/joncrlsn/dque@v0.0.0-20200308203223-fe4f81ffd1dc/queue.go:240 +0x51

I am not sure if you have any thoughts on the cause of this, my googling does not seem to turn anything up.  

Hi, is this something you can recreate at will? That would help.

@Kriechi

dllz commented

I can only get this error, I do not seem able to successfully dequeue.
The program starts up, all the queues are created.
Then I have set up a blocking pop on the queue along the lines of:

Try dequeue, if nothing in the queue sleep for 0.5 seconds and then try dequeue again. Repeat till you can get something off the queue.

And I immediately get that error and the program crashes

Until we get this figured out, can you use the v2.1 tag? I'm at work today and won't be able to try until later this evening.

dllz commented

No problem, if it helps debuging I am using go 1.14 but I downgraded to 1.12 and the issue persisted

@dllz would you mind sharing a snippet of your code?
How do you open the queue? Maybe you even got a minimal reproducible example that shows the error? Or is it deep-down in your application?

dllz commented

Happy to share some, I tried a bunch of other releases and I ran into the issue so I think this is very much a me being stupid.

type Dq struct {
	low  *dque.DQue
	med  *dque.DQue
	high *dque.DQue
	now  *dque.DQue
}

type Holder struct {
	Item interface{}
	Time int64
	Id   string
}

func HolderBuilder() interface{} {
	return &Holder{}
}

func (d Dq) Push(id string, item interface{}, priority Priority) error {
	holder := Holder{
		Item: item,
		Time: time.Now().Unix(),
	}
	switch priority {
	case Now:
		err := d.now.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case High:
		err := d.high.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Medium:
		err := d.med.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Low:
		err := d.low.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	default:
		return util.HandleErrorErr("No matching priority", errors.New("No matching priority for "+string(priority)))
	}
	return nil
}

func (d Dq) Pop() interface{} {
	for {
		temp, err := d.now.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching now priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.high.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching high priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.med.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching med priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.low.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching low priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		time.Sleep(time.Millisecond * time.Duration(viper.GetInt(util.QueueDequeueEmptyWaitTime)))
	}
}

func (d Dq) Connect() interface{} {
	var err error
	d.low, err = dque.New(string(Low), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Low), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating low client", err)
			return nil
		}
	}
	d.med, err = dque.New(string(Medium), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Medium), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating med client", err)
			return nil
		}
	}
	d.high, err = dque.New(string(High), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.high, err = dque.Open(string(High), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating high client", err)
			return nil
		}
	}
	d.now, err = dque.New(string(Now), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Now), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating now client", err)
			return nil
		}
	}
	return d
}

func (d Dq) ListenOnQueue(handler func(item interface{}), stop *bool) {
	go func() {
		for {
			if !*stop {
				item := d.Pop()
				handler(item)
			}
		}
	}()
}

And then it is being initialized with

        dq := queue.Dq{}
	qh := QueueHandler{
		Queue: dq,
	}
	dq.Connect()
	var stop bool
	dq.ListenOnQueue(func(item interface{}) {
		fmt.Println(item)
	}, &stop)

Quick one without thinking too much:

you might want to try using "methods on pointers":

func (d Dq) Connect() interface{} {

vs.

func (d *Dq) Connect() interface{} {

Also, dque has a nice NewOrOpen which might save you a bit of error handling code.

dllz commented

Issue persists with pointers sadly,
Thanks for the NewOrOpen, that will definitely come in handy

Hi dllz, Nothing jumps out at me right away either. Could you maybe change your comment above to be the contents of a stand-alone go file that can be run to show the problem? Then I can copy it into test.go and run go run test.go. For example,

  • I can't find the definition of QueueHandler.
  • I can't tell how stop ever gets flipped.
  • Please hardcode values like this to make it compile without extra dependencies: viper.GetString(util.QueueDirectory)
  • Make sure your example uses methods on the pointer of Dq like Kriechi suggested.

Whenever I have had to show a problem with the simplest possible problem demonstration program on any code I've written, I find it really helps my understanding of things, as well as helping others help me find the problem.

Thanks!

  • Jon
dllz commented

Hi Jon.

Sorry about that, I have cleaned up the code to hopefully solve your problem.

package queue

import (
	"errors"
	"fmt"
	"github.com/joncrlsn/dque"
	"time"
)

type Priority string

const (
	Low         Priority = "low"
	Medium      Priority = "med"
	High        Priority = "high"
	Now         Priority = "now"
	SegmentSize          = 50
)


type Dq struct {
	low  *dque.DQue
	med  *dque.DQue
	high *dque.DQue
	now  *dque.DQue
}

type Holder struct {
	Item interface{}
	Time int64
	Id   string
}

func HolderBuilder() interface{} {
	return &Holder{}
}

func (d *Dq) Push(id string, item interface{}, priority Priority) error {
	holder := Holder{
		Item: item,
		Time: time.Now().Unix(),
	}
	switch priority {
	case Now:
		err := d.now.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case High:
		err := d.high.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Medium:
		err := d.med.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Low:
		err := d.low.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	default:
		fmt.Println("No matching priority")
		return errors.New("No matching priority for "+string(priority))
	}
	return nil
}

func (d *Dq) Pop() interface{} {
	for {
		temp, err := d.now.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching now priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.high.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching high priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.med.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching med priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.low.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching low priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		time.Sleep(time.Millisecond * 500)
	}
}

func (d *Dq) Connect() interface{} {
	var err error
	d.low, err = dque.New(string(Low), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Low), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating low client", err)
			return nil
		}
	}
	d.med, err = dque.New(string(Medium), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Medium), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating med client", err)
			return nil
		}
	}
	d.high, err = dque.New(string(High), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.high, err = dque.Open(string(High), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating high client", err)
			return nil
		}
	}
	d.now, err = dque.New(string(Now), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Now), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating now client", err)
			return nil
		}
	}
	return d
}

func (d *Dq) ListenOnQueue(handler func(item interface{}), stop *bool) {
	go func() {
		for {
			if !*stop {
				item := d.Pop()
				handler(item)
			}
		}
	}()
}

The calling code:

package webservice

import (
	"canvas-data-management-go/pkg/export"
	"canvas-data-management-go/pkg/queue"
	"canvas-data-management-go/pkg/util"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
)

type QueueJson struct {
	Item     string         `json:item`
	Id       string         `json:id`
	Priority queue.Priority `json:Priority`
}

type QueueHandler struct {
	Queue queue.Dq
}
func Run(ip string, port string) error {
	dq := queue.Dq{}
	qh := QueueHandler{
		Queue: dq,
	}
	dq.Connect()

	http.HandleFunc("/queue", qh.handleQueue)
	hostAddress := "localhost" + ":" + "12345"
	var stop bool
	dq.ListenOnQueue(func(item interface{}) {
		fmt.Println(item)
	}, &stop)
		err := http.ListenAndServe(hostAddress, nil)
	if err != nil{
		fmt.Println("Error hosting http listener "+hostAddress)
		return err
	}
	return nil
}
func (qh *QueueHandler) handleQueue(w http.ResponseWriter, r *http.Request) {
	fmt.Println("Received queue request")
	var item QueueJson
	err := json.Unmarshal(getBody(r.Body, w), &item)
	if err != nil {
		fmt.Println("Error un-marshalling json from http server", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	err = qh.Queue.Push(item.Id, item.Item, item.Priority)
	if err != nil {
		fmt.Println("Error queuing data: "+item.Id, err)
		http.Error(w, "Error queuing data for ID: "+item.Id+" "+err.Error(), http.StatusInternalServerError)
		return
	}
	response := ResponseJson{Message: "Successfully processed"}
	encoder := json.NewEncoder(w)
	err = encoder.Encode(response)
}

Webservice caller

package cmd

import (
	ws "canvas-data-management-go/pkg/webservice"
	"fmt"
	"github.com/spf13/cobra"
)

var (
	webserviceCommand = &cobra.Command{
		Use:   "web",
		Short: "Put the cli a mode that it receives commands from the internet",
		Run: func(cmd *cobra.Command, args []string) {
			err := webservice(ip, port)
			if err != nil {
				fmt.Print("Error running command", err)
			}
		},
	}
	port string
	ip   string
)

func webservice(ip string, port string) error {
	err := ws.Run()
	if err != nil {
		return err
	}
	return nil
}

root.go

package cmd

import (
	"github.com/spf13/cobra"
)

var (
	rootCmd = &cobra.Command{
		Use:              "cdm",
		TraverseChildren: true,
	}
)

func Execute() error {
	return rootCmd.Execute()
}

func init() {

	//Web
	webserviceCommand.Flags().StringVar(&ip, "ip", "127.0.0.1", "Specify the ip address to listen on")
	webserviceCommand.Flags().StringVar(&port, "port", "2364", "Specify the ip address to listen on")
	rootCmd.AddCommand(webserviceCommand)
}

main.go

package main

import (
	"canvas-data-management-go/cmd"
	"fmt"
)

func main() {
	err := cmd.Execute()
	if err != nil {
		fmt.Println("Error starting up", err)
	}
}