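// Package eventsbus provides a small, mutex-guarded, bounded in-memory buffer
// of events: Push appends an event and Since returns the events newer than a
// given ID.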
package eventsbus
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Event is a single record held by a Bus.
type Event struct {
	// ID is the sequence number assigned by Bus.Push; each Push uses the
	// previous ID plus one.
	ID int64
	// Kind is the caller-supplied event type label.
	Kind string
	// Ts is the creation time as text. Bus.Push fills it with the current
	// UTC time formatted as time.RFC3339Nano.
	Ts string
	// Data is the caller-supplied payload, stored as given.
	Data any
}
|
|
|
|
type Bus struct {
|
|
mu sync.Mutex
|
|
cond *sync.Cond
|
|
buf []Event
|
|
cap int
|
|
next int64
|
|
}
|
|
|
|
func New(capacity int) *Bus {
|
|
if capacity < 16 {
|
|
capacity = 16
|
|
}
|
|
b := &Bus{
|
|
cap: capacity,
|
|
buf: make([]Event, 0, capacity),
|
|
}
|
|
b.cond = sync.NewCond(&b.mu)
|
|
return b
|
|
}
|
|
|
|
func (b *Bus) Push(kind string, data any) Event {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
b.next++
|
|
evt := Event{
|
|
ID: b.next,
|
|
Kind: kind,
|
|
Ts: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Data: data,
|
|
}
|
|
|
|
if len(b.buf) >= b.cap {
|
|
b.buf = b.buf[1:]
|
|
}
|
|
b.buf = append(b.buf, evt)
|
|
b.cond.Broadcast()
|
|
return evt
|
|
}
|
|
|
|
func (b *Bus) Since(id int64) []Event {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if len(b.buf) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]Event, 0, len(b.buf))
|
|
for _, ev := range b.buf {
|
|
if ev.ID > id {
|
|
out = append(out, ev)
|
|
}
|
|
}
|
|
return out
|
|
}
|