Skip to content

Instantly share code, notes, and snippets.

@JyotinderSingh
Created October 12, 2024 15:02
Show Gist options
  • Save JyotinderSingh/9fe75b171c2a954f3fd39dd80e9b554a to your computer and use it in GitHub Desktop.
Save JyotinderSingh/9fe75b171c2a954f3fd39dd80e9b554a to your computer and use it in GitHub Desktop.
leaderboard
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"sync"
"time"
dicedb "github.com/dicedb/go-dice"
"github.com/gorilla/websocket"
)
var (
dice *dicedb.Client
upgrade = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
type LeaderboardEntry struct {
PlayerID string `json:"player_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}
func main() {
time.Sleep(2 * time.Second)
dhost := "localhost"
if val := os.Getenv("DICEDB_HOST"); val != "" {
dhost = val
}
dport := "7379"
if val := os.Getenv("DICEDB_PORT"); val != "" {
dport = val
}
dice = dicedb.NewClient(&dicedb.Options{
Addr: fmt.Sprintf("%s:%s", dhost, dport),
DialTimeout: 10 * time.Second,
MaxRetries: 10,
})
go updateScores()
go watchLeaderboard()
// Serve static files for the frontend
http.Handle("/", http.FileServer(http.Dir(".")))
http.HandleFunc("/ws", handleWebSocket)
log.Println("Leaderboard running on http://localhost:8000, please open it in your favourite browser.")
log.Fatal(http.ListenAndServe(":8000", nil))
}
func updateScores() {
ctx := context.Background()
for {
entry := LeaderboardEntry{
PlayerID: fmt.Sprintf("player:%d", rand.Intn(10)),
Score: rand.Intn(100),
Timestamp: time.Now(),
}
// Store the player data
lentry, _ := json.Marshal(entry)
dice.JSONSet(ctx, entry.PlayerID, "$", lentry).Err()
// Update the sorted set
dice.ZAdd(ctx, "leaderboard", &dicedb.Z{
Score: float64(entry.Score),
Member: entry.PlayerID,
}).Err()
}
}
func watchLeaderboard() {
ctx := context.Background()
conn := dice.WatchConn(ctx)
defer conn.Close()
// Watch the top 10 players in descending order
_, err := conn.ZRangeWatch(ctx, "leaderboard", "0", "9", "REV", "WITHSCORES")
if err != nil {
log.Printf("error in ZRangeWatch: %v", err)
return
}
ch := conn.Channel()
for {
select {
case msg := <-ch:
if msg == nil {
return
}
zs, ok := msg.Data.([]dicedb.Z)
if !ok {
log.Printf("unexpected data type in ZRangeWatch message")
continue
}
// Collect player IDs with score > 10
var playerIDs []string
for _, z := range zs {
score := int(z.Score)
if score <= 10 {
continue
}
playerID := z.Member.(string)
playerIDs = append(playerIDs, playerID)
if len(playerIDs) >= 5 {
break
}
}
// Fetch player data for these IDs using pipeline
pipe := dice.Pipeline()
var cmds []*dicedb.Cmd
for _, playerID := range playerIDs {
cmd := pipe.JSONGet(ctx, playerID, "$")
cmds = append(cmds, cmd)
}
_, err := pipe.Exec(ctx)
if err != nil {
log.Printf("error executing pipeline: %v", err)
continue
}
// Collect entries
entries := make([]LeaderboardEntry, 0, len(playerIDs))
for _, cmd := range cmds {
playerData, err := cmd.Result()
if err != nil {
log.Printf("error getting player data: %v", err)
continue
}
var entry LeaderboardEntry
json.Unmarshal([]byte(playerData.(string)), &entry)
entries = append(entries, entry)
}
// Now broadcast entries
broadcast(entries)
case <-ctx.Done():
return
}
}
}
func broadcast(entries []LeaderboardEntry) {
cMux.Lock()
defer cMux.Unlock()
message, _ := json.Marshal(entries)
for client := range clients {
client.WriteMessage(websocket.TextMessage, message)
}
}
var (
clients = make(map[*websocket.Conn]bool)
cMux = &sync.Mutex{}
)
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
log.Printf("error upgrading to WebSocket: %v", err)
return
}
defer func(conn *websocket.Conn) {
err := conn.Close()
if err != nil {
log.Printf("error closing WebSocket connection: %v", err)
}
}(conn)
cMux.Lock()
clients[conn] = true
cMux.Unlock()
for {
_, _, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
}
cMux.Lock()
delete(clients, conn)
cMux.Unlock()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment