• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

C#SOCKET多线程多管道可断点传送大文件2(续)

原作者: [db:作者] 来自: [db:来源] 收藏 邀请
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Configuration;
using System.Net;
using System.Threading;
using Rocky.Net;

namespace Rocky.Test.Unit
{
    internal class FileTransferTest : IUnitTest
    {
        public void Run(object arg)
        {
            Console.WriteLine("请输入命令:");
            string cmd = Console.ReadLine();
            string[] cmds = cmd.Split(' ');
            FileMultiTransfer trans;
            switch (cmds[0].ToLower())
            {
                // recv: port
                case "recv":
                    Console.WriteLine("等待接收请求。。。");
                    trans = new FileMultiTransfer();
                    trans.ProgressChanged += new EventHandler<FileTransferEventArgs>(transmiter_ProgressChanged);
                    trans.Receive(AppDomain.CurrentDomain.BaseDirectory, new IPEndPoint(IPAddress.Any, int.Parse(cmds[1])));
                    break;
                // send: ipEndPoint
                case "send":
                    Console.WriteLine("等待发送请求。。。");
                    trans = new FileMultiTransfer();
                    trans.ProgressChanged += new EventHandler<FileTransferEventArgs>(transmiter_ProgressChanged);
                    trans.Send(new TransferConfig(ConfigurationManager.AppSettings["TransferFilePath"]), SocketHelper.Parse(cmds[1]));
                    break;
                default:
                    Console.WriteLine("未知命令.");
                    break;
            }
            Console.Read();
        }

        /// <summary>
        /// WCF单线程传输
        /// </summary>
        public void Single()
        {
            var t = new FileTransfer();
            t.Listen(Runtime.CurrentDirectory, 520);
            t.ProgressChanged += new EventHandler<FileTransferEventArgs>(transmiter_ProgressChanged);
            t.Send(@"D:\FastCopy.rar", SocketHelper.Parse("192.168.1.104:520"));
        }

        private void transmiter_ProgressChanged(object sender, FileTransferEventArgs e)
        {
            Console.WriteLine("Thread:{0};Name={1}\r\nFileMD5:{2};Name={3}\r\n{4}KB/s\t{5}/{6}\t{7}%\r\n",
                Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.Name,
                e.Token.FileMD5, e.Token.FileName,
                e.Progress.GetSpeed(),
                e.Progress.Transferred, e.Progress.TransferLength, e.Progress.ProgressPercentage);
        }
    }
}

 

1.关于传输效率问题,
代码namespace上部分

//#define Sleep

注意关闭,这是debug用的。

2.关于传输速度显示错误已fixed;

下面贴入核心代码,源码详见https://code.google.com/p/rockylib/中Rocky.Net.FileTransfer

/*********************************************************************************
** File Name    :    FileTransmitor.cs
** Copyright (C) 2010 Snda Network Corporation. All Rights Reserved.
** Creator        :    RockyWong
** Create Date    :    2010-06-02 11:22:45
** Update Date    :    2013-01-11 11:35:26
** Description    :    多线程多管道可断点传送大文件
** Version No    :    
*********************************************************************************/
//#define Sleep
//#undef Sleep
using System;
using System.Net;
using System.Net.Sockets;
using System.IO;
using System.Threading;
using System.Collections.Concurrent;

namespace Rocky.Net
{
    public sealed class FileMultiTransfer : FileTransferBase
    {
        #region Fields
        internal const string PointExtension = ".dat";
        internal const string TempExtension = ".temp";
        #endregion

        #region Send
        public void ProxySend(IPEndPoint proxyEndPoint, TransferConfig config)
        {
            var client = new AsyncTcpClient();
            client.Received += (sender, e) =>
            {
                try
                {
                    var reader = e.Session.GetStreamReader();
                    int availablePort = reader.ReadInt32();
                    Send(config, new IPEndPoint(proxyEndPoint.Address, availablePort));
                }
                finally
                {
                    client.Dispose();
                }
            };
            var result = client.Connect(proxyEndPoint);
            result.AsyncWaitHandle.WaitOne();
            if (!client.IsConnected)
            {
                throw new SocketException((int)SocketError.ConnectionRefused);
            }
        }

        public void Send(TransferConfig config, IPEndPoint remoteEndPoint)
        {
            Assert.ThrowArgumentNullException(config);
            var worker = new SendChunkWorker(remoteEndPoint);
            byte[] data = Runtime.SerializeBinary(config);
            Buffer.BlockCopy(data, 0, worker.Buffer, 0, data.Length);
            worker.Client.Send(worker.Buffer, 0, data.Length, SocketFlags.None);
            var e = new FileTransferEventArgs();
            e.Token = config;
            e.Progress.Start(config.FileLength);
            var workers = new SendChunkWorker[config.ThreadCount];
            workers[0] = worker;
            for (int i = 1; i < workers.Length; i++)
            {
                workers[i] = new SendChunkWorker(remoteEndPoint);
            }
            #region Breakpoint
            int perPairCount = PerLongSize * 2, count = perPairCount * workers.Length;
            byte[] bufferInfo = new byte[count];
            long oddSize, avgSize = Math.DivRem(config.FileLength, (long)workers.Length, out oddSize);
            if (worker.Client.Receive(bufferInfo) == 4)
            {
                for (int i = 0, j = workers.Length - 1; i < workers.Length; i++)
                {
                    workers[i].Initialize(config.File.FullName, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize);
                }
            }
            else
            {
                long fValue, tValue;
                for (int i = 0, j = workers.Length - 1; i < workers.Length; i++)
                {
                    BufferSegment.Read(bufferInfo, i * perPairCount, out fValue);
                    BufferSegment.Read(bufferInfo, i * perPairCount + PerLongSize, out tValue);
                    workers[i].Initialize(config.File.FullName, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                    SocketHelper.Logger.DebugFormat("[Multi]Remote{0} breakpoint{1}:{2}/{3}.", remoteEndPoint.ToString(), i, fValue, tValue);
                }
            }
            Thread.Sleep(200);
            #endregion
            for (int i = 0; i < workers.Length; i++)
            {
                workers[i].RunWorker(null);
            }
            #region Breakpoint
            long bytesTransferred = 0L;
            do
            {
                workers.ReportSpeed(e.Progress, ref bytesTransferred);
                OnProgressChanged(e);
                Thread.Sleep(1000);
            }
            while (!workers.IsAllCompleted());
            workers.ReportSpeed(e.Progress, ref bytesTransferred);
            OnProgressChanged(e);
            #endregion
            e.Progress.Stop();
        }
        #endregion

        #region Receive
        private string _localSavePath;
        private ConcurrentQueue<int> _availablePortQueue;
        private AsyncTcpListener _listener;

        public void ProxyListen(string localSavePath, int localPort)
        {
            ProxyListen(localSavePath, localPort, 50);
        }
        public void ProxyListen(string localSavePath, int localPort, int maxClient)
        {
            Runtime.CreateDirectory(localSavePath);
            _localSavePath = localSavePath;
            _availablePortQueue = new ConcurrentQueue<int>();
            int start = 2048, end = start + maxClient;
            for (; start <= end; start++)
            {
                _availablePortQueue.Enqueue(start);
            }
            _listener = new AsyncTcpListener(new IPEndPoint(IPAddress.Any, localPort), maxClient, 512);
            _listener.SessionStart += new NetEventHandler(listener_SessionStart);
            _listener.Start();
        }
        void listener_SessionStart(object sender, NetEventArgs e)
        {
            int availablePort;
            if (!_availablePortQueue.TryDequeue(out availablePort))
            {
                e.Session.Abandon();
                return;
            }
            try
            {
                var writer = e.Session.GetStreamWriter();
                writer.Write(availablePort);
                var result = _listener.Send(e.Session);
                result.AsyncWaitHandle.WaitOne();
                Receive(_localSavePath, new IPEndPoint(IPAddress.Any, availablePort));
            }
            finally
            {
                _availablePortQueue.Enqueue(availablePort);
            }
        }

        public void Receive(string localSavePath, IPEndPoint localEndPoint)
        {
            Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            listener.Bind(localEndPoint);
            //最多支持16线程
            listener.Listen(16);
            ReceiveChunkWorker worker = new ReceiveChunkWorker(listener.Accept());
            int recv = worker.Client.Receive(worker.Buffer);
            byte[] data = new byte[recv];
            Buffer.BlockCopy(worker.Buffer, 0, data, 0, recv);
            TransferConfig config = (TransferConfig)Runtime.DeserializeBinary(data);
            var e = new FileTransferEventArgs();
            e.Token = config;
            e.Progress.Start(config.FileLength);
            ReceiveChunkWorker[] workers = new ReceiveChunkWorker[config.ThreadCount];
            workers[0] = worker;
            for (int i = 1; i < workers.Length; i++)
            {
                workers[i] = new ReceiveChunkWorker(listener.Accept());
            }
            #region Breakpoint
            int perPairCount = PerLongSize * 2, count = perPairCount * workers.Length;
            byte[] bufferInfo = new byte[count];
            string filePath = Path.Combine(localSavePath, config.FileMD5 + Path.GetExtension(config.FileName)),
                pointFilePath = Path.ChangeExtension(filePath, PointExtension), tempFilePath = Path.ChangeExtension(filePath, TempExtension);
            FileStream pointStream;
            long oddSize, avgSize = Math.DivRem(config.FileLength, (long)workers.Length, out oddSize);
            if (File.Exists(pointFilePath) && File.Exists(tempFilePath))
            {
                pointStream = new FileStream(pointFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None);
                pointStream.Read(bufferInfo, 0, count);
                long fValue, tValue;
                for (int i = 0, j = workers.Length - 1; i < workers.Length; i++)
                {
                    BufferSegment.Read(bufferInfo, i * perPairCount, out fValue);
                    BufferSegment.Read(bufferInfo, i * perPairCount + PerLongSize, out tValue);
                    workers[i].Initialize(tempFilePath, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                    SocketHelper.Logger.DebugFormat("[Multi]Local{0} breakpoint read{1}:{2}/{3}.", localEndPoint.ToString(), i, fValue, tValue);
                }
                worker.Client.Send(bufferInfo);
            }
            else
            {
                pointStream = new FileStream(pointFilePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None);
                FileStream stream = new FileStream(tempFilePath, FileMode.CreateNew, FileAccess.Write, FileShare.Write);
                stream.SetLength(config.FileLength);
                stream.Flush();
                stream.Dispose();
                for (int i = 0, j = workers.Length - 1; i < workers.Length; i++)
                {
                    workers[i].Initialize(tempFilePath, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize);

                }
                worker.Client.Send(bufferInfo, 0, 4, SocketFlags.None);
            }
            Timer timer = new Timer(state =>
            {
                long fValue, tValue;
                for (int i = 0; i < workers.Length; i++)
                {
                    workers[i].ReportProgress(out fValue, out tValue);
                    BufferSegment.Write(bufferInfo, i * perPairCount, fValue);
                    BufferSegment.Write(bufferInfo, i * perPairCount + PerLongSize, tValue);
                    SocketHelper.Logger.DebugFormat("[Multi]Local{0} breakpoint write{1}:{2}/{3}.", localEndPoint.ToString(), i, fValue, tValue);
                }
                pointStream.Position = 0L;
                pointStream.Write(bufferInfo, 0, count);
                pointStream.Flush();

            }, null, TimeSpan.Zero, TimeSpan.FromSeconds(2));
            #endregion
            AutoResetEvent reset = new AutoResetEvent(false);
            for (int i = 0; i < workers.Length; i++)
            {
                workers[i].RunWorker(i == workers.Length - 1 ? reset : null);
            }
            reset.WaitOne();
            #region Breakpoint
            long bytesTransferred = 0L;
            do
            {
                workers.ReportSpeed(e.Progress, ref bytesTransferred);
                OnProgressChanged(e);
                Thread.Sleep(1000);
            }
            while (!workers.IsAllCompleted());
            workers.ReportSpeed(e.Progress, ref bytesTransferred);
            OnProgressChanged(e);
            timer.Dispose();
            pointStream.Dispose();
            File.Delete(pointFilePath);
            File.Move(tempFilePath, filePath);
            #endregion
            e.Progress.Stop();
        }
        #endregion
    }

    #region IChunkWorker
    internal class SendChunkWorker : IChunkWorker
    {
        private long _bytesSent, _sendLength;
        private byte[] _buffer;
        private Socket _sock;
        private FileStream _reader;
        private Thread _thread;
        private bool _isCompleted;

        public byte[] Buffer
        {
            get { return _buffer; }
        }
        public Socket Client
        {
            get { return _sock; }
        }
        public bool IsCompleted
        {
            get { return _isCompleted; }
        }

        public SendChunkWorker(IPEndPoint ipe)
        {
            _sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            _sock.Connect(ipe);
            _buffer = BufferSegment.GetBuffer(BufferSize.Socket);
        }

        public void Initialize(string filePath, int chunkSize, long position, long length)
        {
            Initialize(filePath, chunkSize, position, length, 0L, length);
        }
        public void Initialize(string filePath, int chunkSize, long position, long length, long bytesTransferred, long transferLength)
        {
            _reader = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, Math.Min(TransferConfig.MaxChunkBufferSize, chunkSize));
            _reader.Position = position + bytesTransferred;
            _bytesSent = bytesTransferred;
            _sendLength = transferLength;
            _thread = new Thread(StartWork);
            _thread.Name = string.Format("[M{0}/{1}]", _bytesSent, _sendLength);
            _thread.IsBackground = true;
            SocketHelper.Logger.DebugFormat("[Multi]Remote{0} {1} Initialized.", _sock.RemoteEndPoint, _thread.Name);
        }
        private void StartWork(object state)
        {
            try
            {
                int read, sent;
                bool flag;
                while (_bytesSent < _sendLength)
                {
                    read = _reader.Read(_buffer, 0, Math.Min((int)BufferSize.Socket, (int)(_sendLength - _bytesSent)));
                    sent = 0;
                    flag = true;
                    while ((sent += _sock.Send(_buffer, sent, read, SocketFlags.None)) < read)
                    {
                        flag = false;
                        _bytesSent += (long)sent;
                        SocketHelper.Logger.DebugFormat("[Multi]Remote{0} {1}:{2}/{3}.", _sock.RemoteEndPoint, _thread.Name, _bytesSent, _sendLength);
#if Sleep
                        Thread.Sleep(1);
#endif
                    }
                    if (flag)
                    {
                        _bytesSent += (long)read;
                        SocketHelper.Logger.DebugFormat("[Multi]Remote{0} {1}:{2}/{3}.", _sock.RemoteEndPoint, _thread.Name, _bytesSent, _sendLength);
#if Sleep
                        Thread.Sleep(1);
#endif
                    }
                }
            }
            catch (SocketException ex)
            {
                if (ex.ErrorCode == 10054)
                {
                    return;
                }
                throw;
            }
            finally
            {
                _reader.Dispose();
                _sock.Shutdown(SocketShutdown.Both);
                _sock.Close();
            }
            var waitHandle = state as EventWaitHandle;
            if (waitHandle != null)
            {
                waitHandle.Set();
            }
            _isCompleted = true;
        }

        public void ReportProgress(out long bytesTransferred, out long transferLength)
        {
            bytesTransferred = _bytesSent;
            transferLength = _sendLength;
        }

        public void RunWorker(EventWaitHandle waitHandle)
        {
            _thread.Start(waitHandle);
        }
    }

    internal class ReceiveChunkWorker : IChunkWorker
    {
        private long _offset, _bytesReceived, _receiveLength;
        private byte[] _buffer;
        private Socket _sock;
        private FileStream _writer;
        private Thread _thread;
        private bool _isCompleted;

        public byte[] Buffer
        {
            get { return _buffer; }
        }
        public Socket Client
        {
            get { return _sock; }
        }
        public bool IsCompleted
        {
            get { return _isCompleted; }
        }

        public ReceiveChunkWorker(Socket client)
        {
            _sock = client;
            _buffer = BufferSegment.GetBuffer(BufferSize.Socket);
        }
        public void Initialize(string filePath, int chunkSize, long position, long length)
        {
            Initialize(filePath, chunkSize, position, length, 0L, length);
        }
        public void Initialize(string filePath, int chunkSize, long position, long length, long bytesTransferred, long transferLength)
        {
            _writer = new FileStream(filePath, FileMode.Open, FileAccess.Write, FileShare.Write, Math.Min(TransferConfig.MaxChunkBufferSize, chunkSize));
            _writer.Position = position + bytesTransferred;
            _writer.Lock(position, length);
            _offset = position;
            _bytesReceived = bytesTransferred;
            _receiveLength = transferLength;
            _thread = new Thread(StartWork);
            _thread.Name = string.Format("[M{0}/{1}]", _bytesReceived, _receiveLength);
            _thread.IsBackground = true;
            SocketHelper.Logger.DebugFormat("[Multi]Local{0} {1} Initialized.", _sock.LocalEndPoint, _thread.Name);
        }
        private void StartWork(object state)
        {
            try
            {
                int received;
                while (_bytesReceived < _receiveLength)
                {
                    if ((received = _sock.Receive(_buffer)) == 0)
                    {
                        break;
                    }
                    _writer.Write(_buffer, 0, received);
                    _writer.Flush();
                    _bytesReceived += (long)received;
                    SocketHelper.Logger.DebugFormat("[Multi]Local{0} {1}:{2}/{3}.", _sock.LocalEndPoint, _thread.Name, _bytesReceived, _receiveLength);
#if Sleep
                    Thread.Sleep(1);
#endif
                }
            }
            catch (SocketException ex)
            {
                if (ex.ErrorCode == 10054)
                {
                    return;
                }
                throw;
            }
            finally
            {
                _writer.Unlock(_offset, _receiveLength);
                _writer.Dispose();
                _sock.Close();
            }
            var waitHandle = state as EventWaitHandle;
            if (waitHandle != null)
            {
                waitHandle.Set();
            }
            _isCompleted = true;
        }

        public void ReportProgress(out long bytesTransferred, out long transferLength)
        {
            bytesTransferred = _bytesReceived;
            transferLength = _receiveLength;
        }

        public void RunWorker(EventWaitHandle waitHandle)
        {
            _thread.Start(waitHandle);
        }
    }

    internal interface IChunkWorker
    {
        bool IsCompleted { get; }
        void Initialize(string filePath, int chunkSize, long position, long length);
        void Initialize(string filePath, int chunkSize, long position, long length, long bytesTransferred, long transferLength);
        void ReportProgress(out long bytesTransferred, out long transferLength);
        void RunWorker(EventWaitHandle waitHandle);
    }
    #endregion

    #region Extensions
    internal static class Extensions
    {
        public static bool IsAllCompleted(this IChunkWorker[] workers)
        {
            bool flag = true;
            for (int i = 0; i < workers.Length; i++)
            {
                if (!workers[i].IsCompleted)
                {
                    flag = false;
                    break;
                }
            }
            return flag;
        }

        public static void ReportSpeed(this IChunkWorker[] workers, TransferProgress progress, ref long lastTransferred)
        {
            long bytesTransferred = 0L, transferLength = 0L;
            for (int i = 0; i < workers.Length; i++)
            {
                long perBytesTransferred, perTransferLength;
                workers[i].ReportProgress(out perBytesTransferred, out perTransferLength);
                bytesTransferred += perBytesTransferred;
                transferLength += perTransferLength;
            }
            progress.ReportChange(bytesTransferred - lastTransferred, bytesTransferred);
            lastTransferred = bytesTransferred;
        }
    }
    #endregion
}

调用:

上篇文章:http://www.cnblogs.com/Googler/archive/2010/06/03/1750616.html


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap