本文整理汇总了Golang中github.com/janelia-flyem/dvid/datastore.NotifySubscribers函数的典型用法代码示例。如果您正苦于以下问题:Golang NotifySubscribers函数的具体用法?Golang NotifySubscribers怎么用?Golang NotifySubscribers使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了NotifySubscribers函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: publishDownresCommit
func (d *Data) publishDownresCommit(v dvid.VersionID, mutID uint64) {
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: DownsizeCommitEvent}
msg := datastore.SyncMessage{Event: DownsizeCommitEvent, Version: v, Delta: mutID}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Criticalf("unable to notify subscribers of event %s: %v\n", evt, err)
}
}
开发者ID:janelia-flyem,项目名称:dvid,代码行数:7,代码来源:sync.go
示例2: MergeLabels
// MergeLabels handles merging of any number of labels throughout the various label data
// structures. It assumes that the merges aren't cascading, e.g., there is no attempt
// to merge label 3 into 4 and also 4 into 5. The caller should have flattened the merges.
// TODO: Provide some indication that subset of labels are under evolution, returning
// an "unavailable" status or 203 for non-authoritative response. This might not be
// feasible for clustered DVID front-ends due to coordination issues.
//
// EVENTS
//
// labels.MergeStartEvent occurs at very start of merge and transmits labels.DeltaMergeStart struct.
//
// labels.MergeBlockEvent occurs for every block of a merged label and transmits labels.DeltaMerge struct.
//
// labels.MergeEndEvent occurs at end of merge and transmits labels.DeltaMergeEnd struct.
//
func (d *Data) MergeLabels(v dvid.VersionID, m labels.MergeOp) error {
dvid.Infof("Merging data %q (labels %s) into label %d ...\n", d.DataName(), m.Merged, m.Target)
// Mark these labels as dirty until done, and make sure we can actually initiate the merge.
if err := labels.MergeStart(d.getMergeIV(v), m); err != nil {
return err
}
d.StartUpdate()
// Signal that we are starting a merge.
evt := datastore.SyncEvent{d.DataUUID(), labels.MergeStartEvent}
msg := datastore.SyncMessage{labels.MergeStartEvent, v, labels.DeltaMergeStart{m}}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
d.StopUpdate()
return err
}
// Asynchronously perform merge and handle any concurrent requests using the cache map until
// labelvol and labelblk are updated and consistent.
go func() {
d.asyncMergeLabels(v, m)
// Remove dirty labels and updating flag when done.
labels.MergeStop(d.getMergeIV(v), m)
d.StopUpdate()
dvid.Infof("Finished with merge of labels %s.\n", m)
}()
return nil
}
开发者ID:janelia-flyem,项目名称:dvid,代码行数:45,代码来源:merge_split.go
示例3: writeBlocks
// TODO -- Clean up all the writing and simplify now that we have block-aligned writes.
// writeBlocks ingests blocks of voxel data asynchronously using batch writes.
func (d *Data) writeBlocks(v dvid.VersionID, b storage.TKeyValues, wg1, wg2 *sync.WaitGroup) error {
batcher, err := d.GetKeyValueBatcher()
if err != nil {
return err
}
preCompress, postCompress := 0, 0
ctx := datastore.NewVersionedCtx(d, v)
evt := datastore.SyncEvent{d.DataUUID(), IngestBlockEvent}
<-server.HandlerToken
go func() {
defer func() {
wg1.Done()
wg2.Done()
dvid.Debugf("Wrote voxel blocks. Before %s: %d bytes. After: %d bytes\n", d.Compression(), preCompress, postCompress)
server.HandlerToken <- 1
}()
mutID := d.NewMutationID()
batch := batcher.NewBatch(ctx)
for i, block := range b {
serialization, err := dvid.SerializeData(block.V, d.Compression(), d.Checksum())
preCompress += len(block.V)
postCompress += len(serialization)
if err != nil {
dvid.Errorf("Unable to serialize block: %v\n", err)
return
}
batch.Put(block.K, serialization)
indexZYX, err := DecodeTKey(block.K)
if err != nil {
dvid.Errorf("Unable to recover index from block key: %v\n", block.K)
return
}
msg := datastore.SyncMessage{IngestBlockEvent, v, Block{indexZYX, block.V, mutID}}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Errorf("Unable to notify subscribers of ChangeBlockEvent in %s\n", d.DataName())
return
}
// Check if we should commit
if i%KVWriteSize == KVWriteSize-1 {
if err := batch.Commit(); err != nil {
dvid.Errorf("Error on trying to write batch: %v\n", err)
return
}
batch = batcher.NewBatch(ctx)
}
}
if err := batch.Commit(); err != nil {
dvid.Errorf("Error on trying to write batch: %v\n", err)
return
}
}()
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:61,代码来源:write.go
示例4: mergeLabels
func (d *Data) mergeLabels(batcher storage.KeyValueBatcher, v dvid.VersionID, op labels.MergeOp) error {
d.Lock()
defer d.Unlock()
d.StartUpdate()
ctx := datastore.NewVersionedCtx(d, v)
batch := batcher.NewBatch(ctx)
// Get the target label
targetTk := NewLabelTKey(op.Target)
targetElems, err := getElements(ctx, targetTk)
if err != nil {
return fmt.Errorf("get annotations for instance %q, target %d, in syncMerge: %v\n", d.DataName(), op.Target, err)
}
// Iterate through each merged label, read old elements, delete that k/v, then add it to the current target elements.
var delta DeltaModifyElements
elemsAdded := 0
for label := range op.Merged {
tk := NewLabelTKey(label)
elems, err := getElements(ctx, tk)
if err != nil {
return fmt.Errorf("unable to get annotation elements for instance %q, label %d in syncMerge: %v\n", d.DataName(), label, err)
}
if elems == nil || len(elems) == 0 {
continue
}
batch.Delete(tk)
elemsAdded += len(elems)
targetElems = append(targetElems, elems...)
// for labelsz. TODO, only do this computation if really subscribed.
for _, elem := range elems {
delta.Add = append(delta.Add, ElementPos{Label: op.Target, Kind: elem.Kind, Pos: elem.Pos})
delta.Del = append(delta.Del, ElementPos{Label: label, Kind: elem.Kind, Pos: elem.Pos})
}
}
if elemsAdded > 0 {
val, err := json.Marshal(targetElems)
if err != nil {
return fmt.Errorf("couldn't serialize annotation elements in instance %q: %v\n", d.DataName(), err)
}
batch.Put(targetTk, val)
if err := batch.Commit(); err != nil {
return fmt.Errorf("unable to commit merge for instance %q: %v\n", d.DataName(), err)
}
}
d.StopUpdate()
// Notify any subscribers of label annotation changes.
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Criticalf("unable to notify subscribers of event %s: %v\n", evt, err)
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:57,代码来源:sync.go
示例5: moveElementInLabels
func (d *Data) moveElementInLabels(ctx *datastore.VersionedCtx, batch storage.Batch, from, to dvid.Point3d, moved ElementNR) error {
labelData := d.GetSyncedLabelblk()
if labelData == nil {
return nil // no label denormalization possible
}
oldLabel, err := labelData.GetLabelAtPoint(ctx.VersionID(), from)
if err != nil {
return err
}
newLabel, err := labelData.GetLabelAtPoint(ctx.VersionID(), to)
if err != nil {
return err
}
if oldLabel == newLabel {
return nil
}
var delta DeltaModifyElements
if oldLabel != 0 {
tk := NewLabelTKey(oldLabel)
elems, err := getElementsNR(ctx, tk)
if err != nil {
return fmt.Errorf("err getting elements for label %d: %v", oldLabel, err)
}
if _, changed := elems.delete(from); changed {
if err := putBatchElements(batch, tk, elems); err != nil {
return fmt.Errorf("err putting deleted label %d element: %v", oldLabel, err)
}
delta.Del = append(delta.Del, ElementPos{Label: oldLabel, Kind: moved.Kind, Pos: from})
}
}
if newLabel != 0 {
tk := NewLabelTKey(newLabel)
elems, err := getElementsNR(ctx, tk)
if err != nil {
return fmt.Errorf("err getting elements for label %d: %v", newLabel, err)
}
elems.add(ElementsNR{moved})
if err := putBatchElements(batch, tk, elems); err != nil {
return err
}
delta.Add = append(delta.Add, ElementPos{Label: newLabel, Kind: moved.Kind, Pos: to})
}
// Notify any subscribers of label annotation changes.
if len(delta.Del) != 0 || len(delta.Add) != 0 {
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return err
}
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:55,代码来源:annotation.go
示例6: publishBlockChange
// Notify any downstream downres instance of block change.
func (d *Data) publishBlockChange(v dvid.VersionID, mutID uint64, block dvid.IZYXString, blockData []byte) {
evt := datastore.SyncEvent{d.DataUUID(), DownsizeBlockEvent}
delta := deltaBlock{
mutID: mutID,
block: block,
data: blockData,
}
msg := datastore.SyncMessage{DownsizeBlockEvent, v, delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Criticalf("unable to notify subscribers of event %s: %v\n", evt, err)
}
}
开发者ID:janelia-flyem,项目名称:dvid,代码行数:13,代码来源:sync.go
示例7: deleteElementInLabel
func (d *Data) deleteElementInLabel(ctx *datastore.VersionedCtx, batch storage.Batch, pt dvid.Point3d) error {
labelData := d.GetSyncedLabelblk()
if labelData == nil {
return nil // no synced labels
}
label, err := labelData.GetLabelAtPoint(ctx.VersionID(), pt)
if err != nil {
return err
}
tk := NewLabelTKey(label)
elems, err := getElementsNR(ctx, tk)
if err != nil {
return fmt.Errorf("err getting elements for label %d: %v\n", label, err)
}
// Note all elements to be deleted.
var delta DeltaModifyElements
var toDel []int
for i, elem := range elems {
if pt.Equals(elem.Pos) {
delta.Del = append(delta.Del, ElementPos{Label: label, Kind: elem.Kind, Pos: elem.Pos})
toDel = append(toDel, i)
}
}
if len(toDel) == 0 {
return nil
}
// Delete them from high index to low index due while reusing slice.
for i := len(toDel) - 1; i >= 0; i-- {
d := toDel[i]
elems[d] = elems[len(elems)-1]
elems[len(elems)-1] = ElementNR{}
elems = elems[:len(elems)-1]
}
// Put the modified list of elements
if err := putBatchElements(batch, tk, elems); err != nil {
return err
}
// Notify any subscribers of label annotation changes.
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return err
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:49,代码来源:annotation.go
示例8: splitLabelsCoarse
func (d *Data) splitLabelsCoarse(batcher storage.KeyValueBatcher, v dvid.VersionID, op labels.DeltaSplit) error {
d.Lock()
defer d.Unlock()
d.StartUpdate()
defer d.StopUpdate()
ctx := datastore.NewVersionedCtx(d, v)
batch := batcher.NewBatch(ctx)
// Get the elements for the old label.
oldTk := NewLabelTKey(op.OldLabel)
oldElems, err := getElements(ctx, oldTk)
if err != nil {
return fmt.Errorf("unable to get annotations for instance %q, label %d in syncSplit: %v\n", d.DataName(), op.OldLabel, err)
}
// Create a map to test each point.
splitBlocks := make(map[dvid.IZYXString]struct{})
for _, zyxStr := range op.SortedBlocks {
splitBlocks[zyxStr] = struct{}{}
}
// Move any elements that are within the split blocks.
var delta DeltaModifyElements
toDel := make(map[int]struct{})
toAdd := Elements{}
blockSize := d.blockSize()
for i, elem := range oldElems {
zyxStr := elem.Pos.ToBlockIZYXString(blockSize)
if _, found := splitBlocks[zyxStr]; found {
toDel[i] = struct{}{}
toAdd = append(toAdd, elem)
// for downstream annotation syncs like labelsz. TODO: only perform if subscribed. Better: do ROI filtering here.
delta.Del = append(delta.Del, ElementPos{Label: op.OldLabel, Kind: elem.Kind, Pos: elem.Pos})
delta.Add = append(delta.Add, ElementPos{Label: op.NewLabel, Kind: elem.Kind, Pos: elem.Pos})
}
}
if len(toDel) == 0 {
return nil
}
// Store split elements into new label elements.
newTk := NewLabelTKey(op.NewLabel)
newElems, err := getElements(ctx, newTk)
if err != nil {
return fmt.Errorf("unable to get annotations for instance %q, label %d in syncSplit: %v\n", d.DataName(), op.NewLabel, err)
}
newElems.add(toAdd)
val, err := json.Marshal(newElems)
if err != nil {
return fmt.Errorf("couldn't serialize annotation elements in instance %q: %v\n", d.DataName(), err)
}
batch.Put(newTk, val)
// Delete any split from old label elements without removing the relationships.
// This filters without allocating, using fact that a slice shares the same backing array and
// capacity as the original, so storage is reused.
filtered := oldElems[:0]
for i, elem := range oldElems {
if _, found := toDel[i]; !found {
filtered = append(filtered, elem)
}
}
// Delete or store k/v depending on what remains.
if len(filtered) == 0 {
batch.Delete(oldTk)
} else {
val, err := json.Marshal(filtered)
if err != nil {
return fmt.Errorf("couldn't serialize annotation elements in instance %q: %v\n", d.DataName(), err)
}
batch.Put(oldTk, val)
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("bad commit in annotations %q after split: %v\n", d.DataName(), err)
}
// Notify any subscribers of label annotation changes.
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Criticalf("unable to notify subscribers of event %s: %v\n", evt, err)
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:89,代码来源:sync.go
示例9: deleteBlock
// If a block of labels is deleted, the associated synapse elements should be changed to zero label elements.
func (d *Data) deleteBlock(ctx *datastore.VersionedCtx, block labels.DeleteBlock, batcher storage.KeyValueBatcher) {
// Get the synaptic elements for this block
chunkPt := dvid.ChunkPoint3d(*block.Index)
tk := NewBlockTKey(chunkPt)
elems, err := getElements(ctx, tk)
if err != nil {
dvid.Errorf("err getting elements for block %s: %v\n", chunkPt, err)
return
}
if len(elems) == 0 {
return
}
blockSize := d.blockSize()
batch := batcher.NewBatch(ctx)
// Compute the strides (in bytes)
bX := blockSize[0] * 8
bY := blockSize[1] * bX
// Iterate through all element positions, finding corresponding label and storing elements.
toDel := LabelPoints{}
for _, elem := range elems {
pt := elem.Pos.Point3dInChunk(blockSize)
i := pt[2]*bY + pt[1]*bX + pt[0]*8
label := binary.LittleEndian.Uint64(block.Data[i : i+8])
toDel.add(label, elem.Pos)
}
// Delete any non-zero label elements from their respective label k/v.
var delta DeltaModifyElements
for label, pts := range toDel {
tk := NewLabelTKey(label)
elems, err := getElements(ctx, tk)
if err != nil {
dvid.Errorf("err getting elements for label %d: %v\n", label, err)
return
}
save := false
for _, pt := range pts {
deleted, changed := elems.delete(pt)
if changed {
save = true
delta.Del = append(delta.Del, ElementPos{Label: label, Kind: deleted.Kind, Pos: pt})
}
}
if save {
if len(elems) == 0 {
batch.Delete(tk)
} else {
val, err := json.Marshal(elems)
if err != nil {
dvid.Errorf("couldn't serialize annotation elements in instance %q: %v\n", d.DataName(), err)
return
}
batch.Put(tk, val)
}
}
}
if err := batch.Commit(); err != nil {
dvid.Criticalf("bad commit in annotations %q after delete block: %v\n", d.DataName(), err)
return
}
// Notify any subscribers of label annotation changes.
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Criticalf("unable to notify subscribers of event %s: %v\n", evt, err)
}
}
开发者ID:tartavull,项目名称:dvid,代码行数:72,代码来源:sync.go
示例10: PutBlocks
// PutBlocks stores blocks of data in a span along X
func (d *Data) PutBlocks(v dvid.VersionID, mutID uint64, start dvid.ChunkPoint3d, span int, data io.ReadCloser, mutate bool) error {
batcher, err := d.GetKeyValueBatcher()
if err != nil {
return err
}
ctx := datastore.NewVersionedCtx(d, v)
batch := batcher.NewBatch(ctx)
// Read blocks from the stream until we can output a batch put.
const BatchSize = 1000
var readBlocks int
numBlockBytes := d.BlockSize().Prod()
chunkPt := start
buf := make([]byte, numBlockBytes)
for {
// Read a block's worth of data
readBytes := int64(0)
for {
n, err := data.Read(buf[readBytes:])
readBytes += int64(n)
if readBytes == numBlockBytes {
break
}
if err == io.EOF {
return fmt.Errorf("Block data ceased before all block data read")
}
if err != nil {
return fmt.Errorf("Error reading blocks: %v\n", err)
}
}
if readBytes != numBlockBytes {
return fmt.Errorf("Expected %d bytes in block read, got %d instead! Aborting.", numBlockBytes, readBytes)
}
serialization, err := dvid.SerializeData(buf, d.Compression(), d.Checksum())
if err != nil {
return err
}
zyx := dvid.IndexZYX(chunkPt)
tk := NewTKey(&zyx)
// If we are mutating, get the previous block of data.
var oldBlock []byte
if mutate {
oldBlock, err = d.loadOldBlock(v, tk)
if err != nil {
return fmt.Errorf("Unable to load previous block in %q, key %v: %v\n", d.DataName(), tk, err)
}
}
// Write the new block
batch.Put(tk, serialization)
// Notify any subscribers that you've changed block.
var event string
var delta interface{}
if mutate {
event = MutateBlockEvent
delta = MutatedBlock{&zyx, oldBlock, buf, mutID}
} else {
event = IngestBlockEvent
delta = Block{&zyx, buf, mutID}
}
evt := datastore.SyncEvent{d.DataUUID(), event}
msg := datastore.SyncMessage{event, v, delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return err
}
// Advance to next block
chunkPt[0]++
readBlocks++
finish := (readBlocks == span)
if finish || readBlocks%BatchSize == 0 {
if err := batch.Commit(); err != nil {
return fmt.Errorf("Error on batch commit, block %d: %v\n", readBlocks, err)
}
batch = batcher.NewBatch(ctx)
}
if finish {
break
}
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:88,代码来源:write.go
示例11: SplitLabels
// SplitLabels splits a portion of a label's voxels into a given split label or, if the given split
// label is 0, a new label, which is returned. The input is a binary sparse volume and should
// preferably be the smaller portion of a labeled region. In other words, the caller should chose
// to submit for relabeling the smaller portion of any split. It is assumed that the given split
// voxels are within the fromLabel set of voxels and will generate unspecified behavior if this is
// not the case.
//
// EVENTS
//
// labels.SplitStartEvent occurs at very start of split and transmits labels.DeltaSplitStart struct.
//
// labels.SplitBlockEvent occurs for every block of a split label and transmits labels.DeltaSplit struct.
//
// labels.SplitEndEvent occurs at end of split and transmits labels.DeltaSplitEnd struct.
//
func (d *Data) SplitLabels(v dvid.VersionID, fromLabel, splitLabel uint64, r io.ReadCloser) (toLabel uint64, err error) {
store, err := d.GetOrderedKeyValueDB()
if err != nil {
err = fmt.Errorf("Data type labelvol had error initializing store: %v\n", err)
return
}
batcher, ok := store.(storage.KeyValueBatcher)
if !ok {
err = fmt.Errorf("Data type labelvol requires batch-enabled store, which %q is not\n", store)
return
}
// Create a new label id for this version that will persist to store
if splitLabel != 0 {
toLabel = splitLabel
dvid.Debugf("Splitting subset of label %d into given label %d ...\n", fromLabel, splitLabel)
} else {
toLabel, err = d.NewLabel(v)
if err != nil {
return
}
dvid.Debugf("Splitting subset of label %d into new label %d ...\n", fromLabel, toLabel)
}
evt := datastore.SyncEvent{d.DataUUID(), labels.SplitStartEvent}
splitOpStart := labels.DeltaSplitStart{fromLabel, toLabel}
splitOpEnd := labels.DeltaSplitEnd{fromLabel, toLabel}
// Make sure we can split given current merges in progress
if err := labels.SplitStart(d.getMergeIV(v), splitOpStart); err != nil {
return toLabel, err
}
defer labels.SplitStop(d.getMergeIV(v), splitOpEnd)
// Signal that we are starting a split.
msg := datastore.SyncMessage{labels.SplitStartEvent, v, splitOpStart}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return 0, err
}
// Read the sparse volume from reader.
var split dvid.RLEs
split, err = dvid.ReadRLEs(r)
if err != nil {
return
}
toLabelSize, _ := split.Stats()
// Partition the split spans into blocks.
var splitmap dvid.BlockRLEs
splitmap, err = split.Partition(d.BlockSize)
if err != nil {
return
}
// Get a sorted list of blocks that cover split.
splitblks := splitmap.SortedKeys()
// Iterate through the split blocks, read the original block. If the RLEs
// are identical, just delete the original. If not, modify the original.
// TODO: Modifications should be transactional since it's GET-PUT, therefore use
// hash on block coord to direct it to blockLabel, splitLabel-specific goroutine; we serialize
// requests to handle concurrency.
ctx := datastore.NewVersionedCtx(d, v)
batch := batcher.NewBatch(ctx)
for _, splitblk := range splitblks {
// Get original block
tk := NewTKey(fromLabel, splitblk)
val, err := store.Get(ctx, tk)
if err != nil {
return toLabel, err
}
if val == nil {
return toLabel, fmt.Errorf("Split RLEs at block %s are not part of original label %d", splitblk, fromLabel)
}
var rles dvid.RLEs
if err := rles.UnmarshalBinary(val); err != nil {
return toLabel, fmt.Errorf("Unable to unmarshal RLE for original labels in block %s", splitblk)
}
// Compare and process based on modifications required.
remain, err := rles.Split(splitmap[splitblk])
//.........这里部分代码省略.........
开发者ID:janelia-flyem,项目名称:dvid,代码行数:101,代码来源:merge_split.go
示例12: putChunk
func (d *Data) putChunk(chunk *storage.Chunk) {
defer func() {
// After processing a chunk, return the token.
server.HandlerToken <- 1
// Notify the requestor that this chunk is done.
if chunk.Wg != nil {
chunk.Wg.Done()
}
}()
op, ok := chunk.Op.(*putOperation)
if !ok {
log.Fatalf("Illegal operation passed to ProcessChunk() for data %s\n", d.DataName())
}
// Make sure our received chunk is valid.
if chunk == nil {
dvid.Errorf("Received nil chunk in ProcessChunk. Ignoring chunk.\n")
return
}
if chunk.K == nil {
dvid.Errorf("Received nil chunk key in ProcessChunk. Ignoring chunk.\n")
return
}
// Initialize the block buffer using the chunk of data. For voxels, this chunk of
// data needs to be uncompressed and deserialized.
var blockData []byte
var err error
if chunk.V == nil {
blockData = d.BackgroundBlock()
} else {
blockData, _, err = dvid.DeserializeData(chunk.V, true)
if err != nil {
dvid.Errorf("Unable to deserialize block in '%s': %v\n", d.DataName(), err)
return
}
}
// Perform the operation.
block := &storage.TKeyValue{K: chunk.K, V: blockData}
if err = op.voxels.WriteBlock(block, d.BlockSize()); err != nil {
dvid.Errorf("Unable to WriteBlock() in %q: %v\n", d.DataName(), err)
return
}
serialization, err := dvid.SerializeData(blockData, d.Compression(), d.Checksum())
if err != nil {
dvid.Errorf("Unable to serialize block in %q: %v\n", d.DataName(), err)
return
}
store, err := storage.BigDataStore()
if err != nil {
dvid.Errorf("Data type imageblk had error initializing store: %v\n", err)
return
}
ctx := datastore.NewVersionedCtx(d, op.version)
if err := store.Put(ctx, chunk.K, serialization); err != nil {
dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, err)
return
}
// Notify any subscribers that you've changed block.
evt := datastore.SyncEvent{d.DataName(), ChangeBlockEvent}
msg := datastore.SyncMessage{op.version, Block{&op.indexZYX, block.V}}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Errorf("Unable to notify subscribers of ChangeBlockEvent in %s\n", d.DataName())
}
}
开发者ID:jwohlwend,项目名称:dvid,代码行数:70,代码来源:write.go
示例13: PutBlocks
// PutBlocks stores blocks of data in a span along X
func (d *Data) PutBlocks(v dvid.VersionID, start dvid.ChunkPoint3d, span int, data io.ReadCloser) error {
store, err := storage.BigDataStore()
if err != nil {
return fmt.Errorf("Data type imageblk had error initializing store: %v\n", err)
}
batcher, ok := store.(storage.KeyValueBatcher)
if !ok {
return fmt.Errorf("Data type imageblk requires batch-enabled store, which %q is not\n", store)
}
ctx := datastore.NewVersionedCtx(d, v)
batch := batcher.NewBatch(ctx)
// Read blocks from the stream until we can output a batch put.
const BatchSize = 1000
var readBlocks int
numBlockBytes := d.BlockSize().Prod()
chunkPt := start
buf := make([]byte, numBlockBytes)
for {
// Read a block's worth of data
readBytes := int64(0)
for {
n, err := data.Read(buf[readBytes:])
readBytes += int64(n)
if readBytes == numBlockBytes {
break
}
if err == io.EOF {
return fmt.Errorf("Block data ceased before all block data read")
}
if err != nil {
return fmt.Errorf("Error reading blocks: %v\n", err)
}
}
if readBytes != numBlockBytes {
return fmt.Errorf("Expected %d bytes in block read, got %d instead! Aborting.", numBlockBytes, readBytes)
}
serialization, err := dvid.SerializeData(buf, d.Compression(), d.Checksum())
if err != nil {
return err
}
zyx := dvid.IndexZYX(chunkPt)
batch.Put(NewTKey(&zyx), serialization)
// Notify any subscribers that you've changed block.
evt := datastore.SyncEvent{d.DataName(), ChangeBlockEvent}
msg := datastore.SyncMessage{v, Block{&zyx, buf}}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return err
}
// Advance to next block
chunkPt[0]++
readBlocks++
finish := (readBlocks == span)
if finish || readBlocks%BatchSize == 0 {
if err := batch.Commit(); err != nil {
return fmt.Errorf("Error on batch commit, block %d: %v\n", readBlocks, err)
}
batch = batcher.NewBatch(ctx)
}
if finish {
break
}
}
return nil
}
开发者ID:jwohlwend,项目名称:dvid,代码行数:71,代码来源:write.go
示例14: putChunk
func (d *Data) putChunk(chunk *storage.Chunk, putbuffer storage.RequestBuffer) {
defer func() {
// After processing a chunk, return the token.
server.HandlerToken <- 1
// Notify the requestor that this chunk is done.
if chunk.Wg != nil {
chunk.Wg.Done()
}
}()
op, ok := chunk.Op.(*putOperation)
if !ok {
log.Fatalf("Illegal operation passed to ProcessChunk() for data %s\n", d.DataName())
}
// Make sure our received chunk is valid.
if chunk == nil {
dvid.Errorf("Received nil chunk in ProcessChunk. Ignoring chunk.\n")
return
}
if chunk.K == nil {
dvid.Errorf("Received nil chunk key in ProcessChunk. Ignoring chunk.\n")
return
}
// Initialize the block buffer using the chunk of data. For voxels, this chunk of
// data needs to be uncompressed and deserialized.
var blockData []byte
var err error
if chunk.V == nil {
blockData = d.BackgroundBlock()
} else {
blockData, _, err = dvid.DeserializeData(chunk.V, true)
if err != nil {
dvid.Errorf("Unable to deserialize block in %q: %v\n", d.DataName(), err)
return
}
}
// If we are mutating, get the previous block of data.
var oldBlock []byte
if op.mutate {
oldBlock, err = d.loadOldBlock(op.version, chunk.K)
if err != nil {
dvid.Errorf("Unable to load previous block in %q, key %v: %v\n", d.DataName(), chunk.K, err)
return
}
}
// Perform the operation.
block := &storage.TKeyValue{K: chunk.K, V: blockData}
if err = op.voxels.WriteBlock(block, d.BlockSize()); err != nil {
dvid.Errorf("Unable to WriteBlock() in %q: %v\n", d.DataName(), err)
return
}
serialization, err := dvid.SerializeData(blockData, d.Compression(), d.Checksum())
if err != nil {
dvid.Errorf("Unable to serialize block in %q: %v\n", d.DataName(), err)
return
}
store, err := d.GetOrderedKeyValueDB()
if err != nil {
dvid.Errorf("Data type imageblk had error initializing store: %v\n", err)
return
}
ready := make(chan error, 1)
callback := func() {
// Notify any subscribers that you've changed block.
resperr := <-ready
if resperr != nil {
dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, resperr)
return
}
var event string
var delta interface{}
if op.mutate {
event = MutateBlockEvent
delta = MutatedBlock{&op.indexZYX, oldBlock, block.V, op.mutID}
} else {
event = IngestBlockEvent
delta = Block{&op.indexZYX, block.V, op.mutID}
}
evt := datastore.SyncEvent{d.DataUUID(), event}
msg := datastore.SyncMessage{event, op.version, delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Errorf("Unable to notify subscribers of event %s in %s\n", event, d.DataName())
}
}
// put data -- use buffer if available
ctx := datastore.NewVersionedCtx(d, op.version)
if putbuffer != nil {
go callback()
putbuffer.PutCallback(ctx, chunk.K, serialization, ready)
} else {
if err := store.Put(ctx, chunk.K, serialization); err != nil {
dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, err)
//.........这里部分代码省略.........
开发者ID:tartavull,项目名称:dvid,代码行数:101,代码来源:write.go
示例15: splitLabelsFine
func (d *Data) splitLabelsFine(batcher storage.KeyValueBatcher, v dvid.VersionID, op labels.DeltaSplit) error {
d.Lock()
defer d.Unlock()
d.StartUpdate()
defer d.StopUpdate()
ctx := datastore.NewVersionedCtx(d, v)
batch := batcher.NewBatch(ctx)
var delta DeltaModifyElements
toAdd := Elements{}
toDel := make(map[string]struct{})
// Iterate through each split block, get the elements, and then modify the previous and new label k/v.
for izyx, rles := range op.Split {
// Get the elements for this block.
blockPt, err := izyx.ToChunkPoint3d()
if err != nil {
return err
}
tk := NewBlockTKey(blockPt)
elems, err := getElements(ctx, tk)
if err != nil {
dvid.Errorf("getting annotations for block %s on split of %d from %d: %v\n", blockPt, op.NewLabel, op.OldLabel, err)
continue
}
// For any element within the split RLEs, add to the delete and addition lists.
for _, elem := range elems {
for _, rle := range rles {
if rle.Within(elem.Pos) {
toAdd = append(toAdd, elem)
toDel[elem.Pos.String()] = struct{}{}
// for downstream annotation syncs like labelsz. TODO: only perform if subscribed. Better: do ROI filtering here.
delta.Del = append(delta.Del, ElementPos{Label: op.OldLabel, Kind: elem.Kind, Pos: elem.Pos})
delta.Add = append(delta.Add, ElementPos{Label: op.NewLabel, Kind: elem.Kind, Pos: elem.Pos})
break
}
}
}
}
// Modify the old label k/v
if len(toDel) != 0 {
tk := NewLabelTKey(op.OldLabel)
elems, err := getElements(ctx, tk)
if err != nil {
dvid.Errorf("unable to get annotations for instance %q, old label %d in syncSplit: %v\n", d.DataName(), op.OldLabel, err)
} else {
filtered := elems[:0]
for _, elem := range elems {
if _, found := toDel[elem.Pos.String()]; !found {
filtered = append(filtered, elem)
}
}
if len(filtered) == 0 {
batch.Delete(tk)
} else {
val, err := json.Marshal(filtered)
if err != nil {
dvid.Errorf("couldn't serialize annotation elements in instance %q: %v\n", d.DataName(), err)
} else {
batch.Put(tk, val)
}
}
}
}
// Modify the new label k/v
if len(toAdd) != 0 {
tk := NewLabelTKey(op.NewLabel)
elems, err := getElements(ctx, tk)
if err != nil {
dvid.Errorf("unable to get annotations for instance %q, label %d in syncSplit: %v\n", d.DataName(), op.NewLabel, err)
} else {
elems.add(toAdd)
val, err := json.Marshal(elems)
if err != nil {
dvid.Errorf("couldn't serialize annotation elements in instance %q: %v\n", d.DataName(), err)
} else {
batch.Put(tk, val)
}
}
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("bad commit in annotations %q after split: %v\n", d.DataName(), err)
}
// Notify any subscribers of label annotation changes.
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Criticalf("unable to notify subscribers of event %s: %v\n", evt, err)
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:99,代码来源:sync.go
示例16: writeBlocks
// TODO -- Clean up all the writing and simplify now that we have block-aligned writes.
// writeBlocks persists blocks of voxel data asynchronously using batch writes.
func (d *Data) writeBlocks(v dvid.VersionID, b storage.TKeyValues, wg1, wg2 *sync.WaitGroup) error {
store, err := storage.BigDataStore()
if err != nil {
return fmt.Errorf("Data type imageblk had error initializing store: %v\n", err)
}
batcher, ok := store.(storage.KeyValueBatcher)
if !ok {
return fmt.Errorf("Data type imageblk requires batch-enabled store, which %q is not\n", store)
}
preCompress, postCompress := 0, 0
ctx := datastore.NewVersionedCtx(d, v)
evt := datastore.SyncEvent{d.DataName(), ChangeBlockEvent}
<-server.HandlerToken
go func() {
defer func() {
wg1.Done()
wg2.Done()
dvid.Debugf("Wrote voxel blocks. Before %s: %d bytes. After: %d bytes\n", d.Compression(), preCompress, postCompress)
server.HandlerToken <- 1
}()
batch := batcher.NewBatch(ctx)
for i, block := range b {
serialization, err := dvid.SerializeData(block.V, d.Compression(), d.Checksum())
preCompress += len(block.V)
postCompress += len(serialization)
if err != nil {
dvid.Errorf("Unable to serialize block: %v\n", err)
return
}
batch.Put(block.K, serialization)
indexZYX, err := DecodeTKey(block.K)
if err != nil {
dvid.Errorf("Unable to recover index from block key: %v\n", block.K)
return
}
msg := datastore.SyncMessage{v, Block{indexZYX, block.V}}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
dvid.Errorf("Unable to notify subscribers of ChangeBlockEvent in %s\n", d.DataName())
return
}
// Check if we should commit
if i%KVWriteSize == KVWriteSize-1 {
if err := batch.Commit(); err != nil {
dvid.Errorf("Error on trying to write batch: %v\n", err)
return
}
batch = batcher.NewBatch(ctx)
}
}
if err := batch.Commit(); err != nil {
dvid.Errorf("Error on trying to write batch: %v\n", err)
return
}
}()
return nil
}
开发者ID:jwohlwend,项目名称:dvid,代码行数:64,代码来源:write.go
示例17: DeleteBlocks
func (d *Data) DeleteBlocks(ctx *datastore.VersionedCtx, start dvid.ChunkPoint3d, span int) error {
store, err := storage.MutableStore()
if err != nil {
return fmt.Errorf("Data type labelblk had error initializing store: %v\n", err)
}
batcher, ok := store.(storage.KeyValueBatcher)
if !ok {
return fmt.Errorf("Data type labelblk requires batch-enabled store, which %q is not\n", store)
}
indexBeg := dvid.IndexZYX(start)
end := start
end[0] += int32(span - 1)
indexEnd := dvid.IndexZYX(end)
begTKey := NewTKey(&indexBeg)
endTKey := NewTKey(&indexEnd)
iv := dvid.InstanceVersion{d.DataName(), ctx.VersionID()}
mapping := labels.MergeCache.LabelMap(iv)
kvs, err := store.GetRange(ctx, begTKey, endTKey)
if err != nil {
return err
}
batch := batcher.NewBatch(ctx)
uncompress := true
for _, kv := range kvs {
izyx, err := DecodeTKey(kv.K)
if err != nil {
return err
}
// Delete the labelblk (really tombstones it)
batch.Delete(kv.K)
// Send data to delete associated labelvol for labels in this block
block, _, err := dvid.DeserializeData(kv.V, uncompress)
if err != nil {
return fmt.Errorf("Unable to deserialize block, %s (%v): %v", ctx, kv.K, err)
}
if mapping != nil {
n := len(block) / 8
for i := 0; i < n; i++ {
orig := binary.LittleEndian.Uint64(block[i*8 : i*8+8])
mapped, found := mapping.FinalLabel(orig)
if !found {
mapped = orig
}
binary.LittleEndian.PutUint64(block[i*8:i*8+8], mapped)
}
}
// Notify any subscribers that we've deleted this block.
evt := datastore.SyncEvent{d.DataName(), labels.DeleteBlockEvent}
msg := datastore.SyncMessage{ctx.VersionID(), labels.DeleteBlock{izyx, block}}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return err
}
}
return batch.Commit()
}
开发者ID:jmptrader,项目名称:dvid,代码行数:63,代码来源:labelblk.go
示例18: storeLabelElements
// stores synaptic elements arranged by label, replacing any
// elements at same position.
func (d *Data) storeLabelElements(ctx *datastore.VersionedCtx, batch storage.Batch, be blockElements) error {
labelData := d.GetSyncedLabelblk()
if labelData == nil {
dvid.Infof("No synced labels for annotation %q, skipping label-aware denormalization.\n", d.DataName())
return nil // no synced labels
}
// Compute the strides (in bytes)
blockSize := d.blockSize()
bX := blockSize[0] * 8
bY := blockSize[1] * bX
blockBytes := int(blockSize[0] * blockSize[1] * blockSize[2] * 8)
toAdd := LabelElements{}
for izyxStr, elems := range be {
blockCoord, err := izyxStr.ToChunkPoint3d()
if err != nil {
return err
}
// Get the labels for this block
labels, err := labelData.GetLabelBlock(ctx.VersionID(), blockCoord)
if err != nil {
return err
}
if len(labels) == 0 {
continue
}
if len(labels) != blockBytes {
return fmt.Errorf("Expected %d bytes in %q label block, got %d instead. Aborting.", blockBytes, d.DataName(), len(labels))
}
// Group annotations by label
for _, elem := range elems {
pt := elem.Pos.Point3dInChunk(blockSize)
i := pt[2]*bY + pt[1]*bX + pt[0]*8
label := binary.LittleEndian.Uint64(labels[i : i+8])
if label != 0 {
toAdd.add(label, elem.ElementNR)
}
}
}
// Store all the added annotations to the appropriate labels.
var delta DeltaModifyElements
for label, additions := range toAdd {
tk := NewLabelTKey(label)
elems, err := getElementsNR(ctx, tk)
if err != nil {
return fmt.Errorf("err getting elements for label %d: %v\n", label, err)
}
// Check if these annotations already exist.
emap := make(map[string]int)
for i, elem := range elems {
emap[elem.Pos.MapKey()] = i
}
for _, elem := range additions {
i, found := emap[elem.Pos.MapKey()]
if !found {
elems = append(elems, elem)
delta.Add = append(delta.Add, ElementPos{Label: label, Kind: elem.Kind, Pos: elem.Pos})
} else {
elems[i] = elem // replace properties if same position
}
}
if err := putBatchElements(batch, tk, elems); err != nil {
return fmt.Errorf("couldn't serialize label %d annotations in instance %q: %v\n", label, d.DataName(), err)
}
}
// Notify any subscribers of label annotation changes.
evt := datastore.SyncEvent{Data: d.DataUUID(), Event: ModifyElementsEvent}
msg := datastore.SyncMessage{Event: ModifyElementsEvent, Version: ctx.VersionID(), Delta: delta}
if err := datastore.NotifySubscribers(evt, msg); err != nil {
return err
}
return nil
}
开发者ID:tartavull,项目名称:dvid,代码行数:82,代码来源:annotation.go
|
请发表评论