在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
Program.cs using System; using System.Text; using CommonLinkLibrary.Util; using Newtonsoft.Json; namespace SSDB { class Program { static void Main(string[] args) { InitSsdb(); var channelName = "PersonUpdateLog"; var channelNameKeys = channelName + "_Keys"; var ssdb = ConnectionPool.GetSocket(); ssdb.zclear(channelName); ssdb.del(channelNameKeys); Console.WriteLine("清空"+channelName+"成功!"); for (var i = 1; i <= 100; i++) { var pk = GetPrimaryKey(ssdb, channelNameKeys); var myBean = new PersonInfoLogBean { Action = "新增", NewPersonName = "黄海" + i, OldPersonName = "黄海" + i, PersonId = i.ToString(), OldAge = i.ToString(), NewAge = i.ToString() }; var msg = JsonConvert.SerializeObject(myBean); ssdb.zset(Encoding.UTF8.GetBytes(channelName), Encoding.UTF8.GetBytes(msg), pk); } for (var i = 1; i <= 100; i++) { var pk = GetPrimaryKey(ssdb, channelNameKeys); var myBean = new PersonInfoLogBean { Action = "修改", NewPersonName = "(新)黄海" + i, OldPersonName = "(旧)黄海" + i, PersonId = i.ToString(), OldAge = i.ToString(), NewAge = i.ToString() }; var msg = JsonConvert.SerializeObject(myBean); ssdb.zset(Encoding.UTF8.GetBytes(channelName), Encoding.UTF8.GetBytes(msg), pk); } for (var i = 1; i <= 100; i++) { var pk = GetPrimaryKey(ssdb, channelNameKeys); var myBean = new PersonInfoLogBean { Action = "删除", NewPersonName = "(新)黄海" + i, OldPersonName = "(旧)黄海" + i, PersonId = i.ToString(), OldAge = i.ToString(), NewAge = i.ToString() }; var msg = JsonConvert.SerializeObject(myBean); ssdb.zset(Encoding.UTF8.GetBytes(channelName), Encoding.UTF8.GetBytes(msg), pk); } Console.WriteLine("保存成功!"); const int size = 10; var startId = 0; while (true) { var keys = ssdb.zscan(channelName, "", startId, 999999999999, size); for (var i = 0; i < keys.Length; i++) { Console.WriteLine(keys[i].Key + " " + keys[i].Value); } if (size != keys.Length) { break; } else { startId = startId + size; } } ConnectionPool.PutSocket(ssdb); Console.ReadKey(); } public static void InitSsdb() { //连接池的初始化工作 var serverIp = "10.10.6.199"; ConnectionPool.InitializeConnectionPool(serverIp, 8888, 20, 100); } public static long GetPrimaryKey(SsdbClient ssdb, string primaryKeyName) { return ssdb.incr(Encoding.UTF8.GetBytes(primaryKeyName), 1); } } } ConnectionPool.cs using System; using System.Collections.Generic; using System.Threading; namespace CommonLinkLibrary.Util { public static class ConnectionPool { /// <summary> /// Queue of available socket connections. /// </summary> private static Queue<SsdbClient> _availableSockets; /// <summary> /// The maximum size of the connection pool. /// </summary> private static int _poolMaxSize = 20; private static string _hostIpAddress; private static int _hostPortNumber; /// <summary> /// Created host Connection counter /// </summary> private static int _socketCounter; public static bool Initialized; /// <summary> /// Initialize host Connection pool /// </summary> /// <param name="hostPortNumber"></param> /// <param name="minConnections">Initial number of connections</param> /// <param name="maxConnections">The maximum size of the connection pool</param> /// <param name="hostIpAddress"></param> public static void InitializeConnectionPool(string hostIpAddress, int hostPortNumber, int minConnections, int maxConnections) { _socketCounter = 0; _poolMaxSize = maxConnections; _hostIpAddress = hostIpAddress; _hostPortNumber = hostPortNumber; _availableSockets = new Queue<SsdbClient>(); for (var i = 0; i < minConnections; i++) { var cachedSocket = OpenSocket(hostIpAddress, hostPortNumber); PutSocket(cachedSocket); } Initialized = true; } /// <summary> /// Get an open socket from the connection pool. /// </summary> /// <returns>Socket returned from the pool or new socket opened. </returns> public static SsdbClient GetSocket() { //如果连接池中还有可用的连接,那么调用 if (_availableSockets.Count > 0) { lock (_availableSockets) { SsdbClient socket = null; while (_availableSockets.Count > 0) { socket = _availableSockets.Dequeue(); if (socket.Connected) { return socket; } socket.Close(); Interlocked.Decrement(ref _socketCounter); } } } //如果没有可用的连接,那么新打开一个连接 return OpenSocket(_hostIpAddress, _hostPortNumber); } /// <summary> /// Return the given socket back to the socket pool. /// </summary> /// <param name="socket">Socket connection to return.</param> public static void PutSocket(SsdbClient socket) { lock (_availableSockets) { if (_availableSockets.Count < _poolMaxSize) // Configuration Value { if (socket != null) { if (socket.Connected) { _availableSockets.Enqueue(socket); } else { socket.Close(); } } } else { socket.Close(); } } } /// <summary> /// Open a new socket connection. /// </summary> /// <returns>Newly opened socket connection.</returns> private static SsdbClient OpenSocket(string hostIpAddress, int hostPortNumber) { if (_socketCounter < _poolMaxSize) { Interlocked.Increment(ref _socketCounter); var client = new SsdbClient(hostIpAddress, hostPortNumber); return client; } throw new Exception("Connection Pool reached its limit"); } } } Link.cs using System; using System.Collections.Generic; using System.IO; using System.Net.Sockets; using System.Text; namespace CommonLinkLibrary.Util { internal class Link : IDisposable { private MemoryStream _recvBuf = new MemoryStream(8*1024); private TcpClient _sock; //供程序员显式调用的Dispose方法 public void Dispose() { _recvBuf.Dispose(); close(); } //protected的Dispose方法,保证不会被外部调用。 //传入bool值disposing以确定是否释放托管资源 //供GC调用的析构函数 ~Link() { close(); } public Link(string host, int port) { _sock = new TcpClient(host, port) {NoDelay = true}; _sock.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); } public void close() { if (_sock != null) { _sock.Close(); } _sock = null; } public List<byte[]> request(string cmd, params string[] args) { var req = new List<byte[]>(1 + args.Length) {Encoding.UTF8.GetBytes(cmd)}; foreach (var s in args) { req.Add(Encoding.UTF8.GetBytes(s)); } return request(req); } public List<byte[]> request(string cmd, params byte[][] args) { var req = new List<byte[]>(1 + args.Length) {Encoding.UTF8.GetBytes(cmd)}; req.AddRange(args); return request(req); } public List<byte[]> request(List<byte[]> req) { var buf = new MemoryStream(); foreach (var p in req) { if(p!=null) { var len = Encoding.UTF8.GetBytes(p.Length.ToString()); buf.Write(len, 0, len.Length); buf.WriteByte((byte)'\n'); buf.Write(p, 0, p.Length); buf.WriteByte((byte)'\n'); } } buf.WriteByte((byte) '\n'); var bs = buf.GetBuffer(); _sock.GetStream().Write(bs, 0, (int) buf.Length); //Console.Write(Encoding.UTF8.GetString(bs, 0, (int)buf.Length)); return recv(); } private List<byte[]> recv() { while (true) { var ret = parse(); if (ret != null) { return ret; } var bs = new byte[8192]; var len = _sock.GetStream().Read(bs, 0, bs.Length); //Console.WriteLine("<< " + Encoding.UTF8.GetString(bs)); _recvBuf.Write(bs, 0, len); } } private static int memchr(byte[] bs, byte b, int offset) { for (var i = offset; i < bs.Length; i++) { if (bs[i] == b) { return i; } } return -1; } private List<byte[]> parse() { var list = new List<byte[]>(); var buf = _recvBuf.GetBuffer(); var idx = 0; while (true) { var pos = memchr(buf, (byte) '\n', idx); //System.out.println("pos: " + pos + " idx: " + idx); if (pos == -1) { break; } if (pos == idx || (pos == idx + 1 && buf[idx] == '\r')) { idx += 1; // if '\r', next time will skip '\n' // ignore empty leading lines if (list.Count == 0) { continue; } var left = (int) _recvBuf.Length - idx; _recvBuf = new MemoryStream(8192); if (left > 0) { _recvBuf.Write(buf, idx, left); } return list; } var lens = new byte[pos - idx]; Array.Copy(buf, idx, lens, 0, lens.Length); var len = int.Parse(Encoding.UTF8.GetString(lens)); idx = pos + 1; if (idx + len >= _recvBuf.Length) { break; } var data = new byte[len]; Array.Copy(buf, idx, data, 0, data.Length); //Console.WriteLine("len: " + len + " data: " + Encoding.UTF8.GetString(data)); idx += len + 1; // skip '\n' list.Add(data); } return null; } } } PersonInfoLogBean.cs namespace SSDB { class PersonInfoLogBean { public string Action { get; set; } public string PersonId { get; set; } public string OldPersonName { get; set; } public string NewPersonName { get; set; } public string OldAge { get; set; } public string NewAge { get; set; } } } SsdbClient.cs using System; using System.Collections.Generic; using System.Net.Sockets; using System.Text; namespace CommonLinkLibrary.Util { public class SsdbClient : TcpClient { private readonly Link _link; private string _respCode; public SsdbClient(string hostIpAddress, int hostPortNumber) : base(hostIpAddress, hostPortNumber) { _link = new Link(hostIpAddress, hostPortNumber); } public List<byte[]> request(string cmd, params string[] args) { return _link.request(cmd, args); } public List<byte[]> request(string cmd, params byte[][] args) { return _link.request(cmd, args); } public List<byte[]> request(List<byte[]> req) { return _link.request(req); } private byte[] _bytes(string s) { if (s == null) return null; return Encoding.UTF8.GetBytes(s); } private string _string(byte[] bs) { return Encoding.UTF8.GetString(bs); } private KeyValuePair<string, byte[]>[] parse_scan_resp(List<byte[]> resp) { _respCode = _string(resp[0]); var size = (resp.Count - 1)/2; var kvs = new KeyValuePair<string, byte[]>[size]; for (var i = 0; i < size; i += 1) { var key = _string(resp[i*2 + 1]); var val = resp[i*2 + 2]; kvs[i] = new KeyValuePair<string, byte[]>(key, val); } return kvs; } /***** kv *****/ public bool exists(byte[] key) { var resp = request("exists", key); _respCode = _string(resp[0]); if (_respCode == "not_found") { return false; } if (resp.Count != 2) { throw new Exception("Bad response!"); } return (_string(resp[1]) == "1" ? true : false); } public bool exists(string key) { return exists(_bytes(key)); } public void set(byte[] key, byte[] val) { var resp = request("set", key, val); _respCode = _string(resp[0]); } public void set(string key, string val) { set(_bytes(key), _bytes(val)); } /// <summary> /// </summary> /// <param name="key"></param> /// <param name="val"></param> /// <returns>returns true if name.key is found, otherwise returns false.</returns> public bool get(byte[] key, out byte[] val) { val = null; var resp = request("get", key); _respCode = _string(resp[0]); if (_respCode == "not_found") { return false; } if (resp.Count != 2) { throw new Exception("Bad response!"); } val = resp[1]; return true; } public bool get(string key, out byte[] val) { return get(_bytes(key), out val); } public bool get(string key, out string val) { val = null; byte[] bs; if (!get(key, out bs)) { return false; } val = _string(bs); return true; } public void del(byte[] key) { var resp = request("del", key); _respCode = _string(resp[0]); } public void del(string key) { del(_bytes(key)); } public KeyValuePair<string, byte[]>[] scan(string key_start, string key_end, long limit) { var resp = request("scan", key_start, key_end, limit.ToString()); return parse_scan_resp(resp); } public KeyValuePair<string, byte[]>[] rscan(string key_start, string key_end, long limit) { var resp = request("rscan", key_start, key_end, limit.ToString()); return parse_scan_resp(resp); } /***** hash *****/ public void hset(byte[] name, byte[] key, byte[] val) { var resp = request("hset", name, key, val); _respCode = _string(resp[0]); } public void hset(string name, string key, byte[] val) { hset(_bytes(name), _bytes(key), val); } public void hset(string name, string key, string val) { hset(_bytes(name), _bytes(key), _bytes(val)); } /// <summary> /// </summary> /// <param name="name"></param> /// <param name="key"></param> /// <param name="val"></param> /// <returns>returns true if name.key is found, otherwise returns false.</returns> public bool hget(byte[] name, byte[] key, out byte[] val) { val = null 全部评论
专题导读
热门推荐
热门话题
阅读排行榜
|
请发表评论