Last active
January 23, 2022 08:51
-
-
Save vitan/aedb628a40478cf8b6a33dc87a5ff52f to your computer and use it in GitHub Desktop.
A PriorityQueue in Go that supports multiple producers and multiple consumers, with an example of how to use it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main
import (
	"errors"
	"fmt"
	"math"
	"reflect"
	"sync"
)
// Constants shared by the queue and the demo code. The ALL_CAPS spellings are
// kept because the rest of the file refers to these identifiers.
const (
	// ITEM_COUNT is the number of values each demo producer enqueues.
	ITEM_COUNT = 5

	// EMPTY_VAL is the placeholder value Dequeue returns alongside an error.
	EMPTY_VAL = math.MaxInt64

	// ERROR_QUEUE_CLOSED is the message of the error Dequeue returns when no
	// open queue is left to read from.
	ERROR_QUEUE_CLOSED = "error-closed-queue"
)

// PriorityQueue is a fixed set of buffered int channels, one per priority
// level. Producers call Enqueue, consumers call Dequeue; both may be called
// from many goroutines at once because every shared operation is a channel
// operation.
//
// The queue is shut down by closing every channel in queues once all
// producers have finished; Dequeue then drains what is left and finally
// reports ERROR_QUEUE_CLOSED.
type PriorityQueue struct {
	// queues holds one channel per priority level. Index 0 is served first by
	// Dequeue; Enqueue maps the numerically largest priority to index 0.
	queues []chan int

	// capacity is the buffer size every channel was created with.
	capacity int

	// opening_q_counts is set by NewPriorityQueue to the number of priority
	// levels. Dequeue deliberately does not modify it: a shared counter cannot
	// tell which closed channels a given call has already seen.
	opening_q_counts int

	// mutex is allocated by NewPriorityQueue. No method in this block locks
	// it; the field is retained so the struct layout stays unchanged.
	mutex *sync.Mutex
}

// NewPriorityQueue (re)initialises the receiver with prioritys priority levels,
// each backed by a channel buffered to capacity values, and returns the
// receiver.
//
// Negative arguments are clamped to zero, because make panics on a negative
// size. A capacity of zero yields unbuffered channels.
func (pQ *PriorityQueue) NewPriorityQueue(prioritys int, capacity int) *PriorityQueue {
	if prioritys < 0 {
		prioritys = 0
	}
	if capacity < 0 {
		capacity = 0
	}

	pQ.queues = make([]chan int, prioritys)
	for i := range pQ.queues {
		pQ.queues[i] = make(chan int, capacity)
	}
	pQ.capacity = capacity
	pQ.opening_q_counts = prioritys
	pQ.mutex = &sync.Mutex{}
	return pQ
}

// Enqueue appends val to the queue for the given priority. Valid priorities
// are 0 .. len(queues)-1, and a larger number is served earlier by Dequeue.
//
// It returns an "out of index" error for a priority outside that range. The
// call blocks while the target channel's buffer is full, and, like any send,
// panics if that channel has already been closed.
func (pQ *PriorityQueue) Enqueue(priority int, val int) error {
	if priority < 0 || priority >= len(pQ.queues) {
		return errors.New("out of index")
	}
	// Reverse the index so that the largest priority lands at queues[0],
	// which is the first channel Dequeue looks at.
	pQ.queues[len(pQ.queues)-1-priority] <- val
	return nil
}

// Dequeue returns the next value, preferring higher priorities.
//
// Each pass polls the channels in index order without blocking and returns
// the first value found, so a value waiting on a higher priority is always
// taken before one waiting on a lower priority. If every open channel is
// empty, the call blocks until one of them receives a value or is closed; a
// value obtained while blocked is returned as-is, whatever its priority.
//
// Once every channel is closed and drained, Dequeue returns EMPTY_VAL and an
// error whose message is ERROR_QUEUE_CLOSED.
func (pQ *PriorityQueue) Dequeue() (int, error) {
	// closed is local to this call: it records which channels this call has
	// observed to be closed and empty, so concurrent or later calls cannot
	// corrupt each other's view.
	closed := make([]bool, len(pQ.queues))

	for {
		open := 0
		for i, q := range pQ.queues {
			if closed[i] {
				continue
			}
			select {
			case v, ok := <-q:
				if ok {
					return v, nil
				}
				closed[i] = true
			default:
				// Still open but currently empty.
				open++
			}
		}
		if open == 0 {
			return EMPTY_VAL, errors.New(ERROR_QUEUE_CLOSED)
		}

		// Nothing was ready: wait on exactly the channels that are still open.
		cases := make([]reflect.SelectCase, 0, open)
		index := make([]int, 0, open)
		for i, q := range pQ.queues {
			if closed[i] {
				continue
			}
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(q)})
			index = append(index, i)
		}
		chosen, value, ok := reflect.Select(cases)
		if ok {
			return int(value.Int()), nil
		}
		// The chosen channel was closed while we waited; rescan the rest.
		closed[index[chosen]] = true
	}
}
// Producer&Consumer avatar | |
func producer(wg *sync.WaitGroup, priority int, pQ *PriorityQueue) { | |
defer wg.Done() | |
for i := 0; i < ITEM_COUNT; i++ { | |
//(wtzhou) Q: why priority*10+i? | |
// A: make consumer output readable. change me if needed | |
value := priority*10 + i | |
if err := pQ.Enqueue(priority, value); err != nil { | |
fmt.Printf("ERROR: %s\n", err.Error()) | |
} | |
fmt.Printf("Produced item: %d on priority %d\n", i, priority) | |
} | |
} | |
func consumer(wg *sync.WaitGroup, pQ *PriorityQueue) { | |
defer wg.Done() | |
for { | |
val, err := pQ.Dequeue() | |
if err != nil { | |
if err.Error() == ERROR_QUEUE_CLOSED { | |
break | |
} | |
} else { | |
fmt.Printf("Dequeue value: %d\n", val) | |
} | |
} | |
} | |
// Sample: produce some value to different priority | |
func SpawnProducer(wg *sync.WaitGroup, pQ *PriorityQueue) { | |
for i := 0; i < 8; i++ { | |
wg.Add(1) | |
go producer(wg, i, pQ) | |
} | |
} | |
// Sample: consume some value | |
func SpawnConsumer(wg *sync.WaitGroup, pQ *PriorityQueue) { | |
wg.Add(1) | |
go consumer(wg, pQ) | |
wg.Add(1) | |
go consumer(wg, pQ) | |
} | |
func main() { | |
fmt.Println("Starting Producer, Consumer") | |
pQ := &PriorityQueue{} | |
pQ = pQ.NewPriorityQueue(10, 10) | |
producer_wg := &sync.WaitGroup{} | |
SpawnProducer(producer_wg, pQ) | |
producer_wg.Wait() | |
// close all the queue | |
for _, q := range pQ.queues { | |
close(q) | |
} | |
consumer_wg := &sync.WaitGroup{} | |
SpawnConsumer(consumer_wg, pQ) | |
consumer_wg.Wait() | |
fmt.Println("Exited successfully") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment