Events
Build event-driven applications with Velocity's observer pattern for decoupled, extensible architecture.
Velocity's event system implements the observer pattern: code that dispatches an event does not need to know which listeners react to it. This keeps concerns separate and lets you extend the application by registering new listeners instead of editing existing code.
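To make the observer pattern concrete, here is a minimal standalone sketch of the idea in plain Go. It is not Velocity's implementation: the Dispatcher and Listener types below are hypothetical and exist only to show how a dispatcher decouples the code that raises an event from the code that reacts to it.

```go
package main

import "fmt"

// Listener is a hypothetical stand-in for a Velocity listener.
type Listener func(event interface{}) error

// Dispatcher maps event names to the listeners registered for them.
// It is an illustration only, not the Velocity dispatcher.
type Dispatcher struct {
	listeners map[string][]Listener
}

// NewDispatcher returns an empty dispatcher.
func NewDispatcher() *Dispatcher {
	return &Dispatcher{listeners: make(map[string][]Listener)}
}

// Listen registers a listener under an event name.
func (d *Dispatcher) Listen(name string, l Listener) {
	d.listeners[name] = append(d.listeners[name], l)
}

// Dispatch calls each listener registered for name, in registration
// order, and stops at the first error.
func (d *Dispatcher) Dispatch(name string, event interface{}) error {
	for _, l := range d.listeners[name] {
		if err := l(event); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	d := NewDispatcher()
	d.Listen("user.registered", func(event interface{}) error {
		fmt.Println("send welcome email to", event)
		return nil
	})
	d.Listen("user.registered", func(event interface{}) error {
		fmt.Println("record signup for", event)
		return nil
	})
	// The caller knows only the event name, not who reacts to it.
	if err := d.Dispatch("user.registered", "ada@example.com"); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
```

Adding a third reaction to the event means registering one more listener; the dispatching code stays unchanged.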
Quick Start
import "github.com/velocitykode/velocity/pkg/events"
// Define an event
type UserRegistered struct {
UserID int
Email string
}
func (e UserRegistered) Name() string {
return "user.registered"
}
// Dispatch the event
func registerUser(email string, password string) {
// Create user...
user := createUser(email, password)
// Dispatch event
events.Dispatch(UserRegistered{
UserID: user.ID,
Email: user.Email,
})
}
// Define a listener
type SendWelcomeEmail struct{}
func (l *SendWelcomeEmail) Handle(event interface{}) error {
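// Use a checked type assertion so an unexpected event does not panic
e, ok := event.(UserRegistered)
if !ok {
return fmt.Errorf("unexpected event type %T", event)
}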
// Send welcome email
return sendEmail(e.Email, "Welcome to our platform!")
}
func (l *SendWelcomeEmail) ShouldQueue() bool {
return true // Process asynchronously
}
// Register the listener
func init() {
events.Listen("user.registered", &SendWelcomeEmail{})
}
// Create a logger that handles all user events
type UserActivityLogger struct{}
func (l *UserActivityLogger) Handle(event interface{}) error {
log.Info("User event occurred", "event", event)
return nil
}
func (l *UserActivityLogger) ShouldQueue() bool {
return false
}
// Listen to all user events
func init() {
events.Listen("user.*", &UserActivityLogger{})
// Listen to all "created" events
events.Listen("*.created", &AuditLogger{})
// Listen to all events
events.Listen("*", &GlobalLogger{})
}
// Dispatch events asynchronously
func processOrder(order *Order) error {
// Save order synchronously
if err := db.Create(order); err != nil {
return err
}
// Dispatch events asynchronously
events.DispatchAsync(OrderPlaced{
OrderID: order.ID,
Total: order.Total,
})
// Or dispatch after a delay
events.DispatchAfter(
OrderFollowUp{OrderID: order.ID},
24 * time.Hour,
)
return nil
}
Event Definition
Using Structs
Define events as structs that implement the Event interface:
type UserRegistered struct {
UserID int
Email string
Timestamp time.Time
}
func (e UserRegistered) Name() string {
return "user.registered"
}
type OrderPlaced struct {
OrderID string
UserID int
Total float64
Items []OrderItem
}
func (e OrderPlaced) Name() string {
return "order.placed"
}
Using Strings
For simple events, you can use strings directly:
// Dispatch string events
events.Dispatch("cache.cleared")
events.Dispatch("maintenance.started")
// Listen to string events
events.Listen("cache.cleared", &CacheListener{})
Auto-Generated Names
If you don’t implement the Name() method, Velocity will generate a name from the struct type:
type UserRegistered struct {
UserID int
Email string
}
// Automatically generates name: "user.registered"
// (CamelCase converted to dot.notation)
Listeners
Basic Listener
type MyListener struct{}
func (l *MyListener) Handle(event interface{}) error {
// Type assert to get event data
e, ok := event.(UserRegistered)
if !ok {
return fmt.Errorf("unexpected event type %T", event)
}
// Process the event
log.Info("Processing event", "user_id", e.UserID)
return nil
}
func (l *MyListener) ShouldQueue() bool {
return false // Synchronous processing
}
Queued Listener
For long-running tasks, use queued listeners:
type SendWelcomeEmail struct {
events.QueuedBaseListener
}
func (l *SendWelcomeEmail) Handle(event interface{}) error {
e := event.(UserRegistered)
// Send email (time-consuming operation)
return emailService.Send(e.Email, "Welcome!")
}
func (l *SendWelcomeEmail) ShouldQueue() bool {
return true
}
func (l *SendWelcomeEmail) OnQueue() string {
return "emails" // Queue name
}
func (l *SendWelcomeEmail) Tries() int {
return 3 // Retry attempts
}
func (l *SendWelcomeEmail) WithDelay() time.Duration {
return 5 * time.Second // Processing delay
}
Conditional Listener
Execute listener logic only when conditions are met:
type NotifyPremiumUsers struct{}
func (l *NotifyPremiumUsers) Handle(event interface{}) error {
e := event.(FeatureReleased)
// Notify premium users
return notificationService.NotifyPremium(e.FeatureName)
}
func (l *NotifyPremiumUsers) ShouldQueue() bool {
return true
}
func (l *NotifyPremiumUsers) ShouldHandle(event interface{}) bool {
e, ok := event.(FeatureReleased)
if !ok {
return false
}
// Only handle premium features
return e.IsPremium
}
Event Subscribers
Subscribers let you group multiple event listeners in a single type:
type UserEventSubscriber struct{}
func (s *UserEventSubscriber) Subscribe(dispatcher events.Dispatcher) {
dispatcher.Listen("user.registered", &SendWelcomeEmail{})
dispatcher.Listen("user.registered", &UpdateStatistics{})
dispatcher.Listen("user.updated", &SyncUserData{})
dispatcher.Listen("user.deleted", &CleanupUserData{})
}
// Register the subscriber
func init() {
events.Subscribe(&UserEventSubscriber{})
}
Auto Subscriber
Automatically register methods as listeners based on naming convention:
type UserSubscriber struct{}
// Method names starting with "Handle" are auto-registered
// HandleUserRegistered -> listens to "user.registered"
func (s *UserSubscriber) HandleUserRegistered(event interface{}) error {
e := event.(UserRegistered)
log.Info("User registered", "user_id", e.UserID)
return nil
}
// HandleUserUpdated -> listens to "user.updated"
func (s *UserSubscriber) HandleUserUpdated(event interface{}) error {
e := event.(UserUpdated)
log.Info("User updated", "user_id", e.UserID)
return nil
}
// Register auto subscriber
func init() {
subscriber := events.NewAutoSubscriber(&UserSubscriber{}, "Handle")
events.Subscribe(subscriber)
}
Mapped Subscriber
Explicitly map methods to events:
type OrderSubscriber struct{}
func (s *OrderSubscriber) ProcessOrder(event interface{}) error {
// Handle order.placed event
return nil
}
func (s *OrderSubscriber) CancelOrder(event interface{}) error {
// Handle order.cancelled event
return nil
}
// Register with explicit mapping
func init() {
subscriber := events.NewMappedSubscriber(&OrderSubscriber{}, events.EventMap{
"ProcessOrder": "order.placed",
"CancelOrder": "order.cancelled",
})
events.Subscribe(subscriber)
}
Dispatching Events
Synchronous Dispatch
// Dispatch and wait for all listeners to complete
err := events.Dispatch(UserRegistered{
UserID: 123,
Email: "user@example.com",
})
if err != nil {
log.Error("Event dispatch failed", "error", err)
}
Force Synchronous
// Always dispatch synchronously, even for queued listeners
err := events.DispatchNow(OrderPlaced{
OrderID: "ORD-123",
})
Asynchronous Dispatch
// Dispatch without waiting (uses goroutines or queue)
err := events.DispatchAsync(EmailSent{
To: "user@example.com",
Subject: "Welcome",
})
Delayed Dispatch
// Dispatch after a delay
err := events.DispatchAfter(
OrderFollowUp{OrderID: "ORD-123"},
24 * time.Hour,
)
Dispatch Until Result
// Dispatch until first non-nil result
result, err := events.Until(ValidatePayment{
Amount: 99.99,
Method: "credit_card",
})
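if err != nil {
log.Error("Event dispatch failed", "error", err)
}
// The checked assertion also covers a nil result
if paymentResult, ok := result.(*PaymentResult); ok {
// Use paymentResult
}
Wildcard Patterns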
Velocity supports flexible wildcard patterns for event matching:
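As a mental model for the pattern shapes covered in this section (prefix, suffix, and match-everything), the following standalone sketch shows one way such matching could work. It is not Velocity's matcher: the matches function is hypothetical, and treating user.* as matching multi-segment names such as user.profile.updated is an assumption made here for simplicity.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether an event name satisfies a pattern.
// Hypothetical helper; Velocity's real matching rules may differ.
func matches(pattern, name string) bool {
	switch {
	case pattern == "*":
		// A lone asterisk matches every event.
		return true
	case strings.HasSuffix(pattern, ".*"):
		// "user.*" matches any name that begins with "user.".
		return strings.HasPrefix(name, strings.TrimSuffix(pattern, "*"))
	case strings.HasPrefix(pattern, "*."):
		// "*.created" matches any name that ends with ".created".
		return strings.HasSuffix(name, strings.TrimPrefix(pattern, "*"))
	default:
		// No wildcard: the names must be identical (case-sensitive).
		return pattern == name
	}
}

func main() {
	patterns := []string{"user.*", "*.created", "*", "order.placed"}
	names := []string{"user.registered", "order.created", "order.placed"}
	for _, p := range patterns {
		for _, n := range names {
			fmt.Printf("%-14s vs %-16s -> %v\n", p, n, matches(p, n))
		}
	}
}
```

A table like the one main prints can be a quick way to check whether a pattern you plan to register covers the event names you expect.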
Prefix Matching
// Listen to all user events
events.Listen("user.*", &UserActivityLogger{})
// Matches:
// - user.registered
// - user.updated
// - user.deleted
// - user.anything
Suffix Matching
// Listen to all "created" events
events.Listen("*.created", &CreatedLogger{})
// Matches:
// - user.created
// - order.created
// - product.created
Match Everything
// Listen to all events
events.Listen("*", &GlobalLogger{})
Multiple Patterns
// Listen to multiple event patterns
events.Listen([]string{
"user.registered",
"user.updated",
"order.placed",
}, &MultiEventListener{})
Global Dispatcher Functions
// Register a listener
events.Listen("user.registered", &MyListener{})
// Register a subscriber
events.Subscribe(&MySubscriber{})
// Dispatch an event
events.Dispatch(UserRegistered{UserID: 1})
// Dispatch synchronously
events.DispatchNow(OrderPlaced{OrderID: "123"})
// Dispatch asynchronously
events.DispatchAsync(EmailSent{})
// Dispatch with delay
events.DispatchAfter(Reminder{}, 1*time.Hour)
// Dispatch until result
result, _ := events.Until(ValidateData{})
// Check if event has listeners
if events.HasListeners("user.registered") {
// Event has listeners
}
// Get all listeners for an event
listeners := events.GetListeners("user.registered")
// Remove all listeners for an event
events.Flush("user.registered")
// Remove specific listeners
events.Forget("user.registered")
Common Use Cases
User Registration Flow
// Event
type UserRegistered struct {
UserID int
Email string
Name string
IPAddress string
}
func (e UserRegistered) Name() string {
return "user.registered"
}
// Listeners
type SendWelcomeEmail struct{}
func (l *SendWelcomeEmail) Handle(event interface{}) error {
e := event.(UserRegistered)
return emailService.SendWelcome(e.Email, e.Name)
}
func (l *SendWelcomeEmail) ShouldQueue() bool { return true }
type CreateUserProfile struct{}
func (l *CreateUserProfile) Handle(event interface{}) error {
e := event.(UserRegistered)
return profileService.Create(e.UserID)
}
func (l *CreateUserProfile) ShouldQueue() bool { return false }
type TrackRegistration struct{}
func (l *TrackRegistration) Handle(event interface{}) error {
e := event.(UserRegistered)
return analytics.Track("user_registered", map[string]interface{}{
"user_id": e.UserID,
"ip": e.IPAddress,
})
}
func (l *TrackRegistration) ShouldQueue() bool { return true }
// Setup
func init() {
events.Listen("user.registered", &SendWelcomeEmail{})
events.Listen("user.registered", &CreateUserProfile{})
events.Listen("user.registered", &TrackRegistration{})
}
// Usage in handler
func (c *AuthHandler) Register(ctx *router.Context) error {
// email and password are assumed to have been read from the request already
user := createUser(email, password)
events.Dispatch(UserRegistered{
UserID: user.ID,
Email: user.Email,
Name: user.Name,
IPAddress: ctx.Request.RemoteAddr,
})
return ctx.JSON(user)
}
Order Processing Pipeline
type OrderPlaced struct {
OrderID string
UserID int
Total float64
Items []OrderItem
CreatedAt time.Time
}
func (e OrderPlaced) Name() string {
return "order.placed"
}
// Listeners for different responsibilities
func init() {
events.Listen("order.placed", &ProcessPayment{})
events.Listen("order.placed", &SendOrderConfirmation{})
events.Listen("order.placed", &UpdateInventory{})
events.Listen("order.placed", &NotifyWarehouse{})
events.Listen("order.placed", &UpdateAnalytics{})
}
Audit Logging
type AuditLogger struct{}
func (l *AuditLogger) Handle(event interface{}) error {
// Log all model changes
return auditLog.Record(event)
}
func (l *AuditLogger) ShouldQueue() bool {
return true
}
// Register for all create, update, and delete events
func init() {
events.Listen("*.created", &AuditLogger{})
events.Listen("*.updated", &AuditLogger{})
events.Listen("*.deleted", &AuditLogger{})
}
Cache Invalidation
type CacheInvalidator struct{}
func (l *CacheInvalidator) Handle(event interface{}) error {
switch e := event.(type) {
case UserUpdated:
cache.Forget("user:" + strconv.Itoa(e.UserID))
case ProductUpdated:
cache.Forget("product:" + e.ProductID)
}
return nil
}
func (l *CacheInvalidator) ShouldQueue() bool {
return false // Invalidate immediately
}
func init() {
events.Listen("*.updated", &CacheInvalidator{})
}
Testing
Using Fake Dispatcher
import "github.com/velocitykode/velocity/pkg/events"
func TestUserRegistration(t *testing.T) {
// Create fake dispatcher
fake := events.NewFake()
// Swap global dispatcher
events.Initialize(fake)
defer events.Reset()
// Perform action that dispatches events
registerUser("test@example.com", "password")
// Assert event was dispatched
fake.AssertDispatched(UserRegistered{}, func(e interface{}) bool {
event := e.(UserRegistered)
return event.Email == "test@example.com"
})
// Assert specific number of times
fake.AssertDispatchedTimes(UserRegistered{}, 1)
// Assert not dispatched
fake.AssertNotDispatched(UserDeleted{})
}
Testing Listeners
func TestSendWelcomeEmail(t *testing.T) {
listener := &SendWelcomeEmail{}
event := UserRegistered{
UserID: 1,
Email: "test@example.com",
Name: "Test User",
}
err := listener.Handle(event)
assert.NoError(t, err)
// Verify email was sent
assert.True(t, emailService.WasSent("test@example.com"))
}
Integration Tests
func TestEventFlow(t *testing.T) {
// Setup real dispatcher with test listeners
dispatcher := events.NewDispatcher()
events.Initialize(dispatcher)
defer events.Reset() // Restore the global dispatcher after the test
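// Track which listeners were called. testListener is assumed to be a small test
// helper (not shown) whose Handle method calls the handle func synchronously.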
var called []string
mu := sync.Mutex{}
trackingListener := func(name string) events.Listener {
return &testListener{
handle: func(e interface{}) error {
mu.Lock()
called = append(called, name)
mu.Unlock()
return nil
},
}
}
events.Listen("user.registered", trackingListener("email"))
events.Listen("user.registered", trackingListener("profile"))
events.Listen("user.registered", trackingListener("analytics"))
// Dispatch event
events.Dispatch(UserRegistered{UserID: 1})
// Verify all listeners were called
assert.Len(t, called, 3)
assert.Contains(t, called, "email")
assert.Contains(t, called, "profile")
assert.Contains(t, called, "analytics")
}
Best Practices
- Use Meaningful Event Names: Follow dot notation (e.g., user.registered, order.placed)
- Include All Necessary Data: Events should contain all data listeners need
- Keep Listeners Focused: Each listener should have a single responsibility
- Use Queues for Heavy Tasks: Queue emails, notifications, and API calls
- Make Listeners Idempotent: Listeners should handle duplicate events gracefully
- Handle Errors Gracefully: Don’t let one listener failure affect others
- Document Your Events: Clearly document what events exist and when they’re fired
- Test Event Flows: Use the fake dispatcher to test event dispatching
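The idempotency advice above can be sketched as follows. This is an illustration under stated assumptions, not a Velocity feature: the SendConfirmation listener and its in-memory seen set are hypothetical, and a real application would usually record handled IDs in a durable store shared by all workers.

```go
package main

import (
	"fmt"
	"sync"
)

// OrderPlaced carries an ID that uniquely identifies the order.
type OrderPlaced struct {
	OrderID string
}

// SendConfirmation is a hypothetical listener that remembers which
// orders it has already handled so duplicates become no-ops.
type SendConfirmation struct {
	mu   sync.Mutex
	seen map[string]bool
	sent int // counts confirmations; stands in for real side effects
}

// Handle processes each order at most once, even if the same event
// is delivered several times (for example after a queue retry).
func (l *SendConfirmation) Handle(event interface{}) error {
	e, ok := event.(OrderPlaced)
	if !ok {
		return fmt.Errorf("unexpected event type %T", event)
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.seen == nil {
		l.seen = make(map[string]bool)
	}
	if l.seen[e.OrderID] {
		return nil // duplicate delivery, nothing to do
	}
	l.seen[e.OrderID] = true
	l.sent++
	return nil
}

func main() {
	l := &SendConfirmation{}
	for _, id := range []string{"ORD-1", "ORD-1", "ORD-2"} {
		if err := l.Handle(OrderPlaced{OrderID: id}); err != nil {
			fmt.Println("handle failed:", err)
		}
	}
	fmt.Println("confirmations sent:", l.sent)
}
```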
Performance Considerations
Async vs Sync
// Synchronous: Blocks until all listeners complete
events.DispatchNow(event)
// Asynchronous: Returns immediately
events.DispatchAsync(event)
Use synchronous when:
- Event handling must complete before continuing
- Order of execution matters
- Error handling is critical
Use asynchronous when:
- Event handling can happen in background
- The caller's response time matters more than immediate completion
- Failures can be retried
Queue Integration
For high-volume applications, integrate with the queue system:
import "github.com/velocitykode/velocity/pkg/queue"
// Set up the queue dispatcher (QueueEventDispatcher is an application-defined adapter, not shown here)
queueDispatcher := &QueueEventDispatcher{
queue: queue.Connection("redis"),
}
dispatcher := events.NewDispatcher()
dispatcher.SetQueueDispatcher(queueDispatcher)
events.Initialize(dispatcher)
Batching Events
// Collect events and dispatch in batches
type EventBatcher struct {
events []interface{}
mu sync.Mutex
}
func (b *EventBatcher) Add(event interface{}) {
b.mu.Lock()
b.events = append(b.events, event)
b.mu.Unlock()
}
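func (b *EventBatcher) Flush() error {
// Take ownership of the pending events so the lock is not held during dispatch
b.mu.Lock()
pending := b.events
b.events = nil
b.mu.Unlock()
for i, event := range pending {
if err := events.Dispatch(event); err != nil {
// Requeue the failed event and everything after it for the next Flush
b.mu.Lock()
b.events = append(pending[i:], b.events...)
b.mu.Unlock()
return err
}
}
return nil
}
Troubleshooting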
Events Not Firing
Check:
- Listeners are registered before events are dispatched
- Event names match exactly (case-sensitive)
- Global dispatcher is initialized
- No errors in listener registration
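When checking that names match, it can help to print the name an event value resolves to. The sketch below assumes the rules described earlier in this guide (strings are used as-is, Name() wins when implemented, otherwise CamelCase becomes dot notation). The resolveName and dotName helpers are hypothetical and not part of Velocity, and Velocity's own conversion may treat acronyms or package-qualified types differently.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// namedEvent mirrors the Name() method shown in this guide.
type namedEvent interface {
	Name() string
}

// dotName converts a CamelCase type name such as "UserRegistered"
// into dot notation by lowercasing and splitting at capital letters.
func dotName(camel string) string {
	var b strings.Builder
	for i, r := range camel {
		if unicode.IsUpper(r) && i > 0 {
			b.WriteByte('.')
		}
		b.WriteRune(unicode.ToLower(r))
	}
	return b.String()
}

// resolveName returns the name this sketch would match an event under.
func resolveName(event interface{}) string {
	switch e := event.(type) {
	case string:
		return e
	case namedEvent:
		return e.Name()
	}
	t := reflect.TypeOf(event)
	if t == nil {
		return "" // a nil event has no type to derive a name from
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return dotName(t.Name())
}

// UserRegistered has no Name() method, so its name is derived.
type UserRegistered struct{}

// LegacySignup overrides the derived name with an explicit one.
type LegacySignup struct{}

func (LegacySignup) Name() string { return "user.registered" }

func main() {
	fmt.Println(resolveName(UserRegistered{}))
	fmt.Println(resolveName(&UserRegistered{}))
	fmt.Println(resolveName(LegacySignup{}))
	fmt.Println(resolveName("cache.cleared"))
}
```

Comparing this output with the string passed to Listen makes case or spelling mismatches easy to spot.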
Listeners Not Called
Check:
- Wildcard patterns are correct
- ShouldHandle() method isn’t preventing execution
- Event is being dispatched on the correct dispatcher instance
- Listener implements the Listener interface correctly
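For the last item, Go can verify interface conformance at compile time. The sketch below uses a local Listener interface with the two methods shown throughout this guide; it is an assumption that this mirrors the method set of events.Listener, so in a real project the assertion would name the Velocity interface instead.

```go
package main

import "fmt"

// Listener is a local stand-in with the two methods used in this guide.
// Assumption: Velocity's own interface has the same method set.
type Listener interface {
	Handle(event interface{}) error
	ShouldQueue() bool
}

// AuditListener is a hypothetical listener with pointer receivers.
type AuditListener struct {
	handled int
}

// Handle counts the events it receives.
func (l *AuditListener) Handle(event interface{}) error {
	l.handled++
	return nil
}

// ShouldQueue reports that this listener runs synchronously.
func (l *AuditListener) ShouldQueue() bool {
	return false
}

// Compile-time check: the build fails here if *AuditListener ever
// stops satisfying Listener (for example after renaming a method).
var _ Listener = (*AuditListener)(nil)

func main() {
	var l Listener = &AuditListener{}
	if err := l.Handle("cache.cleared"); err != nil {
		fmt.Println("handle failed:", err)
	}
	fmt.Println("queued:", l.ShouldQueue())
}
```

Because the methods use pointer receivers, only *AuditListener satisfies the interface; passing AuditListener{} by value where a Listener is expected is a compile error.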
Performance Issues
Solutions:
- Use DispatchAsync() for non-critical events
- Implement queue-based event handling
- Batch events when possible
- Profile listener execution times
- Remove unnecessary listeners
