You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

325 lines
8.6 KiB

4 years ago
  1. // Copyright © 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  2. //
  3. // MySQL Connector/NET is licensed under the terms of the GPLv2
  4. // <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>, like most
  5. // MySQL Connectors. There are special exceptions to the terms and
  6. // conditions of the GPLv2 as it is applied to this software, see the
  7. // FLOSS License Exception
  8. // <http://www.mysql.com/about/legal/licensing/foss-exception.html>.
  9. //
  10. // This program is free software; you can redistribute it and/or modify
  11. // it under the terms of the GNU General Public License as published
  12. // by the Free Software Foundation; version 2 of the License.
  13. //
  14. // This program is distributed in the hope that it will be useful, but
  15. // WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
  16. // or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  17. // for more details.
  18. //
  19. // You should have received a copy of the GNU General Public License along
  20. // with this program; if not, write to the Free Software Foundation, Inc.,
  21. // 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  22. #if MYSQL_6_9
  23. using System;
  24. using System.IO;
  25. using zlib;
  26. using Externals.MySql.Data.MySqlClient.Properties;
  27. using Externals.MySql.Data.Common;
  28. namespace Externals.MySql.Data.MySqlClient
  29. {
  30. /// <summary>
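  31. /// Stream wrapper that caches outgoing data and sends it on flush as a 7-byte-prefixed packet (zlib-compressed when that saves space), and that inflates compressed packets read from the underlying stream.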
  32. /// </summary>
  33. internal class CompressedStream : Stream
  34. {
  // ---- writing fields ----

  // Underlying stream: framed packets are written to it and raw packets are read from it.
  private readonly Stream baseStream;
  // Collects outgoing bytes until Flush() finds a complete packet to send.
  private readonly MemoryStream cache;

  // ---- reading fields ----

  // One-byte scratch buffer used by ReadByte().
  private readonly byte[] localByte;
  // Compressed payload of the packet currently being inflated.
  private byte[] inBuffer;
  // Scratch buffer for the 7-byte header that precedes every incoming packet.
  private readonly byte[] lengthBytes;
  // Weak handle to the previous inBuffer so it can be reused if still alive.
  private WeakReference inBufferRef;
  // Number of payload bytes of the current packet already returned to callers.
  private int inPos;
  // Total number of payload bytes the current packet provides.
  private int maxInPos;
  // Inflater over inBuffer; null while the current packet is not compressed.
  private ZInputStream zInStream;

  /// <summary>
  /// Creates a compressed stream layered on top of <paramref name="baseStream"/>.
  /// </summary>
  /// <param name="baseStream">Stream that packets are written to and read from.</param>
  public CompressedStream(Stream baseStream)
  {
    this.baseStream = baseStream;
    localByte = new byte[1];
    lengthBytes = new byte[7];
    cache = new MemoryStream();
    // inBuffer is still null at this point, so the weak reference starts without a target.
    inBufferRef = new WeakReference(inBuffer, false);
  }
  #region Properties

  /// <summary>Reports whether the underlying stream supports reading.</summary>
  public override bool CanRead
  {
    get
    {
      return baseStream.CanRead;
    }
  }

  /// <summary>Reports whether the underlying stream supports writing.</summary>
  public override bool CanWrite
  {
    get
    {
      return baseStream.CanWrite;
    }
  }

  /// <summary>Reports whether the underlying stream supports seeking.</summary>
  public override bool CanSeek
  {
    get
    {
      return baseStream.CanSeek;
    }
  }

  /// <summary>Length of the underlying stream, as reported by that stream.</summary>
  public override long Length
  {
    get
    {
      return baseStream.Length;
    }
  }

  /// <summary>Gets or sets the position of the underlying stream.</summary>
  public override long Position
  {
    get
    {
      return baseStream.Position;
    }
    set
    {
      baseStream.Position = value;
    }
  }

  #endregion
  /// <summary>
  /// Closes this stream, then the underlying stream, and releases the write cache.
  /// </summary>
  public override void Close()
  {
    base.Close();
    try
    {
      baseStream.Close();
    }
    finally
    {
      // Release the write cache even when closing the underlying stream fails.
      cache.Dispose();
    }
  }
  /// <summary>
  /// Not supported; always throws.
  /// </summary>
  /// <param name="value">Ignored.</param>
  /// <exception cref="NotSupportedException">Thrown on every call.</exception>
  public override void SetLength(long value)
  {
    string message = Resources.CSNoSetLength;
    throw new NotSupportedException(message);
  }
  /// <summary>
  /// Reads a single byte through <see cref="Read"/>.
  /// </summary>
  /// <returns>
  /// The byte value, or -1 when no byte could be read (Read delivered nothing
  /// or an <see cref="EndOfStreamException"/> was raised).
  /// </returns>
  public override int ReadByte()
  {
    try
    {
      // Honor the count returned by Read: when nothing was delivered the
      // scratch buffer still holds the previous byte and must not be reported.
      int bytesRead = Read(localByte, 0, 1);
      if (bytesRead <= 0)
        return -1;
      return localByte[0];
    }
    catch (EndOfStreamException)
    {
      return -1;
    }
  }
  /// <summary>Reports whether the underlying stream can time out.</summary>
  public override bool CanTimeout
  {
    get { return baseStream.CanTimeout; }
  }
  /// <summary>Gets or sets the read timeout of the underlying stream.</summary>
  public override int ReadTimeout
  {
    get { return baseStream.ReadTimeout; }
    set { baseStream.ReadTimeout = value; }
  }
  /// <summary>Gets or sets the write timeout of the underlying stream.</summary>
  public override int WriteTimeout
  {
    get { return baseStream.WriteTimeout; }
    set { baseStream.WriteTimeout = value; }
  }
  /// <summary>
  /// Reads up to <paramref name="count"/> payload bytes of the current packet,
  /// fetching the next packet header first when the current packet is used up.
  /// A single call never crosses a packet boundary.
  /// </summary>
  /// <param name="buffer">Destination buffer.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
  /// <param name="count">Maximum number of bytes to read.</param>
  /// <returns>The number of bytes stored in <paramref name="buffer"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="offset"/> is outside the buffer or <paramref name="count"/> is negative.
  /// </exception>
  /// <exception cref="ArgumentException">
  /// The buffer is too small to hold <paramref name="count"/> bytes starting at <paramref name="offset"/>.
  /// </exception>
  public override int Read(byte[] buffer, int offset, int count)
  {
    if (buffer == null)
      throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
    if (offset < 0 || offset >= buffer.Length)
      throw new ArgumentOutOfRangeException("offset", Resources.OffsetMustBeValid);
    // A negative count would otherwise slip through as a negative read size.
    if (count < 0)
      throw new ArgumentOutOfRangeException("count");
    // Compare against the remaining space so that offset + count cannot overflow.
    if (count > buffer.Length - offset)
      throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");

    // Current packet exhausted (or nothing read yet): load the next header.
    if (inPos == maxInPos)
      PrepareNextPacket();

    int countToRead = Math.Min(count, maxInPos - inPos);
    int countRead;
    if (zInStream != null)
      countRead = zInStream.read(buffer, offset, countToRead);
    else
      countRead = baseStream.Read(buffer, offset, countToRead);

    // NOTE(review): the inflater presumably can report "no more data" with a
    // negative value - confirm against the bundled ZInputStream. Either way the
    // position must never move backwards and callers must never see a negative count.
    if (countRead < 0)
      countRead = 0;
    inPos += countRead;

    // Packet fully consumed: drop the inflater and, outside Mono, keep the
    // buffer only through a weak reference so it may be collected or reused.
    if (inPos == maxInPos)
    {
      zInStream = null;
      if (!Platform.IsMono())
      {
        inBufferRef = new WeakReference(inBuffer, false);
        inBuffer = null;
      }
    }
    return countRead;
  }
  /// <summary>
  /// Reads the 7-byte header of the next incoming packet and sets up the read
  /// state: header bytes 0-2 hold the compressed length, byte 3 the sequence
  /// number and bytes 4-6 the uncompressed length (all little-endian). An
  /// uncompressed length of zero means the payload is not compressed.
  /// </summary>
  private void PrepareNextPacket()
  {
    MySqlStream.ReadFully(baseStream, lengthBytes, 0, 7);

    int packedSize = lengthBytes[0] | (lengthBytes[1] << 8) | (lengthBytes[2] << 16);
    // lengthBytes[3] carries the sequence number and is not needed here
    int inflatedSize = lengthBytes[4] | (lengthBytes[5] << 8) | (lengthBytes[6] << 16);

    if (inflatedSize != 0)
    {
      // Compressed payload: pull it into inBuffer and inflate from there.
      ReadNextPacket(packedSize);
      zInStream = new ZInputStream(new MemoryStream(inBuffer));
      zInStream.maxInput = packedSize;
      maxInPos = inflatedSize;
    }
    else
    {
      // Plain payload: it is read straight from the underlying stream later.
      zInStream = null;
      maxInPos = packedSize;
    }
    inPos = 0;
  }
  /// <summary>
  /// Reads <paramref name="len"/> bytes from the underlying stream into
  /// <c>inBuffer</c>, reusing the previous buffer when it is still available
  /// and large enough.
  /// </summary>
  /// <param name="len">Number of bytes to read.</param>
  private void ReadNextPacket(int len)
  {
    byte[] target = inBuffer;

    // Outside Mono, try to get the previous buffer back from the weak reference.
    if (!Platform.IsMono())
    {
      target = inBufferRef.Target as byte[];
    }

    bool usable = target != null && target.Length >= len;
    if (!usable)
    {
      target = new byte[len];
    }

    inBuffer = target;
    MySqlStream.ReadFully(baseStream, inBuffer, 0, len);
  }
  /// <summary>
  /// Compresses the current contents of the write cache.
  /// </summary>
  /// <returns>
  /// A stream holding the compressed bytes, or null when the cache is shorter
  /// than 50 bytes or compression did not make the data smaller.
  /// </returns>
  private MemoryStream CompressCache()
  {
    // small arrays almost never yield a benefit from compressing
    const int smallestWorthCompressing = 50;

    long pendingLength = cache.Length;
    if (pendingLength < smallestWorthCompressing)
      return null;

    byte[] pending = cache.GetBuffer();
    MemoryStream deflated = new MemoryStream();
    ZOutputStream deflater = new ZOutputStream(deflated, zlibConst.Z_DEFAULT_COMPRESSION);
    deflater.Write(pending, 0, (int)pendingLength);
    deflater.finish();

    // Only hand back the compressed form when it is actually smaller.
    return deflated.Length < pendingLength ? deflated : null;
  }
  /// <summary>
  /// Frames the cached data with a 7-byte header (3-byte payload length,
  /// sequence byte, 3-byte uncompressed length), writes it to the underlying
  /// stream, flushes that stream and empties the cache. The payload is sent
  /// compressed when <see cref="CompressCache"/> returns a result; otherwise
  /// it is sent as-is with an uncompressed length of zero.
  /// </summary>
  private void CompressAndSendCache()
  {
    // Remember the sequence byte of the cached packet and zero it in place.
    byte[] rawBytes = cache.GetBuffer();
    byte sequence = rawBytes[3];
    rawBytes[3] = 0;

    // Try to compress; a null result means "send the cache unchanged".
    MemoryStream deflated = CompressCache();
    bool sendCompressed = deflated != null;

    MemoryStream payload = sendCompressed ? deflated : cache;
    long compressedLength = payload.Length;
    long uncompressedLength = sendCompressed ? cache.Length : 0;

    // Grow the payload by 7 bytes and shift it to make room for the header.
    int payloadLength = (int)payload.Length;
    int totalLength = payloadLength + 7;
    payload.SetLength(totalLength);
    byte[] frame = payload.GetBuffer();
    Array.Copy(frame, 0, frame, 7, payloadLength);

    // Fill in the header.
    StoreLength24(frame, 0, compressedLength);
    frame[3] = sequence;
    StoreLength24(frame, 4, uncompressedLength);

    baseStream.Write(frame, 0, totalLength);
    baseStream.Flush();
    cache.SetLength(0);

    if (sendCompressed)
    {
      deflated.Dispose();
    }
  }

  // Stores the low 24 bits of value at target[index..index+2], least significant byte first.
  private static void StoreLength24(byte[] target, int index, long value)
  {
    target[index] = (byte)(value & 0xff);
    target[index + 1] = (byte)((value >> 8) & 0xff);
    target[index + 2] = (byte)((value >> 16) & 0xff);
  }
  /// <summary>
  /// Sends the cached data, but only once <see cref="InputDone"/> reports that
  /// the cache holds a complete packet; otherwise does nothing.
  /// </summary>
  public override void Flush()
  {
    if (InputDone())
    {
      CompressAndSendCache();
    }
  }
  /// <summary>
  /// Tells whether the write cache is ready to be sent.
  /// </summary>
  /// <returns>
  /// False when the underlying stream is a <c>TimedStream</c> that reports
  /// itself closed, when fewer than 4 bytes are cached, or when the cache is
  /// shorter than the length announced in its first three bytes plus the
  /// 4-byte header; true otherwise.
  /// </returns>
  private bool InputDone()
  {
    TimedStream timed = baseStream as TimedStream;
    if (timed != null && timed.IsClosed)
    {
      return false;
    }

    // The announced length is only known once the 4-byte header is cached.
    if (cache.Length < 4)
    {
      return false;
    }

    byte[] header = cache.GetBuffer();
    int announcedLength = header[0] | (header[1] << 8) | (header[2] << 16);
    return cache.Length >= (announcedLength + 4);
  }
  /// <summary>
  /// Appends one byte to the write cache; nothing reaches the underlying
  /// stream from this call.
  /// </summary>
  /// <param name="value">Byte to cache.</param>
  public override void WriteByte(byte value)
  {
    MemoryStream pending = cache;
    pending.WriteByte(value);
  }
  /// <summary>
  /// Appends a range of bytes to the write cache; nothing reaches the
  /// underlying stream from this call.
  /// </summary>
  /// <param name="buffer">Source of the bytes.</param>
  /// <param name="offset">Index of the first byte to cache.</param>
  /// <param name="count">Number of bytes to cache.</param>
  public override void Write(byte[] buffer, int offset, int count)
  {
    MemoryStream pending = cache;
    pending.Write(buffer, offset, count);
  }
  /// <summary>
  /// Forwards the seek request to the underlying stream.
  /// </summary>
  /// <param name="offset">Offset relative to <paramref name="origin"/>.</param>
  /// <param name="origin">Reference point for the offset.</param>
  /// <returns>The value returned by the underlying stream's Seek.</returns>
  public override long Seek(long offset, SeekOrigin origin)
  {
    long newPosition = baseStream.Seek(offset, origin);
    return newPosition;
  }
  275. }
  276. }
  277. #endif