发布/订阅模式

发布订阅模式比较常用,近期使用grpc也涉及到连接管理简单记录一下发布订阅模式的实现,主要使用chan来传输数据

也可以考虑使用回调函数来处理publish的内容

定义Publisher

首先定义一个Publish的struct,主要包含Subscribers属性来管理订阅者,订阅者必须使用指针来保证数据传递

// Publisher 发布者
// 管理订阅者
// 发布内容
type Publisher struct {
	sync.RWMutex
	Subscribers map[string]*Subscriber
	waitGroup sync.WaitGroup
}

func NewPublisher() *Publisher {
	return &Publisher{
		Subscribers: make(map[string]*Subscriber),
	}
}

接下来实现Publisher的基础功能


// 发布者的功能
// 添加订阅者
func (p *Publisher) AddSubscriber(s *Subscriber) {
	p.Lock()
	p.Subscribers[s.Name] = s
	p.Unlock()
	fmt.Printf("添加订阅者%s成功\n", s.Name)
}

// 删除订阅者
func (p *Publisher) RemoveSubscriber(name string) {
	p.Lock()
	if _, ok := p.Subscribers[name]; ok {
		delete(p.Subscribers, name)
	}
	p.Unlock()
	fmt.Printf("移除订阅者%s成功\n", name)
}

// 发布内容
func (p *Publisher) Publish(msg string) {
	p.RLock()
	defer p.RUnlock()
	p.waitGroup.Add(len(p.Subscribers))
	for _, s := range p.Subscribers {
		d_ := msg
		s_ := s
		go func() {
			s_.Run(d_)
			p.waitGroup.Done()
		}()
	}
	p.waitGroup.Wait()
}

定义Subscriber

订阅者主要是接受发布者推送的数据,当然也应该包含一个唯一的标识符,可采用uuid,本文简单采用一个name(string)

type Subscriber struct {
	sync.RWMutex
	Name string
	Data chan string
}

func NewSubscriber(name string) *Subscriber {
	return &Subscriber{Name: name, Data: make(chan string)}
}

订阅者应当实现的功能

// 订阅者功能
// 订阅内容
func (s *Subscriber) Subscribe(p *Publisher) {
	p.AddSubscriber(s)
}

// 取消订阅
func (s *Subscriber) UnSubscribe(p *Publisher) {
	p.RemoveSubscriber(s.Name)
}

// 处理发布的内容
func (s *Subscriber) Run(msg string) {
	s.Lock()
	go func() {
		s.Data <- msg
		s.Unlock()
	}()
}

完整代码

// demo/subscribe/publisher.go
package subscribe

import (
	"fmt"
	"sync"
)

// Publisher 发布者
// 管理订阅者
// 发布内容
type Publisher struct {
	sync.RWMutex
	Subscribers map[string]*Subscriber
	waitGroup sync.WaitGroup
}

func NewPublisher() *Publisher {
	return &Publisher{
		Subscribers: make(map[string]*Subscriber),
	}
}

// 发布者的功能
// 添加订阅者
func (p *Publisher) AddSubscriber(s *Subscriber) {
	p.Lock()
	p.Subscribers[s.Name] = s
	p.Unlock()
	fmt.Printf("添加订阅者%s成功\n", s.Name)
}

// 删除订阅者
func (p *Publisher) RemoveSubscriber(name string) {
	p.Lock()
	if _, ok := p.Subscribers[name]; ok {
		delete(p.Subscribers, name)
	}
	p.Unlock()
	fmt.Printf("移除订阅者%s成功\n", name)
}

// 发布者事件处理 - 产生数据
func (p *Publisher) Update(){

}

// 发布内容
func (p *Publisher) Publish(msg string) {
	p.RLock()
	defer p.RUnlock()
	p.waitGroup.Add(len(p.Subscribers))
	for _, s := range p.Subscribers {
		d_ := msg
		s_ := s
		go func() {
			s_.Run(d_)
			p.waitGroup.Done()
		}()
	}
	p.waitGroup.Wait()
}

// demo/subscribe/subscriber.go
package subscribe

import (
	"sync"
)

// Subscriber 订阅者
// 订阅内容,等待发布者发布内容
type Subscriber struct {
	sync.RWMutex
	Name string
	Data chan string
}

func NewSubscriber(name string) *Subscriber {
	return &Subscriber{Name: name, Data: make(chan string)}
}


// 订阅者功能
// 订阅内容
func (s *Subscriber) Subscribe(p *Publisher) {
	p.AddSubscriber(s)
}

// 取消订阅
func (s *Subscriber) UnSubscribe(p *Publisher) {
	p.RemoveSubscriber(s.Name)
}

// 处理发布的内容
func (s *Subscriber) Run(msg string) {
	s.Lock()
	go func() {
		s.Data <- string
		s.Unlock()
	}()
}
// demo/main.go
package main

import (
  "fmt"
  "time"
  "demo/subscribe"
  "strconv"
)

func main(){
  var (
    pub *subscribe.Publiser
    sub1 *subscribe.Subscriber
    sub2 *subscribe.Subscriber
  )

  pub = subscibe.NewPublisher()

  sub1 = subscribe.NewSubscriber("pub1")
  sub2 = subscribe.NewSubscriber("pub2")
  sub1.Subscribe(pub)
  sub2.Subscribe(pub)

  go func(){
    i := 0
    for {
      if i < 100 {
        break
      }
      pub.Publish(strconv.Itoa(i))
      i ++
    }
  }()

  go func(){
    for{
      d1, ok := <- sub1.Data
      if !ok {
        continue
      }
      fmt.Print(d1)
    }
  }()

    go func(){
    for{
      d2, ok := <- sub2.Data
      if !ok {
        continue
      }
      fmt.Print(d2)
    }
  }()

  time.Sleep(10*time.Second)
}