• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

nginx+lua+storm的热点缓存的流量分发策略自动降级

原作者: [db:作者] 来自: [db:来源] 收藏 邀请
1、在storm中,实时的计算出瞬间出现的热点。
某个storm task,上面算出了1万个商品的访问次数,LRUMap

频率高一些,每隔5秒,去遍历一次LRUMap,将其中的访问次数进行排序,统计出往后排的95%的商品访问次数的平均值
比如说,95%的商品,访问次数的平均值是100
从最前面开始,往后遍历,去找有没有瞬间出现的热点数据

1000,95%的平均值(100)的10倍,这个时候要设定一个阈值,比如说超出95%平均值得n倍,5倍

我们就认为是瞬间出现的热点数据,判断其可能在短时间内继续扩大的访问量,甚至达到平均值几十倍,或者几百倍

当遍历,发现说第一个商品的访问次数,小于平均值的5倍,就安全了,就break掉这个循环

热点数据,热数据,不是一个概念

有100个商品,前10个商品比较热,都访问量在500左右,其他的普通商品,访问量都在200左右,就说前10个商品是热数据

统计出来

预热的时候,将这些热数据放在缓存中去预热就可以了
热点,前面某个商品的访问量,瞬间超出了普通商品的10倍,或者100倍,1000倍,热点

2、storm这里,会直接发送http请求到nginx上,nginx上用lua脚本去处理这个请求

storm会将热点本身对应的productId,发送到流量分发的nginx上面去,放在本地缓存中

storm会将热点对应的完整的缓存数据,发送到所有的应用nginx服务器上去,直接放在本地缓存中

3、流量分发nginx的分发策略降级

流量分发nginx,加一个逻辑,就是每次访问一个商品详情页的时候,如果发现它是个热点,那么立即做流量分发策略的降级

hash策略,同一个productId的访问都同一台应用nginx服务器上

降级成对这个热点商品,流量分发采取随机负载均衡发送到所有的后端应用nginx服务器上去

瞬间将热点缓存数据的访问,从hash分发,全部到一台nginx,变成了,负载均衡发送到多台nginx上去

避免说大量的流量全部集中到一台机器,50万的访问量到一台nginx,5台应用nginx,每台就可以承载10万的访问量

4、storm还需要保存下来上次识别出来的热点list

下次去识别的时候,这次的热点list跟上次的热点list做一下diff,看看可能有的商品已经不是热点了

热点的取消的逻辑,发送http请求到流量分发的nginx上去,取消掉对应的热点数据,从nginx本地缓存中,删除

 

 帮助类:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

@SuppressWarnings("deprecation")
public class HttpClientUtils {
	
	/**
	 * 发送GET请求
	 * @param url 请求URL
	 * @return 响应结果
	 */
	@SuppressWarnings("resource")
	public static String sendGetRequest(String url) {
		String httpResponse = null;
		
		HttpClient httpclient = null;
		InputStream is = null;
		BufferedReader br = null;
		
		try {
			// 发送GET请求
			httpclient = new DefaultHttpClient();
			HttpGet httpget = new HttpGet(url);  
			HttpResponse response = httpclient.execute(httpget);
			
			// 处理响应
			HttpEntity entity = response.getEntity();
			if (entity != null) {
				is = entity.getContent();
				br = new BufferedReader(new InputStreamReader(is));      
				
		        StringBuffer buffer = new StringBuffer("");       
		        String line = null;   
		        
		        while ((line = br.readLine()) != null) {  
		        		buffer.append(line + "\n");      
	            }  
	    
		        httpResponse = buffer.toString();      
			}
		} catch (Exception e) {  
			e.printStackTrace();  
		} finally {
			try {
				if(br != null) {
					br.close();
				}
				if(is != null) {
					is.close();
				}
			} catch (Exception e2) {
				e2.printStackTrace();  
			}
		}
		  
		return httpResponse;
	}
	
	/**
	 * 发送post请求
	 * @param url URL
	 * @param map 参数Map
	 * @return
	 */
	@SuppressWarnings({ "rawtypes", "unchecked", "resource" })
	public static String sendPostRequest(String url, Map<String,String> map){  
		HttpClient httpClient = null;  
        HttpPost httpPost = null;  
        String result = null;  
        
        try{  
            httpClient = new DefaultHttpClient();  
            httpPost = new HttpPost(url);  
            
            //设置参数  
            List<NameValuePair> list = new ArrayList<NameValuePair>();  
            Iterator iterator = map.entrySet().iterator();  
            while(iterator.hasNext()){  
                Entry<String,String> elem = (Entry<String, String>) iterator.next();  
                list.add(new BasicNameValuePair(elem.getKey(), elem.getValue()));  
            }  
            if(list.size() > 0){  
                UrlEncodedFormEntity entity = new UrlEncodedFormEntity(list, "utf-8");    
                httpPost.setEntity(entity);  
            }  
            
            HttpResponse response = httpClient.execute(httpPost);  
            if(response != null){  
                HttpEntity resEntity = response.getEntity();  
                if(resEntity != null){  
                    result = EntityUtils.toString(resEntity, "utf-8");    
                }  
            }  
        } catch(Exception ex){  
            ex.printStackTrace();  
        } finally {
        	
        }
        
        return result;  
    }  
	
}

  

	private class HotProductFindThread implements Runnable {

		@SuppressWarnings("deprecation")
		public void run() {
			List<Map.Entry<Long, Long>> productCountList = new ArrayList<Map.Entry<Long, Long>>();
			List<Long> hotProductIdList = new ArrayList<Long>();
			List<Long> lastTimeHotProductIdList = new ArrayList<Long>();
			
			while(true) {
				// 1、将LRUMap中的数据按照访问次数,进行全局的排序
				// 2、计算95%的商品的访问次数的平均值
				// 3、遍历排序后的商品访问次数,从最大的开始
				// 4、如果某个商品比如它的访问量是平均值的10倍,就认为是缓存的热点
				try {
					productCountList.clear();
					hotProductIdList.clear();
					
					if(productCountMap.size() == 0) {
						Utils.sleep(100);
						continue;
					}
					
					LOGGER.info("【HotProductFindThread打印productCountMap的长度】size=" + productCountMap.size());
					
					// 1、先做全局的排序
					
					for(Map.Entry<Long, Long> productCountEntry : productCountMap.entrySet()) {
						if(productCountList.size() == 0) {
							productCountList.add(productCountEntry);
						} else {
							// 比较大小,生成最热topn的算法有很多种
							// 但是我这里为了简化起见,不想引入过多的数据结构和算法的的东西
							// 很有可能还是会有漏洞,但是我已经反复推演了一下了,而且也画图分析过这个算法的运行流程了
							boolean bigger = false;
							
							for(int i = 0; i < productCountList.size(); i++){
								Map.Entry<Long, Long> topnProductCountEntry = productCountList.get(i);
								
								if(productCountEntry.getValue() > topnProductCountEntry.getValue()) {
									int lastIndex = productCountList.size() < productCountMap.size() ? productCountList.size() - 1 : productCountMap.size() - 2;
									for(int j = lastIndex; j >= i; j--) {
										if(j + 1 == productCountList.size()) {
											productCountList.add(null);
										}
										productCountList.set(j + 1, productCountList.get(j));  
									}
									productCountList.set(i, productCountEntry);
									bigger = true;
									break;
								}
							}
							
							if(!bigger) {
								if(productCountList.size() < productCountMap.size()) {
									productCountList.add(productCountEntry);
								}
							}
						}
					}
					
					LOGGER.info("【HotProductFindThread全局排序后的结果】productCountList=" + productCountList); 
					
					// 2、计算出95%的商品的访问次数的平均值
					int calculateCount = (int)Math.floor(productCountList.size() * 0.95);
					
					Long totalCount = 0L;
					for(int i = productCountList.size() - 1; i >= productCountList.size() - calculateCount; i--) {
						totalCount += productCountList.get(i).getValue();
					}
					
					Long avgCount = totalCount / calculateCount;
					
					LOGGER.info("【HotProductFindThread计算出95%的商品的访问次数平均值】avgCount=" + avgCount); 
					
					// 3、从第一个元素开始遍历,判断是否是平均值得10倍
					for(Map.Entry<Long, Long> productCountEntry : productCountList) {
						if(productCountEntry.getValue() > 10 * avgCount) {
							LOGGER.info("【HotProductFindThread发现一个热点】productCountEntry=" + productCountEntry); 
							hotProductIdList.add(productCountEntry.getKey());
							
							if(!lastTimeHotProductIdList.contains(productCountEntry.getKey())) {
								// 将缓存热点反向推送到流量分发的nginx中
								String distributeNginxURL = "http://192.168.31.227/hot?productId=" + productCountEntry.getKey();
								HttpClientUtils.sendGetRequest(distributeNginxURL);
								
								// 将缓存热点,那个商品对应的完整的缓存数据,发送请求到缓存服务去获取,反向推送到所有的后端应用nginx服务器上去
								String cacheServiceURL = "http://192.168.31.179:8080/getProductInfo?productId=" + productCountEntry.getKey();
								String response = HttpClientUtils.sendGetRequest(cacheServiceURL);
								
								List<NameValuePair> params = new ArrayList<NameValuePair>();  
								params.add(new BasicNameValuePair("productInfo", response));    
						        String productInfo = URLEncodedUtils.format(params, HTTP.UTF_8);
								
								String[] appNginxURLs = new String[]{
										"http://192.168.31.187/hot?productId=" + productCountEntry.getKey() + "&" + productInfo,
										"http://192.168.31.19/hot?productId=" + productCountEntry.getKey() + "&" + productInfo
								};
								
								for(String appNginxURL : appNginxURLs) {
									HttpClientUtils.sendGetRequest(appNginxURL);
								}
							}
						}
					}
					
					// 4、实时感知热点数据的消失
					if(lastTimeHotProductIdList.size() == 0) {
						if(hotProductIdList.size() > 0) {
							for(Long productId : hotProductIdList) {
								lastTimeHotProductIdList.add(productId);
							}
							LOGGER.info("【HotProductFindThread保存上次热点数据】lastTimeHotProductIdList=" + lastTimeHotProductIdList);
						}
					} else {
						for(Long productId : lastTimeHotProductIdList) {
							if(!hotProductIdList.contains(productId)) {
								LOGGER.info("【HotProductFindThread发现一个热点消失了】productId=" + productId); 
								// 说明上次的那个商品id的热点,消失了
								// 发送一个http请求给到流量分发的nginx中,取消热点缓存的标识
								String url = "http://192.168.31.227/cancel_hot?productId=" + productId;
								HttpClientUtils.sendGetRequest(url);
							}
						}
						
						if(hotProductIdList.size() > 0) {
							lastTimeHotProductIdList.clear();
							for(Long productId : hotProductIdList) {
								lastTimeHotProductIdList.add(productId);
							}
							LOGGER.info("【HotProductFindThread保存上次热点数据】lastTimeHotProductIdList=" + lastTimeHotProductIdList);
						} else {
							lastTimeHotProductIdList.clear();
						}
					}
					
					Utils.sleep(5000); 
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}

  

  

流量分发

local uri_args = ngx.req.get_uri_args()

local product_id = uri_args["productId"]

local cache_ngx = ngx.shared.my_cache

local hot_product_cache_key = "hot_product_"..product_id

cache_ngx:set(hot_product_cache_key, "true", 60 * 60)


后端应用

local uri_args = ngx.req.get_uri_args()
local product_id = uri_args["productId"]
local product_info = uri_args["productInfo"]

local product_cache_key = "product_info_"..product_id

local cache_ngx = ngx.shared.my_cache

cache_ngx:set(product_cache_key,product_info,60 * 60)

  

math.randomseed(tostring(os.time()):reverse():sub(1, 7))
math.random(1, 2)


local uri_args = ngx.req.get_uri_args()
local productId = uri_args["productId"]
local shopId = uri_args["shopId"]

local hosts = {"192.168.31.187", "192.168.31.19"}
local backend = ""

local hot_product_key = "hot_product_"..productId

local cache_ngx = ngx.shared.my_cache
local hot_product_flag = cache_ngx:get(hot_product_key)

if hot_product_flag == "true" then
  math.randomseed(tostring(os.time()):reverse():sub(1, 7))
  local index = math.random(1, 2)  
  backend = "http://"..hosts[index]
else
  local hash = ngx.crc32_long(productId)
  local index = (hash % 2) + 1
  backend = "http://"..hosts[index]
end

local requestPath = uri_args["requestPath"]
requestPath = "/"..requestPath.."?productId="..productId.."&shopId="..shopId

local http = require("resty.http")
local httpc = http.new()

local resp, err = httpc:request_uri(backend,{
  method = "GET",
  path = requestPath
})

if not resp then
  ngx.say("request error: ", err)
  return
end

ngx.say(resp.body)

httpc:close()

  


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
lua.c:80:31:fatalerror:readline/readline.h:Nosuchfileordirectory发布时间:2022-07-22
下一篇:
Lua学习系列(一)发布时间:2022-07-22
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap