Created
May 2, 2025 08:47
-
-
Save itgelo/d8055ffb83510af9aee66948b39133d6 to your computer and use it in GitHub Desktop.
Server-Sent Events with Fiber
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"encoding/json" | |
"fmt" | |
"log" | |
"math/rand" | |
"runtime" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/gofiber/fiber/v2" | |
"github.com/gofiber/fiber/v2/middleware/cors" | |
"github.com/gofiber/fiber/v2/middleware/recover" | |
"github.com/valyala/fasthttp" | |
) | |
const ( | |
keepAliveInterval = 15 * time.Second | |
) | |
type session struct { | |
val float64 | |
stateChannel chan float64 | |
} | |
type sessionsLock struct { | |
sync.Mutex | |
sessions []*session | |
} | |
func (sl *sessionsLock) addSession(s *session) { | |
sl.Lock() | |
defer sl.Unlock() | |
sl.sessions = append(sl.sessions, s) | |
} | |
func (sl *sessionsLock) removeSession(s *session) { | |
sl.Lock() | |
defer sl.Unlock() | |
for i, sess := range sl.sessions { | |
if sess == s { | |
// Fast removal without preserving order | |
sl.sessions[i] = sl.sessions[len(sl.sessions)-1] | |
sl.sessions = sl.sessions[:len(sl.sessions)-1] | |
return | |
} | |
} | |
} | |
func (sl *sessionsLock) broadcastUpdate() { | |
sl.Lock() | |
defer sl.Unlock() | |
var wg sync.WaitGroup | |
wg.Add(len(sl.sessions)) | |
r := rand.New(rand.NewSource(time.Now().UnixNano())) | |
for _, s := range sl.sessions { | |
go func(cs *session) { | |
defer wg.Done() | |
select { | |
case cs.stateChannel <- cs.val + (r.Float64() * 100): | |
default: // Skip if channel is full | |
} | |
}(s) | |
} | |
wg.Wait() | |
} | |
func formatSSEMessage(eventType string, data any) (string, error) { | |
var buf bytes.Buffer | |
if err := json.NewEncoder(&buf).Encode(map[string]any{"data": data}); err != nil { | |
return "", err | |
} | |
return fmt.Sprintf("event: %s\nretry: %d\ndata: %v\n\n", | |
eventType, | |
keepAliveInterval/time.Millisecond, | |
buf.String()), nil | |
} | |
func bToMb(b uint64) uint64 { | |
return b >> 20 // Equivalent to b / 1024 / 1024 but faster | |
} | |
var currentSessions sessionsLock | |
func main() { | |
app := fiber.New(fiber.Config{ | |
DisableStartupMessage: true, | |
}) | |
app.Use(recover.New(), cors.New()) | |
// Health endpoints | |
app.Get("/health", func(c *fiber.Ctx) error { | |
return c.Send(nil) | |
}) | |
app.Get("/connections", func(c *fiber.Ctx) error { | |
currentSessions.Lock() | |
defer currentSessions.Unlock() | |
return c.JSON(map[string]any{ | |
"open-connections": app.Server().GetOpenConnectionsCount(), | |
"sessions": len(currentSessions.sessions), | |
}) | |
}) | |
app.Get("/infos", func(c *fiber.Ctx) error { | |
var m runtime.MemStats | |
runtime.ReadMemStats(&m) | |
return c.JSON(map[string]any{ | |
"Alloc": bToMb(m.Alloc), | |
"TotalAlloc": bToMb(m.TotalAlloc), | |
"tSys": bToMb(m.Sys), | |
"tNumGC": m.NumGC, | |
"goroutines": runtime.NumGoroutine(), | |
}) | |
}) | |
app.Get("/queues/stream", func(c *fiber.Ctx) error { | |
c.Set("Content-Type", "text/event-stream") | |
c.Set("Cache-Control", "no-cache") | |
c.Set("Connection", "keep-alive") | |
c.Set("Transfer-Encoding", "chunked") | |
val, err := strconv.ParseFloat(c.Query("query"), 64) | |
if err != nil { | |
val = 0 | |
} | |
stateChan := make(chan float64, 1) | |
s := &session{val: val, stateChannel: stateChan} | |
currentSessions.addSession(s) | |
notify := c.Context().Done() | |
c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { | |
defer func() { | |
currentSessions.removeSession(s) | |
log.Println("Exiting stream") | |
}() | |
keepAlive := time.NewTicker(keepAliveInterval) | |
defer keepAlive.Stop() | |
// Initial welcome message | |
if _, err := w.WriteString("data: connected\n\n"); err != nil { | |
return | |
} | |
w.Flush() | |
for { | |
select { | |
case <-notify: | |
return | |
case ev := <-stateChan: | |
sseMsg, err := formatSSEMessage("current-value", ev) | |
if err != nil { | |
log.Printf("Error formatting SSE: %v", err) | |
continue | |
} | |
if _, err := w.WriteString(sseMsg); err != nil { | |
return | |
} | |
case <-keepAlive.C: | |
if _, err := w.WriteString(":keepalive\n"); err != nil { | |
return | |
} | |
} | |
if err := w.Flush(); err != nil { | |
return | |
} | |
} | |
})) | |
return nil | |
}) | |
app.Post("/queues/action", func(c *fiber.Ctx) error { | |
currentSessions.broadcastUpdate() | |
return c.JSON(map[string]string{"status": "update triggered"}) | |
}) | |
log.Println("Server starting on :8080") | |
if err := app.Listen(":8080"); err != nil { | |
log.Panicf("Server failed: %v", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment