本文整理汇总了Golang中github.com/hashicorp/nomad/client/allocdir.AllocDirFS类的典型用法代码示例。如果您正苦于以下问题:Golang AllocDirFS类的具体用法?Golang AllocDirFS怎么用?Golang AllocDirFS使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了AllocDirFS类的3个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: blockUntilNextLog
// blockUntilNextLog starts a background goroutine that waits for the log file
// of the given task and log type at nextIndex, or at any greater index, to
// show up under logPath.
//
// The returned channel is buffered with capacity one. The goroutine delivers
// exactly one value on it and then closes it:
//   - nil once a listing of logPath contains an index >= nextIndex,
//   - whatever value is received from the channel handed back by
//     fs.BlockUntilExists for the expected file path,
//   - an error if the initial BlockUntilExists call fails, if listing or
//     index parsing fails, or if the tomb t is reported dead.
func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logType string, nextIndex int64) chan error {
	target := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
	result := make(chan error, 1)

	// deliver publishes the single outcome and closes the channel. The buffer
	// of one means the send never blocks, even if nobody is receiving.
	deliver := func(outcome error) {
		result <- outcome
		close(result)
	}

	go func() {
		existsCh, err := fs.BlockUntilExists(target, t)
		if err != nil {
			deliver(err)
			return
		}

		// The exact file may never appear if a later index is created
		// instead, so the directory is also re-listed on every tick.
		poll := time.NewTicker(nextLogCheckRate)
		defer poll.Stop()

		for {
			select {
			case <-t.Dead():
				deliver(fmt.Errorf("shutdown triggered"))
				return

			case watchErr := <-existsCh:
				deliver(watchErr)
				return

			case <-poll.C:
				listing, listErr := fs.List(logPath)
				if listErr != nil {
					deliver(fmt.Errorf("failed to list entries: %v", listErr))
					return
				}

				found, parseErr := logIndexes(listing, task, logType)
				if parseErr != nil {
					deliver(parseErr)
					return
				}

				// Any index at or beyond the one being waited for counts as
				// the next log having been created.
				for _, candidate := range found {
					if candidate.idx >= nextIndex {
						deliver(nil)
						return
					}
				}
			}
		}
	}()

	return result
}
开发者ID:zanella,项目名称:nomad,代码行数:57,代码来源:fs_endpoint.go
示例2: logs
func (s *HTTPServer) logs(follow bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error {
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Path to the logs
logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)
// nextIdx is the next index to read logs from
var nextIdx int64
switch origin {
case "start":
nextIdx = 0
case "end":
nextIdx = math.MaxInt64
offset *= -1
default:
return invalidOrigin
}
// Create a tomb to cancel watch events
t := tomb.Tomb{}
defer func() {
t.Kill(nil)
t.Done()
}()
for {
// Logic for picking next file is:
// 1) List log files
// 2) Pick log file closest to desired index
// 3) Open log file at correct offset
// 3a) No error, read contents
// 3b) If file doesn't exist, goto 1 as it may have been rotated out
entries, err := fs.List(logPath)
if err != nil {
return fmt.Errorf("failed to list entries: %v", err)
}
// If we are not following logs, determine the max index for the logs we are
// interested in so we can stop there.
maxIndex := int64(math.MaxInt64)
if !follow {
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
if err != nil {
return err
}
maxIndex = idx
}
logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
if err != nil {
return err
}
var eofCancelCh chan error
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(fs, &t, logPath, task, logType, idx+1)
}
p := filepath.Join(logPath, logEntry.Name)
err = s.stream(openOffset, p, fs, framer, eofCancelCh)
if err != nil {
// Check if there was an error where the file does not exist. That means
// it got rotated out from under us.
if os.IsNotExist(err) {
continue
}
// Check if the connection was closed
if err == syscall.EPIPE {
return nil
}
return fmt.Errorf("failed to stream %q: %v", p, err)
}
if exitAfter {
return nil
}
//Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1
}
//.........这里部分代码省略.........
开发者ID:nak3,项目名称:nomad,代码行数:101,代码来源:fs_endpoint.go
示例3: stream
// stream is the internal method to stream the content of a file. eofCancelCh is
// used to cancel the stream if triggered while at EOF. If the connection is
// broken an EPIPE error is returned
func (s *HTTPServer) stream(offset int64, path string,
fs allocdir.AllocDirFS, framer *StreamFramer,
eofCancelCh chan error) error {
// Get the reader
f, err := fs.ReadAt(path, offset)
if err != nil {
return err
}
defer f.Close()
// Create a tomb to cancel watch events
t := tomb.Tomb{}
defer func() {
t.Kill(nil)
t.Done()
}()
// Create a variable to allow setting the last event
var lastEvent string
// Only create the file change watcher once. But we need to do it after we
// read and reach EOF.
var changes *watch.FileChanges
// Start streaming the data
data := make([]byte, streamFrameSize)
OUTER:
for {
// Read up to the max frame size
n, readErr := f.Read(data)
// Update the offset
offset += int64(n)
// Return non-EOF errors
if readErr != nil && readErr != io.EOF {
return readErr
}
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
// Check if the connection has been closed
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}
operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}
return err
}
}
// Clear the last event
if lastEvent != "" {
lastEvent = ""
}
// Just keep reading
if readErr == nil {
continue
}
// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(path, offset, &t)
if err != nil {
return err
}
}
for {
select {
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
return framer.Send(path, deleteEvent, nil, offset)
case <-changes.Truncated:
// Close the current reader
if err := f.Close(); err != nil {
return err
}
// Get a new reader at offset zero
offset = 0
var err error
f, err = fs.ReadAt(path, offset)
//.........这里部分代码省略.........
开发者ID:nak3,项目名称:nomad,代码行数:101,代码来源:fs_endpoint.go
注:本文中的github.com/hashicorp/nomad/client/allocdir.AllocDirFS类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论