konfa-server/pkg/broadcast/broadcast.go
royalcat 530c24b019
All checks were successful
docker / build-docker (push) Successful in 2m20s
wip
2024-12-23 15:54:51 +03:00

91 lines
1.7 KiB
Go

/*
Package broadcast provides pubsub of messages over channels.
A provider has a Broadcaster into which it Submits messages and into
which subscribers Register to pick up those messages.
*/
package broadcast
// Broadcaster fans values of type T out to a set of subscriber channels.
// Values arrive on input; subscribers are added and removed by sending
// their channel on reg and unreg respectively. Construct one with
// NewBroadcaster, which allocates the channels and starts the goroutine
// that services them.
type Broadcaster[T any] struct {
	// input carries submitted values waiting to be broadcast.
	input chan T
	// outputs is the set of currently registered subscriber channels.
	outputs map[chan<- T]bool
	// reg carries channels to be added to outputs.
	reg chan chan<- T
	// unreg carries channels to be removed from outputs.
	unreg chan chan<- T
}
// broadcast delivers m to every channel currently in b.outputs, one
// subscriber at a time. Each send is a plain blocking send, so a
// subscriber that is not receiving (and has no free buffer space) stalls
// delivery to the subscribers that follow it. Map iteration order is
// unspecified, so subscribers are not served in any particular order.
func (b *Broadcaster[T]) broadcast(m T) {
	for subscriber := range b.outputs {
		subscriber <- m
	}
}
// run is the broadcaster's event loop. It repeatedly services whichever of
// the three channels is ready:
//
//   - input: forward the received value to every registered subscriber;
//   - reg:   add the received channel to the subscriber set;
//   - unreg: remove the received channel from the subscriber set.
//
// The loop returns as soon as it observes that either reg or unreg has
// been closed, which is how Close shuts the broadcaster down. Values still
// buffered in input at that point are not delivered.
func (b *Broadcaster[T]) run() {
	for {
		select {
		case m := <-b.input:
			b.broadcast(m)

		case ch, ok := <-b.reg:
			if !ok {
				return
			}
			b.outputs[ch] = true

		case ch, ok := <-b.unreg:
			if !ok {
				// A closed unreg yields a nil channel on every receive.
				// Treat it as shutdown rather than as an unregister
				// request, so the loop does not spin until select
				// happens to pick the closed reg case.
				return
			}
			delete(b.outputs, ch)
		}
	}
}
// NewBroadcaster returns a ready-to-use Broadcaster whose input channel
// can buffer up to buflen submitted values. The registration and
// unregistration channels are unbuffered. A goroutine running the
// broadcaster's event loop is started before the function returns; call
// Close to stop it.
func NewBroadcaster[T any](buflen int) *Broadcaster[T] {
	b := new(Broadcaster[T])
	b.input = make(chan T, buflen)
	b.reg = make(chan chan<- T)
	b.unreg = make(chan chan<- T)
	b.outputs = make(map[chan<- T]bool)

	go b.run()

	return b
}
// Register subscribes newch to the broadcaster so that it receives every
// value broadcast from now on. The request is handed to the broadcaster's
// event loop over an unbuffered channel, so the call blocks until the loop
// accepts it. Calling Register after Close panics, because the
// registration channel has been closed.
func (b *Broadcaster[T]) Register(newch chan<- T) {
	b.reg <- newch
}
// Unregister removes newch from the broadcaster's subscriber set so that
// it receives no further broadcasts. The request is handed to the
// broadcaster's event loop over an unbuffered channel, so the call blocks
// until the loop accepts it. Calling Unregister after Close panics,
// because the unregistration channel has been closed.
func (b *Broadcaster[T]) Unregister(newch chan<- T) {
	b.unreg <- newch
}
// Close shuts the broadcaster down by closing its registration and
// unregistration channels, which makes the event loop goroutine exit.
// It always returns nil.
//
// Close must be called at most once: a second call panics on closing an
// already-closed channel. Subscriber channels are not closed.
func (b *Broadcaster[T]) Close() error {
	// reg is closed first, matching the order the event loop is signalled in.
	for _, control := range []chan chan<- T{b.reg, b.unreg} {
		close(control)
	}
	return nil
}
// Submit queues m for broadcast to all registered subscribers. It blocks
// while the input buffer is full. Calling Submit on a nil *Broadcaster is
// a no-op.
func (b *Broadcaster[T]) Submit(m T) {
	if b == nil {
		return
	}
	b.input <- m
}
// TrySubmit is the non-blocking counterpart of Submit. It returns true if
// m was accepted onto the input channel, and false if the send could not
// proceed immediately (for example because the input buffer is full) or
// if b is nil.
func (b *Broadcaster[T]) TrySubmit(m T) bool {
	if b != nil {
		select {
		case b.input <- m:
			return true
		default:
			// Input not ready; fall through and report failure.
		}
	}
	return false
}