/pubsubToolBox

Google Cloud Platform Pub/Sub toolbox

Primary LanguageGoBSD 2-Clause "Simplified" LicenseBSD-2-Clause

pubsubToolBox

Google Cloud Platform Pub/Sub toolbox

Prerequisities

  • GCP project with a Pub/Sub up & running.
  • GCP service account and the credential file private_key.json associated.
  • GCP root package "cloud.google.com/go" (go get -u cloud.google.com/go/pubsub)

Usage

go get github.com/PierreKieffer/pubsubToolBox

Publisher

To publish messages :

import (
        "context"
        "github.com/PierreKieffer/pubsubToolBox/client"
        "github.com/PierreKieffer/pubsubToolBox/publisher"
        "log"
)

func main() {

        projectID := "PROJECT_ID"

        // If GOOGLE_APPLICATION_CREDENTIALS is set :
        pubsubClient, err := client.InitPubSubClient(ctx, projectID)

        // Else you need to pass the json key                                                                         
        pubsubClient, err := client.InitPubSubClient(ctx, projectID, "private_key.json")
        if err != nil {
                log.Println(err)
        }   

	p := publisher.Publisher{
		Context : context.Background(),
		TopicID : "TOPIC_ID",
		PubSubClient : pubsubClient
	}

        message := `{"Message" : "Hello world"}`
	attributes := map[string]string{"foo" : "bar"}
        p.Publish(message, attributes)
}

Consumer

To consume messages :

A message buffer must be instantiated in order to store messages at the application level.

The consumer.Pull method receives messages from the Pub/Sub broker and adds them to the message buffer.

The message structure is :

type Message struct {
	Data       string
	Attributes map[string]string
}

This allows consumption of the Pub/Sub broker and processing of messages in parallel.

The initialization takes the size of the buffer as a parameter :

var buffer = consumer.InitBuffer(10)

Once a message is consumed, it is dropped from the buffer.

Consume messages from Pub/Sub broker :

If the subscriber doesn't exist, it will be created.

Instanciate first a consumer.Consumer :

	c := &consumer.Consumer{
		Context:        context.Background(),
		PubSubClient:   pubsubClient,
		SubscriberName: "subscriber-name",
		TopicID:        "topic-id",
		Buffer:         buffer,
	}

Start message consumption :

	go c.Pull()

The message buffer is a classic channel, It can be consumed through a goroutine :

package main

import (
	"context"
	"log"

	"github.com/PierreKieffer/pubsubToolBox/client"
	"github.com/PierreKieffer/pubsubToolBox/consumer"
)

var exit = make(chan bool)

func main() {

	projectID := "test-project-name"

	pubsubClient, _ := client.InitPubSubClient(ctx, projectID, "private_key.json")

	// Init message buffer to receive pulled messages
	var buffer = consumer.InitBuffer(10)

	c := &consumer.Consumer{
		Context:        context.Background(),
		PubSubClient:   pubsubClient,
		SubscriberName: "subscriber-name",
		TopicID:        "topic-id",
		Buffer:         buffer,
	}

	// Launch local buffer consumer to process messages
	go ProcessBuffer(buffer)

	// Launch the pubsub consumer to pull messages
	go c.Pull()

	<-exit
}

func ProcessBuffer(messageBuffer chan consumer.Message) {
	for {
		// ... Process received messages
		log.Println("Message consumed : ", <-messageBuffer)
	}
}