// Test retrieval of skipped sequence using view. Unit test catches panic, but we don't currently have a way
// to simulate an entry that makes it to the bucket (and so is accessible to the view), but doesn't show up on the TAP feed.
// Cache logging in this test validates that view retrieval is working because of TAP latency (item is in the bucket, but hasn't
// been seen on the TAP feed yet). Longer term could consider enhancing leaky bucket to 'miss' the entry on the tap feed.
func TestSkippedViewRetrieval(t *testing.T) {
base.LogKeys["Cache"] = true
base.LogKeys["Cache+"] = true
// Use leaky bucket to have the tap feed 'lose' document 3
leakyConfig := base.LeakyBucketConfig{
TapFeedMissingDocs: []string{"doc-3"},
db := setupTestLeakyDBWithCacheOptions(t, shortWaitCache(), leakyConfig)
defer tearDownTestDB(t, db)
db.ChannelMapper = channels.NewDefaultChannelMapper()
// Allow db to initialize and run initial CleanSkippedSequenceQueue
time.Sleep(10 * time.Millisecond)
// Write sequences direct
WriteDirect(db, []string{"ABC"}, 1)
WriteDirect(db, []string{"ABC"}, 2)
WriteDirect(db, []string{"ABC"}, 3)
// Artificially add 3 skipped, and back date skipped entry by 2 hours to trigger attempted view retrieval during Clean call
db.changeCache.skippedSeqs.Push(&SkippedSequence{3, time.Now().Add(time.Duration(time.Hour * -2))})
db.changeCache.skippedSeqs.Push(&SkippedSequence{5, time.Now().Add(time.Duration(time.Hour * -2))})
// Validate that 3 is in the channel cache, 5 isn't
entries, err := db.changeCache.GetChangesInChannel("ABC", ChangesOptions{Since: SequenceID{Seq: 2}})
assertNoError(t, err, "Get Changes returned error")
assertTrue(t, len(entries) == 1, "Incorrect number of entries returned")
assert.Equals(t, entries[0].DocID, "doc-3")
// Unit test for bug #673
func TestUpdatePrincipal(t *testing.T) {
base.LogKeys["Cache"] = true
base.LogKeys["Changes"] = true
base.LogKeys["Changes+"] = true
db := setupTestDB(t)
defer tearDownTestDB(t, db)
db.ChannelMapper = channels.NewDefaultChannelMapper()
// Create a user with access to channel ABC
authenticator := db.Authenticator()
user, _ := authenticator.NewUser("naomi", "letmein", channels.SetOf("ABC"))
// Validate that a call to UpdatePrincipals with no changes to the user doesn't allocate a sequence
userInfo, err := db.GetPrincipal("naomi", true)
userInfo.ExplicitChannels = base.SetOf("ABC")
_, err = db.UpdatePrincipal(*userInfo, true, true)
assertNoError(t, err, "Unable to update principal")
nextSeq, err := db.sequences.nextSequence()
assert.Equals(t, nextSeq, uint64(1))
// Validate that a call to UpdatePrincipals with changes to the user does allocate a sequence
userInfo, err = db.GetPrincipal("naomi", true)
userInfo.ExplicitChannels = base.SetOf("ABC", "PBS")
_, err = db.UpdatePrincipal(*userInfo, true, true)
assertNoError(t, err, "Unable to update principal")
nextSeq, err = db.sequences.nextSequence()
assert.Equals(t, nextSeq, uint64(3))
// Currently disabled, due to test race conditions between the continuous changes start (in its own goroutine),
// and the send of the continuous terminator. We can't ensure that the changes request has been
// started before all other test operations have been sent (so that we never break out of the changes loop)
func RaceTestPollResultLongRunningContinuous(t *testing.T) {
// Reset the index expvars
db := setupTestDBForChangeIndex(t)
defer tearDownTestDB(t, db)
db.ChannelMapper = channels.NewDefaultChannelMapper()
WriteDirectWithKey(db, "docABC_1", []string{"ABC"}, 1)
time.Sleep(100 * time.Millisecond)
// Do a basic changes to trigger start of polling for channel
changes, err := db.GetChanges(base.SetOf("ABC"), ChangesOptions{Since: simpleClockSequence(0)})
assertTrue(t, err == nil, "Error getting changes")
assert.Equals(t, len(changes), 1)
log.Printf("Changes:%+v", changes[0])
// Start a continuous changes on channel (ABC). Waitgroup keeps test open until continuous is terminated
var wg sync.WaitGroup
continuousTerminator := make(chan bool)
go func() {
defer wg.Done()
since, err := db.ParseSequenceID("2-0")
abcChanges, err := db.GetChanges(base.SetOf("ABC"), ChangesOptions{Since: since, Wait: true, Continuous: true, Terminator: continuousTerminator})
assertTrue(t, err == nil, "Error getting changes")
log.Printf("Got %d changes", len(abcChanges))
log.Println("Continuous completed")
for i := 0; i < 10000; i++ {
WriteDirectWithKey(db, fmt.Sprintf("docABC_%d", i), []string{"ABC"}, 3)
time.Sleep(1 * time.Millisecond)
time.Sleep(1000 * time.Millisecond) // wait for indexing, polling, and changes processing
log.Println("closed terminator")
time.Sleep(100 * time.Millisecond)
WriteDirectWithKey(db, "terminatorCheck", []string{"ABC"}, 1)
func TestPollResultReuseLongpoll(t *testing.T) {
// Reset the index expvars
db := setupTestDBForChangeIndex(t)
defer tearDownTestDB(t, db)
db.ChannelMapper = channels.NewDefaultChannelMapper()
WriteDirectWithKey(db, "docABC_1", []string{"ABC"}, 1)
time.Sleep(100 * time.Millisecond)
// Do a basic changes to trigger start of polling for channel
changes, err := db.GetChanges(base.SetOf("ABC"), ChangesOptions{Since: simpleClockSequence(0)})
assertTrue(t, err == nil, "Error getting changes")
assert.Equals(t, len(changes), 1)
log.Printf("Changes:%+v", changes[0])
// Start a longpoll changes, use waitgroup to delay the test until it returns.
var wg sync.WaitGroup
go func() {
defer wg.Done()
since, err := db.ParseSequenceID("2-0")
assertTrue(t, err == nil, "Error parsing sequence ID")
abcHboChanges, err := db.GetChanges(base.SetOf("ABC", "HBO"), ChangesOptions{Since: since, Wait: true})
assertTrue(t, err == nil, "Error getting changes")
// Expects two changes - the nil that's sent on initial wait, and then docABC_2
assert.Equals(t, len(abcHboChanges), 2)
time.Sleep(100 * time.Millisecond)
// Write an entry to channel ABC to notify the waiting longpoll
WriteDirectWithKey(db, "docABC_2", []string{"ABC"}, 2)
// Use expvars to confirm poll hits/misses (can't tell from changes response whether it used poll results,
// or reloaded from index). Expect one poll hit (the longpoll request), and one miss (the basic changes request)
assert.Equals(t, getExpvarAsString(indexExpvars, "getChanges_lastPolled_hit"), "1")
assert.Equals(t, getExpvarAsString(indexExpvars, "getChanges_lastPolled_miss"), "1")
// Currently disabled, due to test race conditions between the continuous changes start (in its own goroutine),
// and the send of the continuous terminator. We can't ensure that the changes request has been
// started before all other test operations have been sent (so that we never break out of the changes loop)
func RaceTestPollResultReuseContinuous(t *testing.T) {
// Reset the index expvars
db := setupTestDBForChangeIndex(t)
defer tearDownTestDB(t, db)
db.ChannelMapper = channels.NewDefaultChannelMapper()
WriteDirectWithKey(db, "docABC_1", []string{"ABC"}, 1)
time.Sleep(100 * time.Millisecond)
// Do a basic changes to trigger start of polling for channel
changes, err := db.GetChanges(base.SetOf("ABC"), ChangesOptions{Since: simpleClockSequence(0)})
assertTrue(t, err == nil, "Error getting changes")
assert.Equals(t, len(changes), 1)
log.Printf("Changes:%+v", changes[0])
// Start a continuous changes on a different channel (CBS). Waitgroup keeps test open until continuous is terminated
var wg sync.WaitGroup
continuousTerminator := make(chan bool)
go func() {
defer wg.Done()
since, err := db.ParseSequenceID("2-0")
abcHboChanges, err := db.GetChanges(base.SetOf("ABC", "HBO"), ChangesOptions{Since: since, Wait: true, Continuous: true, Terminator: continuousTerminator})
assertTrue(t, err == nil, "Error getting changes")
// Expect 2 entries + 3 nil entries (one per wait)
assert.Equals(t, len(abcHboChanges), 5)
for i := 0; i < len(abcHboChanges); i++ {
log.Printf("Got change:%+v", abcHboChanges[i])
log.Println("Continuous completed")
time.Sleep(100 * time.Millisecond)
// Write an entry to channel HBO to shift the continuous since value ahead
WriteDirectWithKey(db, "docHBO_1", []string{"HBO"}, 3)
time.Sleep(1000 * time.Millisecond) // wait for indexing, polling, and changes processing
// Write an entry to channel ABC - last polled should be used
WriteDirectWithKey(db, "docABC_2", []string{"ABC"}, 4)
time.Sleep(1000 * time.Millisecond) // wait for indexing, polling, and changes processing
log.Println("closed terminator")
time.Sleep(100 * time.Millisecond)
WriteDirectWithKey(db, "terminatorCheck", []string{"HBO"}, 1)
// Use expvars to confirm poll hits/misses (can't tell from changes response whether it used poll results,
// or reloaded from index). Expect two poll hits (docHBO_1, docABC_2), and one miss (the initial changes request)
assert.Equals(t, getExpvarAsString(indexExpvars, "getChanges_lastPolled_hit"), "2")
assert.Equals(t, getExpvarAsString(indexExpvars, "getChanges_lastPolled_miss"), "1")
time.Sleep(100 * time.Millisecond)
// Make a changes request prior to the last polled range, ensure it doesn't reuse polled results
changes, err = db.GetChanges(base.SetOf("ABC"), ChangesOptions{Since: simpleClockSequence(0)})
assert.Equals(t, getExpvarAsString(indexExpvars, "getChanges_lastPolled_hit"), "2")
assert.Equals(t, getExpvarAsString(indexExpvars, "getChanges_lastPolled_miss"), "2")
// Currently disabled, due to test race conditions between the continuous changes start (in its own goroutine),
// and the send of the continuous terminator. We can't ensure that the changes request has been
// started before all other test operations have been sent (so that we never break out of the changes loop)
func RaceTestPollingChangesFeed(t *testing.T) {
//base.LogKeys["DIndex+"] = true
db := setupTestDBForChangeIndex(t)
defer tearDownTestDB(t, db)
db.ChannelMapper = channels.NewDefaultChannelMapper()
// Start a longpoll changes
go func() {
abcHboChanges, err := db.GetChanges(base.SetOf("ABC", "HBO"), ChangesOptions{Since: simpleClockSequence(0), Wait: true})
assertTrue(t, err == nil, "Error getting changes")
// Expects two changes - the nil that's sent on initial wait, and then docABC_1
for _, change := range abcHboChanges {
log.Printf("abcHboChange:%v", change)
assert.Equals(t, len(abcHboChanges), 2)
time.Sleep(100 * time.Millisecond)
// Write an entry to channel ABC to notify the waiting longpoll
WriteDirectWithKey(db, "docABC_1", []string{"ABC"}, 1)
// Start a continuous changes on a different channel (CBS). Waitgroup keeps test open until continuous is terminated
var wg sync.WaitGroup
continuousTerminator := make(chan bool)
go func() {
defer wg.Done()
cbsChanges, err := db.GetChanges(base.SetOf("CBS"), ChangesOptions{Since: simpleClockSequence(0), Wait: true, Continuous: true, Terminator: continuousTerminator})
assertTrue(t, err == nil, "Error getting changes")
// Expect 15 entries + 16 nil entries (one per wait)
assert.Equals(t, len(cbsChanges), 25)
log.Println("Continuous completed")
// Write another entry to channel ABC to start the clock for unread polls
time.Sleep(1000 * time.Millisecond)
WriteDirectWithKey(db, "docABC_2", []string{"ABC"}, 1)
// Verify that the channel is initially in the polled set
changeIndex, _ := db.changeCache.(*kvChangeIndex)
assertTrue(t, changeIndex.reader.getChannelReader("ABC") != nil, "Channel reader should not be nil")
log.Printf("changeIndex readers: %+v", changeIndex.reader.channelIndexReaders)
// Send multiple docs to channels HBO, PBS, CBS. Expected results:
// ABC - Longpoll has ended - should trigger "nobody listening" expiry of channel reader
// CBS - Active continuous feed - channel reader shouldn't expire
// PBS - No changes feeds have requested this channel - no channel reader should be created
// HBO - New longpoll started mid-way before poll limit reached, channel reader shouldn't expire
time.Sleep(20 * time.Millisecond)
for i := 0; i < 12; i++ {
log.Printf("writing multiDoc_%d", i)
WriteDirectWithKey(db, fmt.Sprintf("multiDoc_%d", i), []string{"PBS", "HBO", "CBS"}, 1)
// Midway through, read from HBO
if i == 9 {
// wait for polling cycle
time.Sleep(600 * time.Millisecond)
hboChanges, err := db.GetChanges(base.SetOf("HBO"), ChangesOptions{Since: simpleClockSequence(0), Wait: true})
assertTrue(t, err == nil, "Error getting changes")
assert.Equals(t, len(hboChanges), 10)
time.Sleep(kPollFrequency * time.Millisecond)
// Verify that the changes feed has been started (avoids test race conditions where we close the terminator before
// starting the changes feed)
for i := 0; i <= 40; i++ {
channelChangesCount := getExpvarAsString(dbExpvars, "channelChangesFeeds")
log.Printf("channelChangesCount:%s", channelChangesCount)
if channelChangesCount != "" {
time.Sleep(100 * time.Millisecond)
log.Println("closed terminator")
// Send another entry to continuous CBS feed in order to trigger the terminator check
time.Sleep(100 * time.Millisecond)
WriteDirectWithKey(db, "terminatorCheck", []string{"CBS"}, 1)
time.Sleep(100 * time.Millisecond)
// Validate that the ABC reader was deleted due to inactivity
log.Printf("channel reader ABC:%+v", changeIndex.reader.getChannelReader("ABC"))
assertTrue(t, changeIndex.reader.getChannelReader("ABC") == nil, "Channel reader should be nil")
// Validate that the HBO reader is still present
assertTrue(t, changeIndex.reader.getChannelReader("HBO") != nil, "Channel reader should be exist")
// Start another longpoll changes for ABC, ensure it's successful (will return the existing 2 records, no wait)
changes, err := db.GetChanges(base.SetOf("ABC"), ChangesOptions{Since: simpleClockSequence(0), Wait: true})
assertTrue(t, err == nil, "Error getting changes")
assert.Equals(t, len(changes), 2)
// Repeat, verify use of existing channel reader