在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Configuration; using System.Net; using System.Threading; using Rocky.Net; namespace Rocky.Test.Unit { internal class FileTransferTest : IUnitTest { public void Run(object arg) { Console.WriteLine("请输入命令:"); string cmd = Console.ReadLine(); string[] cmds = cmd.Split(' '); FileMultiTransfer trans; switch (cmds[0].ToLower()) { // recv: port case "recv": Console.WriteLine("等待接收请求。。。"); trans = new FileMultiTransfer(); trans.ProgressChanged += new EventHandler<FileTransferEventArgs>(transmiter_ProgressChanged); trans.Receive(AppDomain.CurrentDomain.BaseDirectory, new IPEndPoint(IPAddress.Any, int.Parse(cmds[1]))); break; // send: ipEndPoint case "send": Console.WriteLine("等待发送请求。。。"); trans = new FileMultiTransfer(); trans.ProgressChanged += new EventHandler<FileTransferEventArgs>(transmiter_ProgressChanged); trans.Send(new TransferConfig(ConfigurationManager.AppSettings["TransferFilePath"]), SocketHelper.Parse(cmds[1])); break; default: Console.WriteLine("未知命令."); break; } Console.Read(); } /// <summary> /// WCF单线程传输 /// </summary> public void Single() { var t = new FileTransfer(); t.Listen(Runtime.CurrentDirectory, 520); t.ProgressChanged += new EventHandler<FileTransferEventArgs>(transmiter_ProgressChanged); t.Send(@"D:\FastCopy.rar", SocketHelper.Parse("192.168.1.104:520")); } private void transmiter_ProgressChanged(object sender, FileTransferEventArgs e) { Console.WriteLine("Thread:{0};Name={1}\r\nFileMD5:{2};Name={3}\r\n{4}KB/s\t{5}/{6}\t{7}%\r\n", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.Name, e.Token.FileMD5, e.Token.FileName, e.Progress.GetSpeed(), e.Progress.Transferred, e.Progress.TransferLength, e.Progress.ProgressPercentage); } } }
1.关于传输效率问题, //#define Sleep 注意关闭,这是debug用的。 2.关于传输速度显示错误已fixed; 下面贴入核心代码,源码详见https://code.google.com/p/rockylib/中Rocky.Net.FileTransfer /********************************************************************************* ** File Name : FileTransmitor.cs ** Copyright (C) 2010 Snda Network Corporation. All Rights Reserved. ** Creator : RockyWong ** Create Date : 2010-06-02 11:22:45 ** Update Date : 2013-01-11 11:35:26 ** Description : 多线程多管道可断点传送大文件 ** Version No : *********************************************************************************/ //#define Sleep //#undef Sleep using System; using System.Net; using System.Net.Sockets; using System.IO; using System.Threading; using System.Collections.Concurrent; namespace Rocky.Net { public sealed class FileMultiTransfer : FileTransferBase { #region Fields internal const string PointExtension = ".dat"; internal const string TempExtension = ".temp"; #endregion #region Send public void ProxySend(IPEndPoint proxyEndPoint, TransferConfig config) { var client = new AsyncTcpClient(); client.Received += (sender, e) => { try { var reader = e.Session.GetStreamReader(); int availablePort = reader.ReadInt32(); Send(config, new IPEndPoint(proxyEndPoint.Address, availablePort)); } finally { client.Dispose(); } }; var result = client.Connect(proxyEndPoint); result.AsyncWaitHandle.WaitOne(); if (!client.IsConnected) { throw new SocketException((int)SocketError.ConnectionRefused); } } public void Send(TransferConfig config, IPEndPoint remoteEndPoint) { Assert.ThrowArgumentNullException(config); var worker = new SendChunkWorker(remoteEndPoint); byte[] data = Runtime.SerializeBinary(config); Buffer.BlockCopy(data, 0, worker.Buffer, 0, data.Length); worker.Client.Send(worker.Buffer, 0, data.Length, SocketFlags.None); var e = new FileTransferEventArgs(); e.Token = config; e.Progress.Start(config.FileLength); var workers = new SendChunkWorker[config.ThreadCount]; workers[0] = worker; for (int i = 1; i < workers.Length; i++) { workers[i] = new SendChunkWorker(remoteEndPoint); } #region Breakpoint int perPairCount = PerLongSize * 2, count = perPairCount * workers.Length; byte[] bufferInfo = new byte[count]; long oddSize, avgSize = Math.DivRem(config.FileLength, (long)workers.Length, out oddSize); if (worker.Client.Receive(bufferInfo) == 4) { for (int i = 0, j = workers.Length - 1; i < workers.Length; i++) { workers[i].Initialize(config.File.FullName, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize); } } else { long fValue, tValue; for (int i = 0, j = workers.Length - 1; i < workers.Length; i++) { BufferSegment.Read(bufferInfo, i * perPairCount, out fValue); BufferSegment.Read(bufferInfo, i * perPairCount + PerLongSize, out tValue); workers[i].Initialize(config.File.FullName, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue); SocketHelper.Logger.DebugFormat("[Multi]Remote{0} breakpoint{1}:{2}/{3}.", remoteEndPoint.ToString(), i, fValue, tValue); } } Thread.Sleep(200); #endregion for (int i = 0; i < workers.Length; i++) { workers[i].RunWorker(null); } #region Breakpoint long bytesTransferred = 0L; do { workers.ReportSpeed(e.Progress, ref bytesTransferred); OnProgressChanged(e); Thread.Sleep(1000); } while (!workers.IsAllCompleted()); workers.ReportSpeed(e.Progress, ref bytesTransferred); OnProgressChanged(e); #endregion e.Progress.Stop(); } #endregion #region Receive private string _localSavePath; private ConcurrentQueue<int> _availablePortQueue; private AsyncTcpListener _listener; public void ProxyListen(string localSavePath, int localPort) { ProxyListen(localSavePath, localPort, 50); } public void ProxyListen(string localSavePath, int localPort, int maxClient) { Runtime.CreateDirectory(localSavePath); _localSavePath = localSavePath; _availablePortQueue = new ConcurrentQueue<int>(); int start = 2048, end = start + maxClient; for (; start <= end; start++) { _availablePortQueue.Enqueue(start); } _listener = new AsyncTcpListener(new IPEndPoint(IPAddress.Any, localPort), maxClient, 512); _listener.SessionStart += new NetEventHandler(listener_SessionStart); _listener.Start(); } void listener_SessionStart(object sender, NetEventArgs e) { int availablePort; if (!_availablePortQueue.TryDequeue(out availablePort)) { e.Session.Abandon(); return; } try { var writer = e.Session.GetStreamWriter(); writer.Write(availablePort); var result = _listener.Send(e.Session); result.AsyncWaitHandle.WaitOne(); Receive(_localSavePath, new IPEndPoint(IPAddress.Any, availablePort)); } finally { _availablePortQueue.Enqueue(availablePort); } } public void Receive(string localSavePath, IPEndPoint localEndPoint) { Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); listener.Bind(localEndPoint); //最多支持16线程 listener.Listen(16); ReceiveChunkWorker worker = new ReceiveChunkWorker(listener.Accept()); int recv = worker.Client.Receive(worker.Buffer); byte[] data = new byte[recv]; Buffer.BlockCopy(worker.Buffer, 0, data, 0, recv); TransferConfig config = (TransferConfig)Runtime.DeserializeBinary(data); var e = new FileTransferEventArgs(); e.Token = config; e.Progress.Start(config.FileLength); ReceiveChunkWorker[] workers = new ReceiveChunkWorker[config.ThreadCount]; workers[0] = worker; for (int i = 1; i < workers.Length; i++) { workers[i] = new ReceiveChunkWorker(listener.Accept()); } #region Breakpoint int perPairCount = PerLongSize * 2, count = perPairCount * workers.Length; byte[] bufferInfo = new byte[count]; string filePath = Path.Combine(localSavePath, config.FileMD5 + Path.GetExtension(config.FileName)), pointFilePath = Path.ChangeExtension(filePath, PointExtension), tempFilePath = Path.ChangeExtension(filePath, TempExtension); FileStream pointStream; long oddSize, avgSize = Math.DivRem(config.FileLength, (long)workers.Length, out oddSize); if (File.Exists(pointFilePath) && File.Exists(tempFilePath)) { pointStream = new FileStream(pointFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None); pointStream.Read(bufferInfo, 0, count); long fValue, tValue; for (int i = 0, j = workers.Length - 1; i < workers.Length; i++) { BufferSegment.Read(bufferInfo, i * perPairCount, out fValue); BufferSegment.Read(bufferInfo, i * perPairCount + PerLongSize, out tValue); workers[i].Initialize(tempFilePath, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue); SocketHelper.Logger.DebugFormat("[Multi]Local{0} breakpoint read{1}:{2}/{3}.", localEndPoint.ToString(), i, fValue, tValue); } worker.Client.Send(bufferInfo); } else { pointStream = new FileStream(pointFilePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None); FileStream stream = new FileStream(tempFilePath, FileMode.CreateNew, FileAccess.Write, FileShare.Write); stream.SetLength(config.FileLength); stream.Flush(); stream.Dispose(); for (int i = 0, j = workers.Length - 1; i < workers.Length; i++) { workers[i].Initialize(tempFilePath, config.ChunkSize, i * avgSize, i == j ? avgSize + oddSize : avgSize); } worker.Client.Send(bufferInfo, 0, 4, SocketFlags.None); } Timer timer = new Timer(state => { long fValue, tValue; for (int i = 0; i < workers.Length; i++) { workers[i].ReportProgress(out fValue, out tValue); BufferSegment.Write(bufferInfo, i * perPairCount, fValue); BufferSegment.Write(bufferInfo, i * perPairCount + PerLongSize, tValue); SocketHelper.Logger.DebugFormat("[Multi]Local{0} breakpoint write{1}:{2}/{3}.", localEndPoint.ToString(), i, fValue, tValue); } pointStream.Position = 0L; pointStream.Write(bufferInfo, 0, count); pointStream.Flush(); }, null, TimeSpan.Zero, TimeSpan.FromSeconds(2)); #endregion AutoResetEvent reset = new AutoResetEvent(false); for (int i = 0; i < workers.Length; i++) { workers[i].RunWorker(i == workers.Length - 1 ? reset : null); } reset.WaitOne(); #region Breakpoint long bytesTransferred = 0L; do { workers.ReportSpeed(e.Progress, ref bytesTransferred); OnProgressChanged(e); Thread.Sleep(1000); } while (!workers.IsAllCompleted()); workers.ReportSpeed(e.Progress, ref bytesTransferred); OnProgressChanged(e); timer.Dispose(); pointStream.Dispose(); File.Delete(pointFilePath); File.Move(tempFilePath, filePath); #endregion e.Progress.Stop(); } #endregion } #region IChunkWorker internal class SendChunkWorker : IChunkWorker { private long _bytesSent, _sendLength; private byte[] _buffer; private Socket _sock; private FileStream _reader; private Thread _thread; private bool _isCompleted; public byte[] Buffer { get { return _buffer; } } public Socket Client { get { return _sock; } } public bool IsCompleted { get { return _isCompleted; } } public SendChunkWorker(IPEndPoint ipe) { _sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _sock.Connect(ipe); _buffer = BufferSegment.GetBuffer(BufferSize.Socket); } public void Initialize(string filePath, int chunkSize, long position, long length) { Initialize(filePath, chunkSize, position, length, 0L, length); } public void Initialize(string filePath, int chunkSize, long position, long length, long bytesTransferred, long transferLength) { _reader = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, Math.Min(TransferConfig.MaxChunkBufferSize, chunkSize)); _reader.Position = position + bytesTransferred; _bytesSent = bytesTransferred; _sendLength = transferLength; _thread = new Thread(StartWork); _thread.Name = string.Format("[M{0}/{1}]", _bytesSent, _sendLength); _thread.IsBackground = true; SocketHelper.Logger.DebugFormat("[Multi]Remote{0} {1} Initialized.", _sock.RemoteEndPoint, _thread.Name); } private void StartWork(object state) { try { int read, sent; bool flag; while (_bytesSent < _sendLength) { read = _reader.Read(_buffer, 0, Math.Min((int)BufferSize.Socket, (int)(_sendLength - _bytesSent))); sent = 0; flag = true; while ((sent += _sock.Send(_buffer, sent, read, SocketFlags.None)) < read) { flag = false; _bytesSent += (long)sent; SocketHelper.Logger.DebugFormat("[Multi]Remote{0} {1}:{2}/{3}.", _sock.RemoteEndPoint, _thread.Name, _bytesSent, _sendLength); #if Sleep Thread.Sleep(1); #endif } if (flag) { _bytesSent += (long)read; SocketHelper.Logger.DebugFormat("[Multi]Remote{0} {1}:{2}/{3}.", _sock.RemoteEndPoint, _thread.Name, _bytesSent, _sendLength); #if Sleep Thread.Sleep(1); #endif } } } catch (SocketException ex) { if (ex.ErrorCode == 10054) { return; } throw; } finally { _reader.Dispose(); _sock.Shutdown(SocketShutdown.Both); _sock.Close(); } var waitHandle = state as EventWaitHandle; if (waitHandle != null) { waitHandle.Set(); } _isCompleted = true; } public void ReportProgress(out long bytesTransferred, out long transferLength) { bytesTransferred = _bytesSent; transferLength = _sendLength; } public void RunWorker(EventWaitHandle waitHandle) { _thread.Start(waitHandle); } } internal class ReceiveChunkWorker : IChunkWorker { private long _offset, _bytesReceived, _receiveLength; private byte[] _buffer; private Socket _sock; private FileStream _writer; private Thread _thread; private bool _isCompleted; public byte[] Buffer { get { return _buffer; } } public Socket Client { get { return _sock; } } public bool IsCompleted { get { return _isCompleted; } } public ReceiveChunkWorker(Socket client) { _sock = client; _buffer = BufferSegment.GetBuffer(BufferSize.Socket); } public void Initialize(string filePath, int chunkSize, long position, long length) { Initialize(filePath, chunkSize, position, length, 0L, length); } public void Initialize(string filePath, int chunkSize, long position, long length, long bytesTransferred, long transferLength) { _writer = new FileStream(filePath, FileMode.Open, FileAccess.Write, FileShare.Write, Math.Min(TransferConfig.MaxChunkBufferSize, chunkSize)); _writer.Position = position + bytesTransferred; _writer.Lock(position, length); _offset = position; _bytesReceived = bytesTransferred; _receiveLength = transferLength; _thread = new Thread(StartWork); _thread.Name = string.Format("[M{0}/{1}]", _bytesReceived, _receiveLength); _thread.IsBackground = true; SocketHelper.Logger.DebugFormat("[Multi]Local{0} {1} Initialized.", _sock.LocalEndPoint, _thread.Name); } private void StartWork(object state) { try { int received; while (_bytesReceived < _receiveLength) { if ((received = _sock.Receive(_buffer)) == 0) { break; } _writer.Write(_buffer, 0, received); _writer.Flush(); _bytesReceived += (long)received; SocketHelper.Logger.DebugFormat("[Multi]Local{0} {1}:{2}/{3}.", _sock.LocalEndPoint, _thread.Name, _bytesReceived, _receiveLength); #if Sleep Thread.Sleep(1); #endif } } catch (SocketException ex) { if (ex.ErrorCode == 10054) { return; } throw; } finally { _writer.Unlock(_offset, _receiveLength); _writer.Dispose(); _sock.Close(); } var waitHandle = state as EventWaitHandle; if (waitHandle != null) { waitHandle.Set(); } _isCompleted = true; } public void ReportProgress(out long bytesTransferred, out long transferLength) { bytesTransferred = _bytesReceived; transferLength = _receiveLength; } public void RunWorker(EventWaitHandle waitHandle) { _thread.Start(waitHandle); } } internal interface IChunkWorker { bool IsCompleted { get; } void Initialize(string filePath, int chunkSize, long position, long length); void Initialize(string filePath, int chunkSize, long position, long length, long bytesTransferred, long transferLength); void ReportProgress(out long bytesTransferred, out long transferLength); void RunWorker(EventWaitHandle waitHandle); } #endregion #region Extensions internal static class Extensions { public static bool IsAllCompleted(this IChunkWorker[] workers) { bool flag = true; for (int i = 0; i < workers.Length; i++) { if (!workers[i].IsCompleted) { flag = false; break; } } return flag; } public static void ReportSpeed(this IChunkWorker[] workers, TransferProgress progress, ref long lastTransferred) { long bytesTransferred = 0L, transferLength = 0L; for (int i = 0; i < workers.Length; i++) { long perBytesTransferred, perTransferLength; workers[i].ReportProgress(out perBytesTransferred, out perTransferLength); bytesTransferred += perBytesTransferred; transferLength += perTransferLength; } progress.ReportChange(bytesTransferred - lastTransferred, bytesTransferred); lastTransferred = bytesTransferred; } } #endregion } 调用: 上篇文章:http://www.cnblogs.com/Googler/archive/2010/06/03/1750616.html |
请发表评论