- 向一个已经关闭的通道发送值和关闭一个已经关闭的通道,
都会引发运行时候的恐慌,缓冲器就是解决这个问题诞生的。
Put 方法先检查缓冲器实例是否关闭,并且保证只有在检查结果是否的时候
进行存入。,在Close 方法中仅在当前缓冲器实例未关闭的情况下
进行关闭操作
package buffer
import (
"sync"
"sync/atomic"
)
import (
"fmt"
"gopcp.v2/chapter6/webcrawler/toolkit/buffer"
"sync"
)
/***
向一个已经关闭的通道发送值和关闭一个已经关闭的通道,
都会引发运行时候的恐慌,缓冲器就是解决这个问题诞生的。
Put 方法先检查缓冲器实例是否关闭,并且保证只有在检查结果是否的时候
进行存入。,在Close 方法中仅在当前缓冲器实例未关闭的情况下
进行关闭操作
*/
// Buffer 代表FIFO的缓冲器的接口类型。
type Buffer interface {
// Cap 用于获取本缓冲器的容量。
Cap() uint32
// Len 用于获取本缓冲器中的数据数量。
Len() uint32
// Put 用于向缓冲器放入数据。
// 注意!本方法应该是非阻塞的。
// 若缓冲器已关闭则会直接返回非nil的错误值。
Put(datum interface{}) (bool, error)
// Get 用于从缓冲器获取器。
// 注意!本方法应该是非阻塞的。
// 若缓冲器已关闭则会直接返回非nil的错误值。
Get() (interface{}, error)
// Close 用于关闭缓冲器。
// 若缓冲器之前已关闭则返回false,否则返回true。
Close() bool
// Closed 用于判断缓冲器是否已关闭。
Closed() bool
}
type myBuffer struct {
//ch 代表采访数据的通道
ch chan interface{}
//closed 代表缓冲器的关闭状态
closed uint32
//closingLock 代表为了消除因关闭缓冲器而产生的竟态条件的读写锁
closingLock sync.RWMutex
}
func NewBuffer(size uint32)(Buffer, error){
if size==0 {
errMsg := fmt.Sprintf(" illegal size for buffer:%d",size)
return nil, error.NewIllegalParameterError(errMsg)
}
return &myBuffer{
ch : make(chan interface{},size),
},nil
}
func (buf *myBuffer) Cap() uint32 {
return uint32(cap(buf.ch))
}
func (buf *myBuffer) Len() uint32 {
return uint32(len(buf.ch))
}
func (buf *myBuffer) Put (datum interface{}) (ok bool,err error){
buf.closingLock.RLock()
defer buf.closingLock.Unlock()
if buf.Closed() {
return false,ErrClosedBuffer
}
select {
case buf.ch <- datum:
ok = true
default:
ok = false
}
return
}
func (buf *myBuffer) Get() (interface{}, error) {
select {
case datum, ok := <-buf.ch:
if !ok {
return nil, ErrClosedBuffer
}
return datum, nil
default:
return nil, nil
}
}
func (buf *myBuffer)Close() bool {
if atomic.CompareAndSwapUint32(&buf.closed,0,1){
buf.closingLock.Lock()
close(buf.ch)
buf.closingLock.Unlock()
return true
}
return false
}
io.Reader 实现类型:bytes.Reader bufio.Reader ,这类读取器只能够读取他们底层的一遍数据,当读完数据后,他们的Read 方法总会把io.EOF 变量的值做为错误值返回。net/http 中的 http.Reaponse 类型的body 就是 io.Readcloser接口类型的,io.Readcloser的类型声明嵌入了io.Reader,前者只比后者多一个Close的方法。
package errors
import (
"bytes"
"fmt"
"strings"
)
type ErrorType string
const (
// ERROR_TYPE_DOWNLOADER 代表下载器错误。
ERROR_TYPE_DOWNLOADER ErrorType = "downloader error"
// ERROR_TYPE_ANALYZER 代表分析器错误。
ERROR_TYPE_ANALYZER ErrorType = "analyzer error"
// ERROR_TYPE_PIPELINE 代表条目处理管道错误。
ERROR_TYPE_PIPELINE ErrorType = "pipeline error"
// ERROR_TYPE_SCHEDULER 代表调度器错误。
ERROR_TYPE_SCHEDULER ErrorType = "scheduler error"
)
type CrawlerError interface {
Type() ErrorType
error() string
}
type myCrawlerError struct{
errType ErrorType
errMsg string
fullErrMsg string
}
// NewCrawlerError 用于创建一个新的爬虫错误值。
func NewCrawlerError(errType ErrorType, errMsg string) CrawlerError {
return &myCrawlerError{
errType: errType,
errMsg: strings.TrimSpace(errMsg),
}
}
// NewCrawlerErrorBy 用于根据给定的错误值创建一个新的爬虫错误值。
func NewCrawlerErrorBy(errType ErrorType, err error) CrawlerError {
return NewCrawlerError(errType, err.Error())
}
func (ce *myCrawlerError) Type() ErrorType {
return ce.errType
}
func (ce *myCrawlerError) Error() string {
if ce.fullErrMsg == "" {
ce.genFullErrMsg()
}
return ce.fullErrMsg
}
func (ce *myCrawlerError) getFullErrMsg() {
var buffer bytes.Buffer
buffer.WriteString("crawler error:")
if ce.errType!="" {
buffer.WriteString(string(ce.errType))
buffer.WriteString(":")
}
buffer.WriteString(ce.errMsg)
ce.fullErrMsg =fmt.Sprintf("%s",buffer.String())
return
}
// IllegalParameterError 代表非法的参数的错误类型。
type IllegalParameterError struct {
msg string
}
// NewIllegalParameterError 会创建一个IllegalParameterError类型的实例。
func NewIllegalParameterError(errMsg string) IllegalParameterError {
return IllegalParameterError{
msg: fmt.Sprintf("illegal parameter: %s",
strings.TrimSpace(errMsg)),
}
}
func (ipe IllegalParameterError) Error() string {
return ipe.msg
}
|
请发表评论