在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
技术背景: 最近在工作中使用了nginx+redis 的架构,redis在后台做分布式存储,每个redis都存放不同的数据,这些数据都是某门户网站通过Hadoop分析出来的用户行为日志,key是uid,value是user profile,每小时更新量在500-800万条记录,而这些记录一旦生成,我需要在5分钟左右的时间完成所有导入过程。
首先,我在nginx中使用了第三方模块HttpUpstreamConsistent来做负载均衡策略,针对不同用户(uid)选取不同的backend redis: upstream somestream { consistent_hash $arg_uid; server 10.50.1.3:11211; server 10.50.1.4:11211; server 10.50.1.5:11211; } 现在问题来了,由于Hadoop系统处理日志的速度非常快,如果把每条记录都通过Nginx来写入Redis中,这样的速度是无法接受的,而且会影响Nginx对正常请求的服务能力。所以,需要将这些数据以离线的方式导入redis集群中,这样就要重新实现HttpUpstreamConsistent模块了,才能保证读写的哈希策略一致。
下面的源码演示了如何将HttpUpstreamConsistent模块翻译成Lua的过程,(使用了CRC32作散列,依赖库的路径已列在Reference中)。 #!/usr/bin/lua -- chenqi@2014/04/02 --[Reference] --https://github.com/yaoweibin/ngx_http_consistent_hash --https://github.com/davidm/lua-digest-crc32lua local CRC = require('CRC32') local M = {} local CONSISTENT_BUCKETS = 1024 local VIRTUAL_NODE = 160 local HASH_PEERS = {} local CONTINUUM = {} local BUCKETS = {} local function hash_fn(key) return CRC.crc32(key) end -- in-place quicksort function quicksort(array,compareFunc) quick(array,1,#array,compareFunc) end function quick(array,left,right,compareFunc) if(left < right ) then local index = partion(array,left,right,compareFunc) quick(array,left,index-1,compareFunc) quick(array,index+1,right,compareFunc) end end function partion(array,left,right,compareFunc) local key = array[left] local index = left array[index],array[right] = array[right],array[index] local i = left while i< right do if compareFunc( key,array[i]) then array[index],array[i] = array[i],array[index] index = index + 1 end i = i + 1 end array[right],array[index] = array[index],array[right] return index; end -- binary search local function chash_find(point) local mid, lo, hi = 1, 1, #CONTINUUM while 1 do if point <= CONTINUUM[lo][2] or point > CONTINUUM[hi][2] then return CONTINUUM[lo] end -- test middle point mid = lo + math.floor((hi-lo)/2) -- perfect match if point <= CONTINUUM[mid][2] and point > (mid > 1 and CONTINUUM[mid-1][2] or 0) then return CONTINUUM[mid] end -- too low, go up if CONTINUUM[mid][2] < point then lo = mid + 1 else hi = mid - 1 end end end local function chash_init() local n = #HASH_PEERS if n == 0 then print("There is no backend servers") return end local C = {} for i,peer in ipairs(HASH_PEERS) do for k=1, math.floor(VIRTUAL_NODE * peer[1]) do local hash_data = peer[2] .. "-" .. (k - 1) table.insert(C, {peer[2], hash_fn(hash_data)}) end end quicksort(C, function(a,b) return a[2] > b[2] end) CONTINUUM = C --[[ for i=1,#C do print(CONTINUUM[i][1],CONTINUUM[i][2]) end --]] local step = math.floor(0xFFFFFFFF / CONSISTENT_BUCKETS) BUCKETS = {} for i=1, CONSISTENT_BUCKETS do table.insert(BUCKETS, i, chash_find(math.floor(step * (i - 1)))) -- print(BUCKETS[i][1],BUCKETS[i][2]) end end M.init = chash_init local function chash_get_upstream_crc32(point) return BUCKETS[(point % CONSISTENT_BUCKETS)+1][1] end M.get_upstream_crc32 = chash_get_upstream_crc32 local function chash_get_upstream(key) local point = math.floor(hash_fn(key)) return chash_get_upstream_crc32(point) end M.get_upstream = chash_get_upstream local function chash_add_upstream(upstream, weigth) weight = weight or 1 table.insert(HASH_PEERS, {weight, upstream}) end M.add_upstream = chash_add_upstream return M
API调用方式: local redis_login= { "10.50.1.3:11211", "10.50.1.4:11211", "10.50.1.5:11211", } for k, backend in ipairs(redis_login) do chash_login.add_upstream(backend) end chash_login.init() uid="309473941" chash_login.chash_get_upstream(uid) 返回一个backend地址,将该uid对应的数据写入对应的redis中即可,稍后可以使用Nginx读到。
PS:关于redis的mass insertion问题,最高效的方式是批量写入文件(文件格式遵循redis协议),然后使用 redis-cli --pipe 直接导入。
|
请发表评论