面试 UCloud 时被问到了这道题,事后研究了一下,发现可以基于观察者模式来实现,在此仅作记录。
/**
* @Author: lzw5399
* @Date: 2021/5/20 20:38
* @Desc: 基于观察者模式实现的订阅发布
*/
package main
import (
"errors"
"fmt"
"sync"
)
// main demonstrates the publish/subscribe flow: it subscribes three clients
// across the topics "CCTV1" and "CCTV2" and then publishes one message to
// "CCTV1".
func main() {
	server := NewServer()
	server.Subscribe("CCTV1", NewBasicClient("张三"))
	server.Subscribe("CCTV2", NewBasicClient("张三"))
	server.Subscribe("CCTV1", NewBasicClient("李四"))

	// Publish reports an unknown topic through its error result; surface it
	// instead of discarding it silently.
	if err := server.Publish("CCTV1", "开播辣!"); err != nil {
		fmt.Println("publish failed:", err)
	}
}
// ---server---
// Server is the publish/subscribe hub: it keeps the registered topics by name.
// Subscribe and Publish take the embedded RWMutex around their accesses to the
// Topics map.
type Server struct {
Topics map[string]*Topic // key: topic name, value: the topic registered under that name
// Embedded lock used by the Server methods when reading or writing Topics.
sync.RWMutex
}
// NewServer returns a Server whose topic registry is empty and ready to be
// written to.
func NewServer() *Server {
	srv := new(Server)
	// The embedded RWMutex is usable at its zero value, so only the map needs
	// explicit initialisation.
	srv.Topics = map[string]*Topic{}
	return srv
}
// Subscribe registers client on the topic named topicName, creating the topic
// first when it does not exist yet.
//
// The topic lookup and creation are synchronised so that concurrent calls for
// the same new topic name all end up on a single Topic instead of overwriting
// each other's freshly created one.
func (s *Server) Subscribe(topicName string, client Client) {
	// Fast path: the topic already exists, so a read lock is sufficient.
	s.RLock()
	topic, exist := s.Topics[topicName]
	s.RUnlock()

	if !exist {
		s.Lock()
		// Re-check under the write lock: another goroutine may have created
		// the topic between RUnlock above and Lock here.
		topic, exist = s.Topics[topicName]
		if !exist {
			topic = NewTopic(topicName)
			s.Topics[topicName] = topic
		}
		s.Unlock()
	}

	// AddClient is called without holding the server lock, so client code
	// invoked by it cannot block other Server operations.
	topic.AddClient(client)
}
// Publish forwards message to the topic registered under topicName, which in
// turn notifies its clients. It returns a non-nil error when no topic with
// that name is registered, and nil otherwise.
func (s *Server) Publish(topicName string, message string) error {
	s.RLock()
	target, found := s.Topics[topicName]
	s.RUnlock()

	if !found {
		return errors.New("当前topic不存在")
	}

	// The server lock is already released here; only the lookup is guarded.
	target.Notify(message)
	return nil
}
// ---end server---
// ---topic---
// Topic is a named set of subscribed clients, keyed by client name.
type Topic struct {
Name string
clients map[string]Client // key: client name, value: the subscribed client
// Embedded lock; AddClient takes the write lock when inserting into clients.
sync.RWMutex
}
// NewTopic returns a Topic called name that has no clients yet.
func NewTopic(name string) *Topic {
	tp := new(Topic)
	tp.Name = name
	// The embedded RWMutex needs no initialisation; its zero value is an
	// unlocked mutex.
	tp.clients = map[string]Client{}
	return tp
}
// AddClient subscribes client to the topic. If a client with the same name is
// already subscribed, the existing subscription is kept and client is ignored.
func (t *Topic) AddClient(client Client) {
	// Resolve the key once, before locking, so client code never runs while
	// the topic lock is held.
	name := client.Name()

	// The existence check and the insert happen under one write lock: reading
	// the map outside the lock would race with concurrent inserts.
	t.Lock()
	defer t.Unlock()
	if _, exist := t.clients[name]; !exist {
		t.clients[name] = client
	}
}
// Notify calls ConsumeCallback(message) on every client subscribed to the
// topic at the time of the call. The order of delivery is unspecified.
func (t *Topic) Notify(message string) {
	// Snapshot the subscribers under the read lock so that iteration does not
	// race with AddClient writing to the map.
	t.RLock()
	subscribers := make([]Client, 0, len(t.clients))
	for _, c := range t.clients {
		subscribers = append(subscribers, c)
	}
	t.RUnlock()

	// Run the callbacks after releasing the lock: a callback that subscribes
	// to this topic would otherwise block forever on the write lock.
	for _, c := range subscribers {
		c.ConsumeCallback(message)
	}
}
// ---end topic---
// ---client---
// Client is a subscriber that can be registered on a Topic.
type Client interface {
// Name returns the client's name; Topic uses it as the key under which the
// client is stored.
Name() string
// ConsumeCallback is invoked by Topic.Notify with the published message.
ConsumeCallback(message string)
}
// BasicClient is a minimal client that writes every message it receives to
// standard output.
type BasicClient struct {
	name string // value reported by Name
}

// NewBasicClient returns a BasicClient identified by name.
func NewBasicClient(name string) *BasicClient {
	c := new(BasicClient)
	c.name = name
	return c
}

// Name returns the name the client was created with.
func (b *BasicClient) Name() string { return b.name }

// ConsumeCallback prints one line containing the client's name and the
// received message to standard output.
func (b *BasicClient) ConsumeCallback(message string) {
	line := fmt.Sprintf("当前Client: %s, 接收到的消息为: %s\n", b.Name(), message)
	fmt.Print(line)
}
// ---end client---
|
请发表评论