Skip to content

Instantly share code, notes, and snippets.

@mikekamornikov
Last active May 5, 2022 08:16
Show Gist options
  • Save mikekamornikov/6c9b7b5eaeb7559176771efeafd8edff to your computer and use it in GitHub Desktop.
Save mikekamornikov/6c9b7b5eaeb7559176771efeafd8edff to your computer and use it in GitHub Desktop.
SQS producer/consumer local dev
package main
import "github.com/aws/aws-sdk-go/aws"
func GetAWSConfig() *aws.Config {
return &aws.Config{
Endpoint: aws.String("http://localhost:4100"),
Region: aws.String("us-west-1"),
}
}
package main
import (
"github.com/aws/aws-sdk-go/service/sqs"
"fmt"
"time"
"sync"
"github.com/aws/aws-sdk-go/aws"
)
type Message sqs.Message
type Handler func(id int, m Message) error
type consumer struct {
Client *sqs.SQS
WorkerPoolSize int
MaxMessages int
QueueURL *string
Handler Handler
}
func NewSQSConsumer(c *sqs.SQS, wps int, mm int, qURL *string, h Handler) *consumer {
return &consumer{
Client: c,
WorkerPoolSize: wps,
MaxMessages: mm,
QueueURL: qURL,
Handler: h,
}
}
func (c *consumer) Consume() {
for w := 1; w <= c.WorkerPoolSize; w++ {
go c.worker(w)
}
}
// Worker is responsible of its own longpolling and message handling.
// When a set of messages is received, a waitgroup is used to synchronize
// an additional set of goroutines to manage the message payload before
// querying again
func (c *consumer) worker(id int) {
for {
output, err := c.retrieveMessages()
if err != nil {
continue
}
fmt.Printf("↓ T: %18v, MC: %d.\n", time.Now().Format("15:04:05.999999999"), len(output.Messages))
var wg sync.WaitGroup
for _, m := range output.Messages {
wg.Add(1)
go func(m *Message) {
defer wg.Done()
if err := c.Handler(id, *m); err != nil {
// log error
return
}
// message consumed
c.delete(*m)
}((*Message)(m))
wg.Wait()
}
}
}
// Receive a message from queue with long polling enabled
func (c *consumer) retrieveMessages() (*sqs.ReceiveMessageOutput, error) {
var (
timeout int64 = 20
waitTime int64 = 20
)
return c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: c.QueueURL,
AttributeNames: aws.StringSlice([]string{
sqs.MessageSystemAttributeNameSentTimestamp,
}),
MaxNumberOfMessages: aws.Int64(int64(c.MaxMessages)),
MessageAttributeNames: aws.StringSlice([]string{
sqs.QueueAttributeNameAll,
}),
VisibilityTimeout: &timeout,
WaitTimeSeconds: &waitTime,
})
}
func (c *consumer) delete(m Message) (*sqs.DeleteMessageOutput, error) {
return c.Client.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: c.QueueURL,
ReceiptHandle: m.ReceiptHandle,
})
}
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"strconv"
"math/rand"
)
func main() {
const (
queueName = "users"
)
sess, err := session.NewSession(GetAWSConfig())
if err != nil {
fmt.Println("Error", err)
return
}
// Create a SQS service client.
svc := sqs.New(sess)
queueURLOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
fmt.Println("Error", err)
return
}
queueURL := *queueURLOutput.QueueUrl
for counter := 1; counter <= 10000; counter++ {
// random operation types
operation := "Create"
if rand.Intn(10) < 5 {
operation = "Delete"
}
// user to json
user := "{id: " + strconv.Itoa(counter) + "}"
result, err := svc.SendMessage(&sqs.SendMessageInput{
MessageAttributes: map[string]*sqs.MessageAttributeValue{
"Operation": {
DataType: aws.String("String"),
StringValue: aws.String(operation),
},
},
MessageBody: aws.String(user),
QueueUrl: &queueURL,
})
if err != nil {
fmt.Println("Error", err)
return
}
fmt.Printf("↑ O: %6v, U: %11v, MID: %v\n", operation, user, *result.MessageId)
}
fmt.Println("DONE")
}
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"os"
"syscall"
"os/signal"
)
func main() {
const (
usersQueue = "users_to_mango"
)
sess, err := session.NewSession(GetAWSConfig())
if err != nil {
fmt.Println("Error", err)
return
}
// create a SQS service client
svc := sqs.New(sess)
// get SQS query url
queueURLOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(usersQueue),
})
if err != nil {
fmt.Println("Error", err)
return
}
c := NewSQSConsumer(svc, 5, 10, queueURLOutput.QueueUrl, noopHandler)
c.Consume()
// create a channel to listen for SIGINT (Ctrl+C) to signal
// to our application to gracefully shutdown.
shutdown := make(chan os.Signal, 2)
signal.Notify(shutdown, syscall.SIGINT)
for {
select {
case <-shutdown:
// falling out of main
return
}
}
}
func noopHandler(id int, m Message) error {
operation := m.MessageAttributes["Operation"].StringValue
mangoURL := m.MessageAttributes["Mango"].StringValue
user := m.Body
fmt.Printf("↓ W: %d, O: %6v, M: %16v, U: %12v\n", id, *operation, *mangoURL, *user)
return nil
}
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"os"
"syscall"
"os/signal"
"strconv"
"math/rand"
)
func main() {
const (
usersQueue = "users"
)
sess, err := session.NewSession(GetAWSConfig())
if err != nil {
fmt.Println("Error", err)
return
}
// create a SQS service client
svc := sqs.New(sess)
// get SQS query url
queueURLOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(usersQueue),
})
if err != nil {
fmt.Println("Error", err)
return
}
h, err := createHandler(svc)
if err != nil {
fmt.Println("Error", err)
return
}
c := NewSQSConsumer(svc, 5, 10, queueURLOutput.QueueUrl, *h)
c.Consume()
// create a channel to listen for SIGINT (Ctrl+C) to signal
// to our application to gracefully shutdown.
shutdown := make(chan os.Signal, 2)
signal.Notify(shutdown, syscall.SIGINT)
for {
select {
case <-shutdown:
// falling out of main
return
}
}
}
func createHandler(svc *sqs.SQS) (*Handler, error) {
const (
usersToMangoQueue = "users_to_mango"
)
queueURLOutput, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(usersToMangoQueue),
})
if err != nil {
return nil, err
}
queueURL := *queueURLOutput.QueueUrl
var h Handler = func(id int, m Message) error {
user := m.Body
operation := m.MessageAttributes["Operation"].StringValue
fmt.Printf("↓ W: %d, O: %6v, U: %12v\n", id, *operation, *user)
for counter := 1; counter <= rand.Intn(3); counter++ {
mangoURL := "https://mango_" + strconv.Itoa(counter)
result, err := svc.SendMessage(&sqs.SendMessageInput{
MessageAttributes: map[string]*sqs.MessageAttributeValue{
"Operation": {
DataType: aws.String("String"),
StringValue: aws.String(*operation),
},
"Mango": {
DataType: aws.String("String"),
StringValue: aws.String(mangoURL),
},
},
MessageBody: aws.String(*user),
QueueUrl: &queueURL,
})
if err != nil {
return err
}
fmt.Printf("↑ O: %6v, U: %12v, MID: %v\n", *operation, *user, *result.MessageId)
}
return nil
}
return &h, nil
}
@mikekamornikov
Copy link
Author

mikekamornikov commented Aug 16, 2018

brew install awscli

aws configure
# AWS Access Key ID [****************id]:
# AWS Secret Access Key [****************cret]:
# Default region name [us-west-1]:
# Default output format [None]:

docker run -ti --rm -p 4100:4100 pafortin/goaws
# {"level":"warning","msg":"Loading config file: /conf/goaws.yaml","time":"2018-08-16T16:51:31Z"}
# {"level":"warning","msg":"GoAws listening on: 0.0.0.0:4100","time":"2018-08-16T16:51:31Z"}


aws --endpoint-url http://localhost:4100 sqs list-queues
# {
#     "QueueUrls": [
#         "http://goaws:4100/queue/local-queue2",
#         "http://goaws:4100/queue/local-queue3",
#         "http://goaws:4100/queue/local-queue4",
#         "http://goaws:4100/queue/local-queue1"
#         "http://goaws:4100/queue/users",
#         "http://goaws:4100/queue/users_to_mango"
#     ]
# }

go run user_producer.go config.go sqs.go
go run user_with_mango_producer.go config.go sqs.go
go run user_with_mango_consumer.go config.go sqs.go

@hyhypepe
Copy link

Your gist is very awesome, it help me reduce my effort development time. Thank you very much.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment