• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

C#使用SSDB管理增量日志并提供查询

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

Program.cs

using System;
using System.Text;
using CommonLinkLibrary.Util;
using Newtonsoft.Json;

namespace SSDB
{
    /// <summary>
    ///     Console demo: clears a log channel, writes three batches of person-change
    ///     records into it through an <see cref="SsdbClient"/>, then pages through the
    ///     stored records and prints them.
    /// </summary>
    class Program
    {
        /// <summary>Number of records written for each action type.</summary>
        private const int RecordsPerAction = 100;

        /// <summary>Number of records requested per zscan call.</summary>
        private const int PageSize = 10;

        /// <summary>Upper score bound passed to zscan.</summary>
        private const long MaxScore = 999999999999;

        /// <summary>
        ///     Entry point. Initializes the pool, rewrites the demo channel and dumps it page by page.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            InitSsdb();

            var channelName = "PersonUpdateLog";
            var channelNameKeys = channelName + "_Keys";
            var ssdb = ConnectionPool.GetSocket();
            try
            {
                ssdb.zclear(channelName);
                ssdb.del(channelNameKeys);
                Console.WriteLine("清空" + channelName + "成功!");

                AppendLogRecords(ssdb, channelName, channelNameKeys, "新增", "黄海", "黄海");
                AppendLogRecords(ssdb, channelName, channelNameKeys, "修改", "(新)黄海", "(旧)黄海");
                AppendLogRecords(ssdb, channelName, channelNameKeys, "删除", "(新)黄海", "(旧)黄海");
                Console.WriteLine("保存成功!");

                PrintAllRecords(ssdb, channelName);
            }
            finally
            {
                // Hand the connection back even when a command throws, so the pool slot is not lost.
                ConnectionPool.PutSocket(ssdb);
            }
            Console.ReadKey();
        }

        /// <summary>
        ///     Writes <see cref="RecordsPerAction"/> records for one action type. Each record is
        ///     serialized to JSON and passed to zset together with a freshly generated id.
        /// </summary>
        /// <param name="ssdb">Connection used for all commands.</param>
        /// <param name="channelName">Name of the sorted set receiving the records.</param>
        /// <param name="channelNameKeys">Name of the counter key used to generate ids.</param>
        /// <param name="action">Value stored in the record's Action property.</param>
        /// <param name="newNamePrefix">Prefix of NewPersonName; the loop index is appended.</param>
        /// <param name="oldNamePrefix">Prefix of OldPersonName; the loop index is appended.</param>
        private static void AppendLogRecords(SsdbClient ssdb, string channelName, string channelNameKeys,
            string action, string newNamePrefix, string oldNamePrefix)
        {
            var channelBytes = Encoding.UTF8.GetBytes(channelName);
            for (var i = 1; i <= RecordsPerAction; i++)
            {
                var pk = GetPrimaryKey(ssdb, channelNameKeys);
                var myBean = new PersonInfoLogBean
                {
                    Action = action,
                    NewPersonName = newNamePrefix + i,
                    OldPersonName = oldNamePrefix + i,
                    PersonId = i.ToString(),
                    OldAge = i.ToString(),
                    NewAge = i.ToString()
                };
                var msg = JsonConvert.SerializeObject(myBean);
                ssdb.zset(channelBytes, Encoding.UTF8.GetBytes(msg), pk);
            }
        }

        /// <summary>
        ///     Pages through the channel with zscan and prints every key/value pair returned.
        /// </summary>
        /// <param name="ssdb">Connection used for the scans.</param>
        /// <param name="channelName">Name of the sorted set to dump.</param>
        private static void PrintAllRecords(SsdbClient ssdb, string channelName)
        {
            long nextScore = 0;
            while (true)
            {
                var page = ssdb.zscan(channelName, "", nextScore, MaxScore, PageSize);
                foreach (var entry in page)
                {
                    Console.WriteLine(entry.Key + "      " + entry.Value);
                }
                if (page.Length != PageSize)
                {
                    break;
                }
                // With an empty key_start, SSDB's zscan includes score_start itself, so the next
                // page must begin just after the last score already printed. Advancing by the
                // page size instead re-reads the boundary record and breaks on gaps in the ids.
                // NOTE(review): assumes zscan returns the numeric score in Value - confirm
                // against SsdbClient.zscan.
                nextScore = page[page.Length - 1].Value + 1;
            }
        }

        /// <summary>
        ///     Initializes the shared connection pool for the demo server.
        /// </summary>
        public static void InitSsdb()
        {
            // Connection pool initialization.
            var serverIp = "10.10.6.199";
            ConnectionPool.InitializeConnectionPool(serverIp, 8888, 20, 100);
        }

        /// <summary>
        ///     Generates the next id by incrementing the named counter by one.
        /// </summary>
        /// <param name="ssdb">Connection used for the incr command.</param>
        /// <param name="primaryKeyName">Name of the counter key.</param>
        /// <returns>The value returned by incr.</returns>
        public static long GetPrimaryKey(SsdbClient ssdb, string primaryKeyName)
        {
            return ssdb.incr(Encoding.UTF8.GetBytes(primaryKeyName), 1);
        }
    }
}

ConnectionPool.cs

using System;
using System.Collections.Generic;
using System.Threading;

namespace CommonLinkLibrary.Util
{
    /// <summary>
    ///     Static pool of <see cref="SsdbClient"/> connections to a single host.
    ///     All shared state is guarded by one private lock object.
    /// </summary>
    public static class ConnectionPool
    {
        /// <summary>
        ///     Guards every field below. A dedicated object is used because the queue
        ///     reference itself is replaced on (re)initialization.
        /// </summary>
        private static readonly object SyncRoot = new object();

        /// <summary>
        ///     Queue of idle connections.
        /// </summary>
        private static Queue<SsdbClient> _availableSockets = new Queue<SsdbClient>();

        /// <summary>
        ///     The maximum number of connections the pool may have alive at once.
        /// </summary>
        private static int _poolMaxSize = 20;

        private static string _hostIpAddress;
        private static int _hostPortNumber;

        /// <summary>
        ///     Number of connections currently alive (idle in the queue or handed out).
        /// </summary>
        private static int _socketCounter;

        /// <summary>
        ///     Set to true once <see cref="InitializeConnectionPool"/> has completed.
        /// </summary>
        public static bool Initialized;

        /// <summary>
        ///     Configures the pool and pre-opens <paramref name="minConnections"/> connections.
        ///     Calling it again closes the idle connections of the previous configuration.
        /// </summary>
        /// <param name="hostIpAddress">Host passed to every new <see cref="SsdbClient"/>.</param>
        /// <param name="hostPortNumber">Port passed to every new <see cref="SsdbClient"/>.</param>
        /// <param name="minConnections">Initial number of connections.</param>
        /// <param name="maxConnections">The maximum size of the connection pool.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        ///     <paramref name="maxConnections"/> is less than 1, or
        ///     <paramref name="minConnections"/> exceeds <paramref name="maxConnections"/>.
        /// </exception>
        public static void InitializeConnectionPool(string hostIpAddress, int hostPortNumber, int minConnections,
            int maxConnections)
        {
            if (maxConnections < 1)
            {
                throw new ArgumentOutOfRangeException("maxConnections", "The pool must allow at least one connection.");
            }
            if (minConnections > maxConnections)
            {
                throw new ArgumentOutOfRangeException("minConnections",
                    "The initial connection count cannot exceed the pool maximum.");
            }

            Queue<SsdbClient> previous;
            lock (SyncRoot)
            {
                previous = _availableSockets;
                _availableSockets = new Queue<SsdbClient>();
                _socketCounter = 0;
                _poolMaxSize = maxConnections;
                _hostIpAddress = hostIpAddress;
                _hostPortNumber = hostPortNumber;
            }

            // Idle connections from an earlier configuration would otherwise be abandoned open.
            foreach (var stale in previous)
            {
                stale.Close();
            }

            for (var i = 0; i < minConnections; i++)
            {
                PutSocket(OpenSocket(hostIpAddress, hostPortNumber));
            }

            Initialized = true;
        }

        /// <summary>
        ///     Get an open connection from the pool, opening a new one when no idle
        ///     connected one is available.
        /// </summary>
        /// <returns>Connection taken from the pool, or a newly opened connection.</returns>
        /// <exception cref="InvalidOperationException">
        ///     The pool has not been configured, or the connection limit has been reached.
        /// </exception>
        public static SsdbClient GetSocket()
        {
            string host;
            int port;
            lock (SyncRoot)
            {
                // Reuse an idle connection if one is still connected; discard dead ones.
                while (_availableSockets.Count > 0)
                {
                    var candidate = _availableSockets.Dequeue();
                    if (candidate.Connected)
                    {
                        return candidate;
                    }
                    candidate.Close();
                    ReleaseSlot();
                }
                host = _hostIpAddress;
                port = _hostPortNumber;
            }

            if (host == null)
            {
                throw new InvalidOperationException(
                    "ConnectionPool.InitializeConnectionPool must be called before GetSocket.");
            }

            // Nothing reusable: open a new connection (outside the lock, connecting may block).
            return OpenSocket(host, port);
        }

        /// <summary>
        ///     Return the given connection to the pool. A connection that is no longer
        ///     connected, or that does not fit because the pool is full, is closed and its
        ///     slot is released. A null argument is ignored.
        /// </summary>
        /// <param name="socket">Connection to return.</param>
        public static void PutSocket(SsdbClient socket)
        {
            if (socket == null)
            {
                return;
            }

            lock (SyncRoot)
            {
                if (socket.Connected && _availableSockets.Count < _poolMaxSize)
                {
                    _availableSockets.Enqueue(socket);
                    return;
                }
                // The connection is being discarded, so it must stop counting against the limit.
                ReleaseSlot();
            }
            socket.Close();
        }

        /// <summary>
        ///     Open a new connection, counting it against the pool limit.
        /// </summary>
        /// <returns>Newly opened connection.</returns>
        /// <exception cref="InvalidOperationException">The connection limit has been reached.</exception>
        private static SsdbClient OpenSocket(string hostIpAddress, int hostPortNumber)
        {
            // Reserve the slot atomically so concurrent callers cannot exceed the limit.
            lock (SyncRoot)
            {
                if (_socketCounter >= _poolMaxSize)
                {
                    throw new InvalidOperationException("Connection Pool reached its limit");
                }
                _socketCounter++;
            }

            try
            {
                return new SsdbClient(hostIpAddress, hostPortNumber);
            }
            catch
            {
                // Connecting failed: give the reserved slot back before propagating.
                lock (SyncRoot)
                {
                    ReleaseSlot();
                }
                throw;
            }
        }

        /// <summary>
        ///     Decrements the live-connection counter without letting it go negative.
        ///     Must be called while holding <see cref="SyncRoot"/>.
        /// </summary>
        private static void ReleaseSlot()
        {
            if (_socketCounter > 0)
            {
                _socketCounter--;
            }
        }
    }
}

Link.cs

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;

namespace CommonLinkLibrary.Util
{
    /// <summary>
    ///     Blocking request/response channel over a <see cref="TcpClient"/>.
    ///     A request is written as a sequence of blocks, each "length, newline, data, newline",
    ///     terminated by an empty line; a response is parsed from the same framing.
    /// </summary>
    internal class Link : IDisposable
    {
        /// <summary>Initial capacity of the receive buffer and size of each socket read.</summary>
        private const int ReceiveChunkSize = 8*1024;

        /// <summary>Bytes received from the socket that have not yet formed a complete response.</summary>
        private MemoryStream _recvBuf = new MemoryStream(ReceiveChunkSize);

        /// <summary>The underlying connection; null once <see cref="close"/> has run.</summary>
        private TcpClient _sock;

        /// <summary>
        ///     Connects to the given host and port with Nagle's algorithm disabled and TCP keep-alive enabled.
        /// </summary>
        /// <param name="host">Host name or address to connect to.</param>
        /// <param name="port">Port to connect to.</param>
        public Link(string host, int port)
        {
            _sock = new TcpClient(host, port) {NoDelay = true};
            _sock.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        }

        /// <summary>
        ///     Closes the connection and releases the receive buffer. Safe to call more than once.
        /// </summary>
        /// <remarks>
        ///     No finalizer is declared: this type holds only managed objects, and a finalizer
        ///     must not touch other finalizable objects such as the <see cref="TcpClient"/>.
        /// </remarks>
        public void Dispose()
        {
            close();
            _recvBuf.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        ///     Closes the underlying connection if it is still open.
        /// </summary>
        public void close()
        {
            if (_sock != null)
            {
                _sock.Close();
            }
            _sock = null;
        }

        /// <summary>
        ///     Sends a command whose arguments are strings (encoded as UTF-8) and waits for the response.
        /// </summary>
        /// <param name="cmd">Command name.</param>
        /// <param name="args">Command arguments.</param>
        /// <returns>The blocks of the response, in order.</returns>
        public List<byte[]> request(string cmd, params string[] args)
        {
            var req = new List<byte[]>(1 + args.Length) {Encoding.UTF8.GetBytes(cmd)};
            foreach (var s in args)
            {
                req.Add(Encoding.UTF8.GetBytes(s));
            }
            return request(req);
        }

        /// <summary>
        ///     Sends a command whose arguments are raw byte blocks and waits for the response.
        /// </summary>
        /// <param name="cmd">Command name.</param>
        /// <param name="args">Command arguments; null entries are not sent.</param>
        /// <returns>The blocks of the response, in order.</returns>
        public List<byte[]> request(string cmd, params byte[][] args)
        {
            var req = new List<byte[]>(1 + args.Length) {Encoding.UTF8.GetBytes(cmd)};
            req.AddRange(args);
            return request(req);
        }

        /// <summary>
        ///     Frames and sends the given blocks, then blocks until a complete response has arrived.
        /// </summary>
        /// <param name="req">Blocks to send; null entries are skipped.</param>
        /// <returns>The blocks of the response, in order.</returns>
        /// <exception cref="ObjectDisposedException">The link has already been closed.</exception>
        /// <exception cref="IOException">The peer closed the connection before a full response arrived.</exception>
        public List<byte[]> request(List<byte[]> req)
        {
            var stream = OpenStream();
            using (var buf = new MemoryStream())
            {
                foreach (var p in req)
                {
                    if (p == null)
                    {
                        continue;
                    }
                    var len = Encoding.UTF8.GetBytes(p.Length.ToString());
                    buf.Write(len, 0, len.Length);
                    buf.WriteByte((byte) '\n');
                    buf.Write(p, 0, p.Length);
                    buf.WriteByte((byte) '\n');
                }
                // An empty line terminates the request.
                buf.WriteByte((byte) '\n');

                stream.Write(buf.GetBuffer(), 0, (int) buf.Length);
            }
            return recv();
        }

        /// <summary>
        ///     Returns the network stream of the open connection.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The link has already been closed.</exception>
        private NetworkStream OpenStream()
        {
            if (_sock == null)
            {
                throw new ObjectDisposedException("Link");
            }
            return _sock.GetStream();
        }

        /// <summary>
        ///     Reads from the socket until the buffered bytes contain one complete response.
        /// </summary>
        /// <exception cref="IOException">The peer closed the connection before a full response arrived.</exception>
        private List<byte[]> recv()
        {
            var stream = OpenStream();
            var chunk = new byte[ReceiveChunkSize];
            while (true)
            {
                var ret = parse();
                if (ret != null)
                {
                    return ret;
                }
                var len = stream.Read(chunk, 0, chunk.Length);
                if (len <= 0)
                {
                    // Read returns 0 once the peer has closed the connection; without this
                    // check the loop would spin forever on an incomplete response.
                    throw new IOException("Connection closed before a complete response was received.");
                }
                _recvBuf.Write(chunk, 0, len);
            }
        }

        /// <summary>
        ///     Returns the index of the first occurrence of <paramref name="b"/> in
        ///     <paramref name="bs"/> within [offset, end), or -1 when there is none.
        /// </summary>
        private static int memchr(byte[] bs, byte b, int offset, int end)
        {
            if (offset >= end)
            {
                return -1;
            }
            return Array.IndexOf(bs, b, offset, end - offset);
        }

        /// <summary>
        ///     Tries to extract one complete response from the receive buffer.
        /// </summary>
        /// <returns>
        ///     The response blocks when a full response is buffered (the consumed bytes are
        ///     removed from the buffer); null when more data is needed.
        /// </returns>
        /// <exception cref="InvalidDataException">A block declares a negative length.</exception>
        private List<byte[]> parse()
        {
            var list = new List<byte[]>();
            var buf = _recvBuf.GetBuffer();
            // Only the first `end` bytes of the backing array hold received data.
            var end = (int) _recvBuf.Length;

            var idx = 0;
            while (true)
            {
                var pos = memchr(buf, (byte) '\n', idx, end);
                if (pos == -1)
                {
                    break;
                }
                if (pos == idx || (pos == idx + 1 && buf[idx] == '\r'))
                {
                    idx += 1; // if '\r', next time will skip '\n'
                    // ignore empty leading lines
                    if (list.Count == 0)
                    {
                        continue;
                    }
                    // End of response: keep whatever follows it for the next call.
                    var left = end - idx;
                    _recvBuf = new MemoryStream(ReceiveChunkSize);
                    if (left > 0)
                    {
                        _recvBuf.Write(buf, idx, left);
                    }
                    return list;
                }

                var len = int.Parse(Encoding.UTF8.GetString(buf, idx, pos - idx),
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture);
                if (len < 0)
                {
                    throw new InvalidDataException("Bad response: negative block length.");
                }

                idx = pos + 1;
                // The block needs len data bytes plus its trailing '\n'. Written as a
                // subtraction so a huge declared length cannot overflow the comparison.
                if (len >= end - idx)
                {
                    break;
                }
                var data = new byte[len];
                Array.Copy(buf, idx, data, 0, len);

                idx += len + 1; // skip '\n'
                list.Add(data);
            }
            return null;
        }
    }
}

PersonInfoLogBean.cs

namespace SSDB
{
    /// <summary>
    ///     One person-change log record. Program.Main serializes instances of this class
    ///     to JSON with JsonConvert.SerializeObject before storing them.
    /// </summary>
    class PersonInfoLogBean
    {
        /// <summary>Kind of change; Program.Main uses "新增", "修改" and "删除".</summary>
        public string Action { get; set; }
        /// <summary>Identifier of the person the record refers to.</summary>
        public string PersonId { get; set; }
        /// <summary>Person name before the change.</summary>
        public string OldPersonName { get; set; }
        /// <summary>Person name after the change.</summary>
        public string NewPersonName { get; set; }

        /// <summary>Age before the change, kept as text.</summary>
        public string OldAge { get; set; }
        /// <summary>Age after the change, kept as text.</summary>
        public string NewAge { get; set; }

    }
}

SsdbClient.cs

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;

namespace CommonLinkLibrary.Util
{
    public class SsdbClient : TcpClient
    {
        private readonly Link _link;
        private string _respCode;

        /// <summary>
        ///     Creates a client for the given server. The <see cref="TcpClient"/> base constructor
        ///     connects to the host, and a separate <see cref="Link"/> is then created for the
        ///     same host and port; the request methods of this class go through that link.
        /// </summary>
        /// <param name="hostIpAddress">Host name or address of the server.</param>
        /// <param name="hostPortNumber">Port of the server.</param>
        // NOTE(review): Link opens its own TcpClient, so each instance appears to hold two
        // connections, and the inherited Connected/Close act on the base connection rather
        // than on the link that carries the commands - confirm whether this is intended.
        public SsdbClient(string hostIpAddress, int hostPortNumber) : base(hostIpAddress, hostPortNumber)
        {
            _link = new Link(hostIpAddress, hostPortNumber);
        }

        /// <summary>
        ///     Forwards a command with string arguments to the underlying link.
        /// </summary>
        /// <param name="cmd">Command name.</param>
        /// <param name="args">Command arguments.</param>
        /// <returns>The response blocks returned by the link.</returns>
        public List<byte[]> request(string cmd, params string[] args)
        {
            return _link.request(cmd, args);
        }

        /// <summary>
        ///     Forwards a command with raw byte arguments to the underlying link.
        /// </summary>
        /// <param name="cmd">Command name.</param>
        /// <param name="args">Command arguments.</param>
        /// <returns>The response blocks returned by the link.</returns>
        public List<byte[]> request(string cmd, params byte[][] args)
        {
            return _link.request(cmd, args);
        }

        /// <summary>
        ///     Forwards an already assembled request (command followed by its arguments) to the underlying link.
        /// </summary>
        /// <param name="req">Request blocks.</param>
        /// <returns>The response blocks returned by the link.</returns>
        public List<byte[]> request(List<byte[]> req)
        {
            return _link.request(req);
        }

        /// <summary>
        ///     Encodes a string as UTF-8; a null string yields a null array.
        /// </summary>
        /// <param name="s">Text to encode, or null.</param>
        /// <returns>The UTF-8 bytes of <paramref name="s"/>, or null.</returns>
        private byte[] _bytes(string s)
        {
            return s == null ? null : Encoding.UTF8.GetBytes(s);
        }

        /// <summary>
        ///     Decodes a byte array as UTF-8 text.
        /// </summary>
        /// <param name="bs">Bytes to decode.</param>
        /// <returns>The decoded string.</returns>
        private string _string(byte[] bs)
        {
            return Encoding.UTF8.GetString(bs);
        }

        /// <summary>
        ///     Converts a scan-style response into key/value pairs. The first block is stored as
        ///     the response code; the remaining blocks are read as alternating key and value.
        ///     A trailing block without a partner is ignored.
        /// </summary>
        /// <param name="resp">Response blocks, status first.</param>
        /// <returns>One pair per key/value couple, with the key decoded as UTF-8.</returns>
        private KeyValuePair<string, byte[]>[] parse_scan_resp(List<byte[]> resp)
        {
            _respCode = _string(resp[0]);

            var pairCount = (resp.Count - 1)/2;
            var pairs = new KeyValuePair<string, byte[]>[pairCount];

            // cursor walks the response two blocks at a time, starting after the status block.
            var cursor = 1;
            for (var n = 0; n < pairCount; n++)
            {
                var name = _string(resp[cursor]);
                var payload = resp[cursor + 1];
                pairs[n] = new KeyValuePair<string, byte[]>(name, payload);
                cursor += 2;
            }
            return pairs;
        }

        /***** kv *****/

        /// <summary>
        ///     Sends an "exists" command for the given raw key and records the response code.
        /// </summary>
        /// <param name="key">Key to test.</param>
        /// <returns>
        ///     False when the response code is "not_found"; otherwise true only when the
        ///     second response block is the text "1".
        /// </returns>
        /// <exception cref="Exception">
        ///     The response code is not "not_found" and the response does not have exactly two blocks.
        /// </exception>
        public bool exists(byte[] key)
        {
            var reply = request("exists", key);
            _respCode = _string(reply[0]);

            if (_respCode != "not_found")
            {
                if (reply.Count != 2)
                {
                    throw new Exception("Bad response!");
                }
                return _string(reply[1]) == "1";
            }
            return false;
        }

        /// <summary>
        ///     Same as <see cref="exists(byte[])"/>, with the key given as text (encoded as UTF-8).
        /// </summary>
        /// <param name="key">Key to test.</param>
        /// <returns>See <see cref="exists(byte[])"/>.</returns>
        public bool exists(string key)
        {
            var rawKey = _bytes(key);
            return exists(rawKey);
        }

        /// <summary>
        ///     Sends a "set" command for the given raw key and value and records the response
        ///     code. The response code is not checked here.
        /// </summary>
        /// <param name="key">Key to write.</param>
        /// <param name="val">Value to store.</param>
        public void set(byte[] key, byte[] val)
        {
            _respCode = _string(request("set", key, val)[0]);
        }

        /// <summary>
        ///     Same as <see cref="set(byte[], byte[])"/>, with key and value given as text (encoded as UTF-8).
        /// </summary>
        /// <param name="key">Key to write.</param>
        /// <param name="val">Value to store.</param>
        public void set(string key, string val)
        {
            var rawKey = _bytes(key);
            var rawVal = _bytes(val);
            set(rawKey, rawVal);
        }

        /// <summary>
        ///     Sends a "get" command for the given raw key and records the response code.
        /// </summary>
        /// <param name="key">Key to read.</param>
        /// <param name="val">Receives the second response block, or null when nothing is returned.</param>
        /// <returns>False when the response code is "not_found", otherwise true.</returns>
        /// <exception cref="Exception">
        ///     The response code is not "not_found" and the response does not have exactly two blocks.
        /// </exception>
        // NOTE(review): any response code other than "not_found" is treated as success as long
        // as two blocks came back - confirm that an error status cannot reach callers as a value.
        public bool get(byte[] key, out byte[] val)
        {
            val = null;

            var reply = request("get", key);
            _respCode = _string(reply[0]);

            if (_respCode == "not_found")
            {
                return false;
            }
            if (reply.Count == 2)
            {
                val = reply[1];
                return true;
            }
            throw new Exception("Bad response!");
        }

        /// <summary>
        ///     Same as <see cref="get(byte[], out byte[])"/>, with the key given as text (encoded as UTF-8).
        /// </summary>
        /// <param name="key">Key to read.</param>
        /// <param name="val">Receives the raw value, or null.</param>
        /// <returns>See <see cref="get(byte[], out byte[])"/>.</returns>
        public bool get(string key, out byte[] val)
        {
            var rawKey = _bytes(key);
            return get(rawKey, out val);
        }

        /// <summary>
        ///     Reads the value for a text key and decodes it as UTF-8.
        /// </summary>
        /// <param name="key">Key to read.</param>
        /// <param name="val">Receives the decoded value, or null when the lookup returns false.</param>
        /// <returns>The result of the underlying raw lookup.</returns>
        public bool get(string key, out string val)
        {
            byte[] raw;
            var found = get(key, out raw);
            val = found ? _string(raw) : null;
            return found;
        }

        /// <summary>
        ///     Sends a "del" command for the given raw key and records the response code.
        ///     The response code is not checked here.
        /// </summary>
        /// <param name="key">Key to delete.</param>
        public void del(byte[] key)
        {
            _respCode = _string(request("del", key)[0]);
        }

        /// <summary>
        ///     Same as <see cref="del(byte[])"/>, with the key given as text (encoded as UTF-8).
        /// </summary>
        /// <param name="key">Key to delete.</param>
        public void del(string key)
        {
            var rawKey = _bytes(key);
            del(rawKey);
        }

        /// <summary>
        ///     Sends a "scan" command with the given bounds and limit and returns the
        ///     key/value pairs parsed from the response.
        /// </summary>
        /// <param name="key_start">Start bound passed to the server.</param>
        /// <param name="key_end">End bound passed to the server.</param>
        /// <param name="limit">Limit passed to the server, sent as decimal text.</param>
        /// <returns>The parsed key/value pairs.</returns>
        public KeyValuePair<string, byte[]>[] scan(string key_start, string key_end, long limit)
        {
            return parse_scan_resp(request("scan", key_start, key_end, limit.ToString()));
        }

        /// <summary>
        ///     Sends an "rscan" command with the given bounds and limit and returns the
        ///     key/value pairs parsed from the response.
        /// </summary>
        /// <param name="key_start">Start bound passed to the server.</param>
        /// <param name="key_end">End bound passed to the server.</param>
        /// <param name="limit">Limit passed to the server, sent as decimal text.</param>
        /// <returns>The parsed key/value pairs.</returns>
        public KeyValuePair<string, byte[]>[] rscan(string key_start, string key_end, long limit)
        {
            return parse_scan_resp(request("rscan", key_start, key_end, limit.ToString()));
        }

        /***** hash *****/

        /// <summary>
        ///     Sends an "hset" command for the given raw name, key and value and records the
        ///     response code. The response code is not checked here.
        /// </summary>
        /// <param name="name">Name of the hash.</param>
        /// <param name="key">Key inside the hash.</param>
        /// <param name="val">Value to store.</param>
        public void hset(byte[] name, byte[] key, byte[] val)
        {
            _respCode = _string(request("hset", name, key, val)[0]);
        }

        /// <summary>
        ///     Same as <see cref="hset(byte[], byte[], byte[])"/>, with name and key given as text (encoded as UTF-8).
        /// </summary>
        /// <param name="name">Name of the hash.</param>
        /// <param name="key">Key inside the hash.</param>
        /// <param name="val">Value to store.</param>
        public void hset(string name, string key, byte[] val)
        {
            var rawName = _bytes(name);
            var rawKey = _bytes(key);
            hset(rawName, rawKey, val);
        }

        /// <summary>
        ///     Same as <see cref="hset(byte[], byte[], byte[])"/>, with name, key and value given as text (encoded as UTF-8).
        /// </summary>
        /// <param name="name">Name of the hash.</param>
        /// <param name="key">Key inside the hash.</param>
        /// <param name="val">Value to store.</param>
        public void hset(string name, string key, string val)
        {
            var rawName = _bytes(name);
            var rawKey = _bytes(key);
            var rawVal = _bytes(val);
            hset(rawName, rawKey, rawVal);
        }

        /// <summary>
        /// </summary>
        /// <param name="name"></param>
        /// <param name="key"></param>
        /// <param name="val"></param>
        /// <returns>returns true if name.key is found, otherwise returns false.</returns>
        public bool hget(byte[] name, byte[] key, out byte[] val)
        {
            val = null 
                       
                    
                    

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
使用C#特性,给方法或类打自定义标签再反射获取(发布时间:2022-07-13)
下一篇:
【C#】ConcurrentBag<T>方法(发布时间:2022-07-13)
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap