WebSockets
Enable real-time bidirectional communication with Velocity's WebSocket server and client APIs.
Velocity provides a powerful WebSocket implementation for real-time, bidirectional communication between your application and clients.
Quick Start
Setting up WebSockets is straightforward:
import (
"github.com/velocitykode/velocity/pkg/websocket"
"github.com/velocitykode/velocity/pkg/log"
)
func main() {
// Create WebSocket server with default configuration
config := websocket.DefaultConfig()
config.Port = 6001
config.Path = "/ws"
server := websocket.NewServer(config)
// Handle new connections
server.OnConnect(func(client *websocket.Client) {
log.Info("Client connected", "client_id", client.ID())
// Send welcome message
client.Send("welcome", map[string]interface{}{
"message": "Welcome to our WebSocket server!",
"client_id": client.ID(),
})
})
// Handle disconnections
server.OnDisconnect(func(client *websocket.Client) {
log.Info("Client disconnected", "client_id", client.ID())
})
// Start the WebSocket server
go server.Start()
// Your HTTP server continues to run
http.ListenAndServe(":4000", router.Get())
}Configuration
Server Configuration
config := websocket.Config{
Port: 3000,
Path: "/ws",
AllowedOrigins: []string{"*"},
EnableCompression: true,
HandshakeTimeout: 10 * time.Second,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// Custom origin checking logic
return true
},
}
server := websocket.NewServer(config)Environment Configuration
Configure WebSocket settings in your .env file:
# WebSocket Server
WS_PORT=6001
WS_PATH=/ws
WS_ALLOWED_ORIGINS=*
WS_ENABLE_COMPRESSION=true
WS_HANDSHAKE_TIMEOUT=10s
WS_READ_BUFFER_SIZE=1024
WS_WRITE_BUFFER_SIZE=1024Client Management
Client Information
server.OnConnect(func(client *websocket.Client) {
// Get client information
clientID := client.ID()
remoteAddr := client.RemoteAddr()
userAgent := client.UserAgent()
log.Info("New client connected",
"client_id", clientID,
"remote_addr", remoteAddr,
"user_agent", userAgent,
)
})Client Collections
// Get all connected clients
clients := server.GetClients()
log.Info("Total connected clients", "count", len(clients))
// Get specific client by ID
client := server.GetClient("client-123")
if client != nil {
client.Send("ping", map[string]interface{}{
"timestamp": time.Now().Unix(),
})
}
// Check if client is connected
if server.IsConnected("client-123") {
log.Info("Client is still connected")
}Message Handling
Receiving Messages
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
log.Info("Received message",
"client_id", client.ID(),
"event", message.Event,
"data", message.Data,
)
switch message.Event {
case "chat_message":
handleChatMessage(client, message.Data)
case "join_room":
handleJoinRoom(client, message.Data)
case "ping":
client.Send("pong", map[string]interface{}{
"timestamp": time.Now().Unix(),
})
default:
log.Warn("Unknown message event", "event", message.Event)
}
})
func handleChatMessage(client *websocket.Client, data interface{}) {
messageData := data.(map[string]interface{})
text := messageData["text"].(string)
room := messageData["room"].(string)
// Broadcast message to all clients in the room
server.ToRoom(room).Send("new_message", map[string]interface{}{
"text": text,
"user": client.ID(),
"timestamp": time.Now().Unix(),
})
}Sending Messages
// Send to specific client
client.Send("notification", map[string]interface{}{
"title": "New Message",
"body": "You have received a new message",
"timestamp": time.Now().Unix(),
})
// Send to all connected clients
server.Broadcast("server_announcement", map[string]interface{}{
"message": "Server maintenance in 5 minutes",
"type": "warning",
})
// Send to multiple clients
clientIDs := []string{"client-1", "client-2", "client-3"}
server.ToClients(clientIDs).Send("group_message", map[string]interface{}{
"message": "Hello group!",
})Room Management
Joining and Leaving Rooms
// Client joins a room
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
if message.Event == "join_room" {
roomName := message.Data.(map[string]interface{})["room"].(string)
// Add client to room
server.JoinRoom(client.ID(), roomName)
// Notify other room members
server.ToRoom(roomName).Except(client.ID()).Send("user_joined", map[string]interface{}{
"user_id": client.ID(),
"room": roomName,
"message": fmt.Sprintf("User %s joined the room", client.ID()),
})
// Send confirmation to the client
client.Send("room_joined", map[string]interface{}{
"room": roomName,
"message": "Successfully joined the room",
})
}
})
// Client leaves a room
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
if message.Event == "leave_room" {
roomName := message.Data.(map[string]interface{})["room"].(string)
// Remove client from room
server.LeaveRoom(client.ID(), roomName)
// Notify other room members
server.ToRoom(roomName).Send("user_left", map[string]interface{}{
"user_id": client.ID(),
"room": roomName,
"message": fmt.Sprintf("User %s left the room", client.ID()),
})
}
})Room Broadcasting
// Send message to all clients in a room
server.ToRoom("general").Send("room_message", map[string]interface{}{
"text": "Hello everyone in the general room!",
"from": "system",
})
// Send to room except specific clients
server.ToRoom("general").Except("client-123").Send("message", data)
// Send to multiple rooms
server.ToRooms([]string{"room1", "room2"}).Send("announcement", data)
// Get room information
roomClients := server.GetRoomClients("general")
log.Info("Clients in general room", "count", len(roomClients))Error Handling
Connection Errors
server.OnError(func(client *websocket.Client, err error) {
log.Error("WebSocket error",
"client_id", client.ID(),
"error", err,
)
// Optionally disconnect the client
if isServerError(err) {
client.Disconnect()
}
})
func isServerError(err error) bool {
// Determine if error requires disconnection
return strings.Contains(err.Error(), "server error")
}Message Validation
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
// Validate message structure
if message.Event == "" {
client.Send("error", map[string]interface{}{
"message": "Event name is required",
"code": "INVALID_EVENT",
})
return
}
// Validate message data
if message.Data == nil {
client.Send("error", map[string]interface{}{
"message": "Message data is required",
"code": "INVALID_DATA",
})
return
}
// Process valid message
handleValidMessage(client, message)
})Authentication
Connection Authentication
server.OnConnect(func(client *websocket.Client) {
// Check for authentication token
token := client.Request().URL.Query().Get("token")
if token == "" {
client.Send("auth_required", map[string]interface{}{
"message": "Authentication token required",
})
client.Disconnect()
return
}
// Validate token
user, err := validateAuthToken(token)
if err != nil {
client.Send("auth_failed", map[string]interface{}{
"message": "Invalid authentication token",
})
client.Disconnect()
return
}
// Store user information with client
client.SetData("user", user)
client.SetData("authenticated", true)
log.Info("Authenticated client connected",
"client_id", client.ID(),
"user_id", user.ID,
)
})Message-based Authentication
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
// Check if client is authenticated for protected events
if isProtectedEvent(message.Event) {
authenticated := client.GetData("authenticated")
if authenticated != true {
client.Send("unauthorized", map[string]interface{}{
"message": "Authentication required for this action",
"event": message.Event,
})
return
}
}
// Process authenticated message
handleMessage(client, message)
})Performance & Monitoring
Connection Statistics
// Get server statistics
stats := server.GetStats()
log.Info("WebSocket server stats",
"connected_clients", stats.ConnectedClients,
"total_messages_sent", stats.MessagesSent,
"total_messages_received", stats.MessagesReceived,
"bytes_sent", stats.BytesSent,
"bytes_received", stats.BytesReceived,
)
// Monitor connection health
server.OnConnect(func(client *websocket.Client) {
// Set up periodic ping
ticker := time.NewTicker(30 * time.Second)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !client.IsConnected() {
return
}
client.Ping()
}
}
}()
})Rate Limiting
// Implement rate limiting per client
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
// Get rate limiter for client
limiter := getRateLimiter(client.ID())
if !limiter.Allow() {
client.Send("rate_limit_exceeded", map[string]interface{}{
"message": "Too many messages, please slow down",
"retry_after": time.Second * 5,
})
return
}
// Process message if rate limit allows
handleMessage(client, message)
})
func getRateLimiter(clientID string) *rate.Limiter {
// Implementation depends on your rate limiting strategy
// Return appropriate rate limiter for client
return rate.NewLimiter(rate.Limit(10), 10) // 10 messages per second
}Frontend Integration
JavaScript Client
// Connect to WebSocket server
const ws = new WebSocket('ws://localhost:6001/ws');
// Handle connection open
ws.onopen = function(event) {
console.log('Connected to WebSocket server');
// Send authentication
ws.send(JSON.stringify({
event: 'authenticate',
data: {
token: localStorage.getItem('auth_token')
}
}));
};
// Handle incoming messages
ws.onmessage = function(event) {
const message = JSON.parse(event.data);
console.log('Received message:', message);
switch(message.event) {
case 'welcome':
handleWelcome(message.data);
break;
case 'new_message':
displayMessage(message.data);
break;
case 'user_joined':
showUserJoined(message.data);
break;
default:
console.log('Unknown message event:', message.event);
}
};
// Send message
function sendMessage(event, data) {
ws.send(JSON.stringify({
event: event,
data: data
}));
}
// Join a room
function joinRoom(roomName) {
sendMessage('join_room', { room: roomName });
}
// Send chat message
function sendChatMessage(text, room) {
sendMessage('chat_message', {
text: text,
room: room
});
}Testing WebSockets
func TestWebSocketServer(t *testing.T) {
// Create test server
config := websocket.DefaultConfig()
config.Port = 0 // Use random available port
server := websocket.NewServer(config)
// Set up message handler
var receivedMessage websocket.Message
server.OnMessage(func(client *websocket.Client, message websocket.Message) {
receivedMessage = message
})
// Start server
go server.Start()
defer server.Stop()
// Connect test client
url := fmt.Sprintf("ws://localhost:%d%s", server.Port(), config.Path)
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
assert.NoError(t, err)
defer ws.Close()
// Send test message
testMessage := websocket.Message{
Event: "test_event",
Data: map[string]interface{}{
"test": "data",
},
}
err = ws.WriteJSON(testMessage)
assert.NoError(t, err)
// Wait for message to be received
time.Sleep(100 * time.Millisecond)
// Verify message was received
assert.Equal(t, "test_event", receivedMessage.Event)
assert.Equal(t, "data", receivedMessage.Data.(map[string]interface{})["test"])
}Best Practices
- Handle Disconnections: Always handle client disconnections gracefully
- Authenticate Connections: Validate client authentication before processing messages
- Rate Limiting: Implement rate limiting to prevent abuse
- Message Validation: Validate all incoming messages
- Error Handling: Provide clear error messages to clients
- Room Management: Clean up rooms when they become empty
- Monitoring: Monitor connection health and server performance
- Security: Validate origins and implement proper CORS policies
