commit
530c24b019
35 changed files with 3601 additions and 0 deletions
pkg
91
pkg/broadcast/broadcast.go
Normal file
91
pkg/broadcast/broadcast.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
Package broadcast provides pubsub of messages over channels.
|
||||
|
||||
A provider has a Broadcaster into which it Submits messages and into
|
||||
which subscribers Register to pick up those messages.
|
||||
*/
|
||||
package broadcast
|
||||
|
||||
type Broadcaster[T any] struct {
|
||||
input chan T
|
||||
reg chan chan<- T
|
||||
unreg chan chan<- T
|
||||
|
||||
outputs map[chan<- T]bool
|
||||
}
|
||||
|
||||
func (b *Broadcaster[T]) broadcast(m T) {
|
||||
for ch := range b.outputs {
|
||||
ch <- m
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broadcaster[T]) run() {
|
||||
for {
|
||||
select {
|
||||
case m := <-b.input:
|
||||
b.broadcast(m)
|
||||
case ch, ok := <-b.reg:
|
||||
if ok {
|
||||
b.outputs[ch] = true
|
||||
} else {
|
||||
return
|
||||
}
|
||||
case ch := <-b.unreg:
|
||||
delete(b.outputs, ch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewBroadcaster creates a new broadcaster with the given input
|
||||
// channel buffer length.
|
||||
func NewBroadcaster[T any](buflen int) *Broadcaster[T] {
|
||||
b := &Broadcaster[T]{
|
||||
input: make(chan T, buflen),
|
||||
reg: make(chan chan<- T),
|
||||
unreg: make(chan chan<- T),
|
||||
outputs: make(map[chan<- T]bool),
|
||||
}
|
||||
|
||||
go b.run()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// Register a new channel to receive broadcasts
|
||||
func (b *Broadcaster[T]) Register(newch chan<- T) {
|
||||
b.reg <- newch
|
||||
}
|
||||
|
||||
// Unregister a channel so that it no longer receives broadcasts.
|
||||
func (b *Broadcaster[T]) Unregister(newch chan<- T) {
|
||||
b.unreg <- newch
|
||||
}
|
||||
|
||||
// Shut this broadcaster down.
|
||||
func (b *Broadcaster[T]) Close() error {
|
||||
close(b.reg)
|
||||
close(b.unreg)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Submit an item to be broadcast to all listeners.
|
||||
func (b *Broadcaster[T]) Submit(m T) {
|
||||
if b != nil {
|
||||
b.input <- m
|
||||
}
|
||||
}
|
||||
|
||||
// TrySubmit attempts to submit an item to be broadcast, returning
|
||||
// true iff it the item was broadcast, else false.
|
||||
func (b *Broadcaster[T]) TrySubmit(m T) bool {
|
||||
if b == nil {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case b.input <- m:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
120
pkg/broadcast/broadcast_test.go
Normal file
120
pkg/broadcast/broadcast_test.go
Normal file
|
@ -0,0 +1,120 @@
|
|||
package broadcast
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBroadcast(t *testing.T) {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
b := NewBroadcaster[int](100)
|
||||
defer b.Close()
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
|
||||
cch := make(chan int)
|
||||
|
||||
b.Register(cch)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer b.Unregister(cch)
|
||||
<-cch
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
b.Submit(1)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestBroadcastTrySubmit(t *testing.T) {
|
||||
b := NewBroadcaster[int](1)
|
||||
defer b.Close()
|
||||
|
||||
if ok := b.TrySubmit(0); !ok {
|
||||
t.Fatalf("1st TrySubmit assert error expect=true actual=%v", ok)
|
||||
}
|
||||
|
||||
if ok := b.TrySubmit(1); ok {
|
||||
t.Fatalf("2nd TrySubmit assert error expect=false actual=%v", ok)
|
||||
}
|
||||
|
||||
cch := make(chan int)
|
||||
b.Register(cch)
|
||||
|
||||
if ok := b.TrySubmit(1); !ok {
|
||||
t.Fatalf("3rd TrySubmit assert error expect=true actual=%v", ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcastCleanup(t *testing.T) {
|
||||
b := NewBroadcaster[int](100)
|
||||
b.Register(make(chan int))
|
||||
b.Close()
|
||||
}
|
||||
|
||||
func echoer(chin, chout chan interface{}) {
|
||||
for m := range chin {
|
||||
chout <- m
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDirectSend(b *testing.B) {
|
||||
chout := make(chan interface{})
|
||||
chin := make(chan interface{})
|
||||
defer close(chin)
|
||||
|
||||
go echoer(chin, chout)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
chin <- nil
|
||||
<-chout
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBrodcast(b *testing.B) {
|
||||
chout := make(chan int)
|
||||
|
||||
bc := NewBroadcaster[int](0)
|
||||
defer bc.Close()
|
||||
bc.Register(chout)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
bc.Submit(1)
|
||||
<-chout
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkParallelDirectSend(b *testing.B) {
|
||||
chout := make(chan interface{})
|
||||
chin := make(chan interface{})
|
||||
defer close(chin)
|
||||
|
||||
go echoer(chin, chout)
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
chin <- nil
|
||||
<-chout
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkParallelBrodcast(b *testing.B) {
|
||||
chout := make(chan int)
|
||||
|
||||
bc := NewBroadcaster[int](0)
|
||||
defer bc.Close()
|
||||
bc.Register(chout)
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
bc.Submit(1)
|
||||
<-chout
|
||||
}
|
||||
})
|
||||
}
|
1
pkg/broadcast/subscription.go
Normal file
1
pkg/broadcast/subscription.go
Normal file
|
@ -0,0 +1 @@
|
|||
package broadcast
|
161
pkg/uuid/uuid.go
Normal file
161
pkg/uuid/uuid.go
Normal file
|
@ -0,0 +1,161 @@
|
|||
package uuid
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
fuuid "github.com/gofrs/uuid/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/bsontype"
|
||||
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
|
||||
)
|
||||
|
||||
var Nil = UUID{}
|
||||
|
||||
type UUIDList = []UUID
|
||||
|
||||
type UUID struct {
|
||||
fuuid.UUID
|
||||
}
|
||||
|
||||
func New() UUID {
|
||||
return UUID{fuuid.Must(fuuid.NewV7())}
|
||||
}
|
||||
|
||||
func NewFromTime(t time.Time) UUID {
|
||||
gen := fuuid.NewGenWithOptions(
|
||||
fuuid.WithEpochFunc(func() time.Time { return t }),
|
||||
)
|
||||
return UUID{fuuid.Must(gen.NewV7())}
|
||||
}
|
||||
|
||||
func NewP() *UUID {
|
||||
return &UUID{fuuid.Must(fuuid.NewV7())}
|
||||
}
|
||||
|
||||
func FromString(text string) (UUID, error) {
|
||||
u, err := fuuid.FromString(text)
|
||||
if err != nil {
|
||||
return Nil, err
|
||||
}
|
||||
|
||||
return UUID{u}, nil
|
||||
}
|
||||
|
||||
func MustFromString(text string) UUID {
|
||||
u, err := fuuid.FromString(text)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return UUID{u}
|
||||
}
|
||||
|
||||
func FromBytes(input []byte) (UUID, error) {
|
||||
u, err := fuuid.FromBytes(input)
|
||||
if err != nil {
|
||||
return Nil, err
|
||||
}
|
||||
|
||||
return UUID{u}, nil
|
||||
}
|
||||
|
||||
func (a *UUID) UnmarshalJSON(b []byte) error {
|
||||
var s string
|
||||
if err := json.Unmarshal(b, &s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s == "" {
|
||||
a.UUID = fuuid.Nil
|
||||
return nil
|
||||
}
|
||||
|
||||
return a.UUID.Parse(s)
|
||||
}
|
||||
|
||||
func (a UUID) MarshalJSON() ([]byte, error) {
|
||||
if a.IsNil() {
|
||||
return json.Marshal("")
|
||||
}
|
||||
|
||||
return json.Marshal(a.UUID)
|
||||
}
|
||||
|
||||
// UnmarshalGQL implements the graphql.Unmarshaler interface
|
||||
func (u *UUID) UnmarshalGQL(v interface{}) error {
|
||||
id, ok := v.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("uuid must be a string")
|
||||
}
|
||||
|
||||
return u.Parse(id)
|
||||
}
|
||||
|
||||
// MarshalGQL implements the graphql.Marshaler interface
|
||||
func (u UUID) MarshalGQL(w io.Writer) {
|
||||
b := []byte(strconv.Quote(u.String()))
|
||||
_, err := w.Write(b)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
const uuidSubtype = 4
|
||||
|
||||
// MarshalBSONValue для официального mongo драйвера (go.mongodb.org/mongo-driver/mongo)
|
||||
func (id UUID) MarshalBSONValue() (bsontype.Type, []byte, error) {
|
||||
if id.IsNil() {
|
||||
return bsontype.Null, nil, nil
|
||||
} else {
|
||||
bin := bsoncore.AppendBinary(nil, uuidSubtype, id.UUID.Bytes())
|
||||
return bson.TypeBinary, bin, nil
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalBSONValue для официального mongo драйвера (go.mongodb.org/mongo-driver/mongo)
|
||||
func (id *UUID) UnmarshalBSONValue(t bsontype.Type, data []byte) error {
|
||||
switch t {
|
||||
case bsontype.Null, bsontype.Undefined:
|
||||
id = &UUID{} // nil uuid value 00000000-0000-0000-0000-000000000000
|
||||
return nil
|
||||
case bsontype.Binary:
|
||||
subtype, bin, _, ok := bsoncore.ReadBinary(data)
|
||||
if !ok {
|
||||
return errors.New("invalid bson binary value")
|
||||
}
|
||||
if subtype != uuidSubtype && subtype != 3 { // 3 is a deprecated uuid subtype, used for compatibility
|
||||
return fmt.Errorf("unsupported binary subtype for uuid: %d", subtype)
|
||||
}
|
||||
return id.UUID.UnmarshalBinary(bin)
|
||||
default:
|
||||
return fmt.Errorf("unsupported value type for uuid: %s", t.String())
|
||||
}
|
||||
}
|
||||
|
||||
var _ pgtype.UUIDValuer = UUID{}
|
||||
|
||||
// UUIDValue implements pgtype.UUIDValuer.
|
||||
func (a UUID) UUIDValue() (pgtype.UUID, error) {
|
||||
return pgtype.UUID{
|
||||
Bytes: a.UUID,
|
||||
Valid: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ pgtype.UUIDScanner = (*UUID)(nil)
|
||||
|
||||
// ScanUUID implements pgtype.UUIDScanner.
|
||||
func (a *UUID) ScanUUID(v pgtype.UUID) error {
|
||||
if !v.Valid {
|
||||
return fmt.Errorf("cannot scan NULL into *uuid.UUID")
|
||||
}
|
||||
|
||||
a.UUID = v.Bytes
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue