You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

746 lines
24 KiB

3 years ago
3 years ago
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Diagnostics;
  5. using System.IO;
  6. using System.Net.Sockets;
  7. using System.Text;
  8. namespace Apewer.Source
  9. {
  10. /// <summary></summary>
  11. public sealed class Redis : IDisposable
  12. {
  13. /// <summary>最大长度。</summary>
  14. public const int MaxLength = 1073741824;
  15. private const int BufferSize = 1048576;
  16. #region Instance
  17. string _host = null;
  18. string _pass = null;
  19. int _port = 6379;
  20. int _db = 0;
  21. int _timeout = 1000; // 1000 毫秒。
  22. Socket socket;
  23. BufferedStream buffered;
  24. /// <summary>连接指定主机。</summary>
  25. public Redis(string host, int port = 6379, string password = null)
  26. {
  27. _host = string.IsNullOrEmpty(host) ? "localhost" : host;
  28. _port = (port < 1 || port > 65535) ? 6379 : port;
  29. _pass = password;
  30. _timeout = -1;
  31. }
  32. /// <summary>连接 127.0.0.1 的指定端口。</summary>
  33. public Redis(int port, string host = "127.0.0.1", string password = null) : this(host, port, password) { }
  34. /// <summary>连接 127.0.0.1 的 6379 端口。</summary>
  35. public Redis() : this("127.0.0.1", 6379, null) { }
  36. /// <summary>获取或设置文本。</summary>
  37. public string this[string key]
  38. {
  39. get { return GetText(key); }
  40. set { SetText(key, value); }
  41. }
  42. /// <summary>获取错误信息。</summary>
  43. private Action<string> Error { get; set; }
  44. /// <summary></summary>
  45. ~Redis()
  46. {
  47. Dispose(false);
  48. }
  49. /// <summary></summary>
  50. public void Dispose()
  51. {
  52. Dispose(true);
  53. GC.SuppressFinalize(this);
  54. }
  55. void Dispose(bool disposing)
  56. {
  57. if (disposing)
  58. {
  59. if (socket != null)
  60. {
  61. SendCommand("QUIT");
  62. SendBySuccess();
  63. socket.Close();
  64. socket = null;
  65. }
  66. }
  67. }
  68. /// <summary>连接 Redis 服务。</summary>
  69. public string Connect()
  70. {
  71. try
  72. {
  73. if (socket != null && socket.Connected) return null;
  74. socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  75. socket.NoDelay = true;
  76. socket.SendTimeout = _timeout;
  77. socket.Connect(_host, _port);
  78. if (!socket.Connected)
  79. {
  80. socket.Close();
  81. socket = null;
  82. return "Socket 连接失败。";
  83. }
  84. buffered = new BufferedStream(new NetworkStream(socket), BufferSize);
  85. if (_pass != null) return SendBySuccess("AUTH", _pass);
  86. return null;
  87. }
  88. catch (Exception ex)
  89. {
  90. var error = ex.GetType().Name + ": " + ex.Message;
  91. return error;
  92. }
  93. }
  94. #endregion
  95. #region Static
  96. // byte[] end_data = new byte[] { (byte)'\r', (byte)'\n' };
  97. byte[] CRLF = new byte[] { 13, 10 };
  98. static byte[] ToBytes(string text)
  99. {
  100. if (string.IsNullOrEmpty(text)) return new byte[0];
  101. return Encoding.UTF8.GetBytes(text);
  102. }
  103. static int Length(string text)
  104. {
  105. return ToBytes(text).Length;
  106. }
  107. static string ToText(byte[] bytes)
  108. {
  109. if (bytes != null && bytes.Length > 0)
  110. {
  111. try
  112. {
  113. return Encoding.UTF8.GetString(bytes);
  114. }
  115. catch { }
  116. }
  117. return "";
  118. }
  119. #endregion
  120. #region Common
  121. T OnError<T>(string message, T @return)
  122. {
  123. Error?.Invoke(message);
  124. return @return;
  125. }
  126. object OnError(string message) => OnError<object>(message, null);
  127. string ReadLine()
  128. {
  129. var sb = new StringBuilder();
  130. int c;
  131. while ((c = buffered.ReadByte()) != -1)
  132. {
  133. if (c == '\r') continue;
  134. if (c == '\n') break;
  135. sb.Append((char)c);
  136. }
  137. return sb.ToString();
  138. }
  139. byte[] ReadData()
  140. {
  141. string line = ReadLine();
  142. if (line.Length < 1) return null;
  143. if (line == "$-1") return null;
  144. char flag = line[0];
  145. if (flag == '-') return OnError(line.Substring(1), null as byte[]);
  146. if (flag == '$')
  147. {
  148. int length;
  149. if (!int.TryParse(line.Substring(1), out length)) return null;
  150. var buffer = new byte[length];
  151. int writted = 0;
  152. while (writted < length)
  153. {
  154. int read = buffered.Read(buffer, writted, length - writted);
  155. if (read < 1) return null;
  156. writted += read;
  157. }
  158. if (buffered.ReadByte() != '\r' || buffered.ReadByte() != '\n') return null;
  159. return buffer;
  160. }
  161. return null;
  162. }
  163. bool Send(string head, byte[] data)
  164. {
  165. if (string.IsNullOrEmpty(head)) return false;
  166. if (!string.IsNullOrEmpty(Connect())) return false;
  167. byte[] HeadBytes = ToBytes(head);
  168. try
  169. {
  170. socket.Send(HeadBytes);
  171. if (!head.EndsWith("\r\n")) socket.Send(CRLF);
  172. if (data != null)
  173. {
  174. if (data.Length > 0) socket.Send(data);
  175. socket.Send(CRLF);
  176. }
  177. return true;
  178. }
  179. catch (Exception)
  180. {
  181. // SocketException 已超时。
  182. socket.Close();
  183. socket = null;
  184. return false;
  185. }
  186. }
  187. /// <exception cref="ArgumentException"></exception>
  188. /// <exception cref="ArgumentNullException"></exception>
  189. bool SendCommand(byte[] data, string cmd, params object[] args)
  190. {
  191. if (string.IsNullOrEmpty(cmd)) throw new ArgumentException(nameof(cmd));
  192. var argsLength = (args != null || args is object[]) ? args.Length : 0;
  193. argsLength += 1; // cmd
  194. if (data != null) argsLength += 1; // data
  195. var sb = new StringBuilder();
  196. sb.Append("*");
  197. sb.Append(argsLength.ToString());
  198. sb.Append("\r\n");
  199. sb.Append("$");
  200. sb.Append(cmd.Length);
  201. sb.Append("\r\n");
  202. sb.Append(cmd);
  203. sb.Append("\r\n");
  204. if (args != null)
  205. {
  206. foreach (var arg in args)
  207. {
  208. var line = arg == null ? "" : arg.ToString();
  209. sb.Append("$");
  210. sb.Append(Length(line).ToString());
  211. sb.Append("\r\n");
  212. sb.Append(line);
  213. sb.Append("\r\n");
  214. }
  215. }
  216. if (data != null)
  217. {
  218. sb.Append("$");
  219. sb.Append(data.Length.ToString());
  220. sb.Append("\r\n");
  221. }
  222. return Send(sb.ToString(), data);
  223. }
  224. /// <exception cref="ArgumentException"></exception>
  225. /// <exception cref="ArgumentNullException"></exception>
  226. bool SendCommand(string cmd, params object[] args) => SendCommand(null, cmd, args);
  227. string SendBySuccess(string cmd = null, params object[] args)
  228. {
  229. if (!string.IsNullOrEmpty(cmd))
  230. {
  231. if (!SendCommand(cmd, args)) return "连接 Redis 服务器失败。";
  232. }
  233. // 获取错误信息。
  234. int c = buffered.ReadByte();
  235. if (c == -1) return "没有更多字节。";
  236. string s = ReadLine();
  237. if (c == '-') return s.StartsWith("ERR ") ? s.Substring(4) : s;
  238. return null;
  239. }
  240. long SendByInt(byte[] data, string cmd, params object[] args)
  241. {
  242. if (!SendCommand(data, cmd, args)) return 0;
  243. int c = buffered.ReadByte();
  244. if (c == -1) return 0;
  245. string line = ReadLine();
  246. if (c == '-') return 0;
  247. if (c == ':')
  248. {
  249. if (long.TryParse(line, out long value)) return value;
  250. }
  251. return 0;
  252. }
  253. long SendByInt(string cmd, params object[] args) => SendByInt(null, cmd, args);
  254. string SendByString(string cmd, params object[] args)
  255. {
  256. if (!SendCommand(cmd, args)) return null;
  257. int c = buffered.ReadByte();
  258. if (c == -1) return null;
  259. var line = ReadLine();
  260. if (line.Length > 0)
  261. {
  262. if (line[0] == '-') return OnError(line, line);
  263. if (line[0] == '+') return OnError(line, line);
  264. }
  265. return line;
  266. }
  267. byte[] SendByData(string cmd, params object[] args) => SendCommand(cmd, args) ? ReadData() : null;
  268. /// <summary></summary>
  269. byte[][] SendByDataArray(string cmd, params object[] args)
  270. {
  271. if (!SendCommand(cmd, args)) return null;
  272. int c = buffered.ReadByte();
  273. if (c == -1) return null;
  274. string s = ReadLine();
  275. if (c == '-') return OnError(s, null as byte[][]);
  276. if (c == '*')
  277. {
  278. int count;
  279. if (int.TryParse(s, out count))
  280. {
  281. byte[][] result = new byte[count][];
  282. for (int i = 0; i < count; i++) result[i] = ReadData();
  283. return result;
  284. }
  285. }
  286. return null;
  287. }
  288. /// <summary></summary>
  289. string[] SendByStringArray(string cmd, params object[] args)
  290. {
  291. byte[][] reply = SendByDataArray(cmd, args);
  292. if (reply == null) return null;
  293. string[] keys = new string[reply.Length];
  294. for (int i = 0; i < reply.Length; i++) keys[i] = ToText(reply[i]);
  295. return keys;
  296. }
  297. #endregion
  298. #region DB
  299. /// <summary>获取服务器信息。</summary>
  300. public Dictionary<string, string> Info()
  301. {
  302. byte[] r = SendByData("INFO");
  303. var dict = new Dictionary<string, string>();
  304. foreach (var line in ToText(r).Split('\n'))
  305. {
  306. int p = line.IndexOf(':');
  307. if (p < 0) continue;
  308. dict.Add(line.Substring(0, p), line.Substring(p + 1));
  309. }
  310. return dict;
  311. }
  312. /// <summary>选择数据库,默认为 0。</summary>
  313. /// <returns>错误信息。</returns>
  314. public string SelectDB(int db = 0)
  315. {
  316. _db = db;
  317. return SendBySuccess("SELECT", _db);
  318. }
  319. /// <summary>清空 DB。</summary>
  320. /// <returns>错误信息。</returns>
  321. public string Flush(bool allDB = false)
  322. {
  323. return allDB ? SendBySuccess("FLUSHALL") : SendBySuccess("FLUSHDB");
  324. }
  325. /// <summary>执行同步保存操作,将所有数据的快照以 RDB 文件的形式保存到磁盘上。</summary>
  326. /// <returns>错误信息。</returns>
  327. public string Save(bool background = true)
  328. {
  329. return background ? SendBySuccess("BGSAVE") : SendBySuccess("SAVE");
  330. }
  331. /// <summary>以 UNIX 时间戳格式返回最近一次 Redis 成功将数据保存到磁盘上的时间。</summary>
  332. public DateTime LastSave()
  333. {
  334. const long UnixEpoch = 621355968000000000L;
  335. long t = SendByInt("LASTSAVE");
  336. return new DateTime(UnixEpoch) + TimeSpan.FromSeconds(t);
  337. }
  338. /// <summary>断开所有客户端,并关闭 Redis 服务进程。</summary>
  339. public void Shutdown()
  340. {
  341. SendCommand("SHUTDOWN");
  342. try
  343. {
  344. string s = ReadLine(); // 服务器可能会返回错误。
  345. if (s.Length == 0) return; // throw new ResponseException("Zero length respose");
  346. // throw new ResponseException(s.StartsWith("-ERR ") ? s.Substring(5) : s.Substring(1));
  347. }
  348. catch (IOException)
  349. {
  350. // this is the expected good result
  351. socket.Close();
  352. socket = null;
  353. }
  354. }
  355. #endregion
  356. #region Key
  357. /// <summary>获取 DB 中 Keys 的数量。</summary>
  358. /// <remarks>执行 DBSIZE 命令。</remarks>
  359. public long CountKeys()
  360. {
  361. return SendByInt("DBSIZE");
  362. }
  363. /// <summary>列出 Keys。</summary>
  364. /// <remarks>执行 KEYS 命令。</remarks>
  365. /// <returns>发生错误时返回 NULL 值。</returns>
  366. public string[] ListKeys(string pattern = "*")
  367. {
  368. var p = string.IsNullOrEmpty(pattern) ? "*" : pattern;
  369. return SendByStringArray("KEYS", p);
  370. }
  371. /// <summary>包含指定 Key。</summary>
  372. public bool ContainsKey(string key)
  373. {
  374. if (key == null) return false;
  375. return SendByInt("EXISTS", key) == 1;
  376. }
  377. /// <summary>删除指定 Key。</summary>
  378. public bool Delete(string key)
  379. {
  380. if (key == null) return false; // throw new ArgumentNullException("key");
  381. return SendByInt("DEL", key) == 1;
  382. }
  383. /// <summary>删除指定的多个 Key。不存在的 key 会被忽略。</summary>
  384. /// <returns>删除的 Key 的数量。</returns>
  385. public long Delete(params string[] keys)
  386. {
  387. if (keys == null || keys.Length < 1) return 0;
  388. return SendByInt("DEL", keys);
  389. }
  390. /// <summary>获取 Key 的值类型,正确类型为 none | string | set | list。</summary>
  391. public string TypeOf(string key)
  392. {
  393. if (key == null) return "";
  394. return SendByString("TYPE", key);
  395. }
  396. /// <summary>随机返回一个已有的 Key。</summary>
  397. public string RandomKey()
  398. {
  399. return SendByString("RANDOMKEY");
  400. }
  401. /// <summary>对 Key 重命名。</summary>
  402. /// <remarks>已经存在的 newKey 会被覆盖。</remarks>
  403. public bool Rename(string oldKey, string newKey)
  404. {
  405. // Exceptions
  406. if (string.IsNullOrEmpty(oldKey)) return false;
  407. if (string.IsNullOrEmpty(newKey)) return false;
  408. return SendByString("RENAME", oldKey, newKey)[0] == '+';
  409. }
  410. /// <summary>设置 Key 的过期时间,过期后 Key 会被自动删除。</summary>
  411. /// <remarks>在 Redis 2.1.3 之前的版本中,修改已经设置过生存时间的 Key,将会和删除 Key 有同样的效果。</remarks>
  412. public bool Expire(string key, int seconds)
  413. {
  414. // Exceptions
  415. if (string.IsNullOrEmpty(key)) return false;
  416. if (seconds < 1) return false;
  417. return SendByInt("EXPIRE", key, seconds) == 1L;
  418. }
  419. /// <summary>设置 Key 的过期时间,过期后 Key 会被自动删除。</summary>
  420. public bool ExpireAt(string key, DateTime time)
  421. {
  422. // Exceptions
  423. if (string.IsNullOrEmpty(key)) return false;
  424. var stamp = Convert.ToInt64((time - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds);
  425. return SendByInt("EXPIREAT", key, stamp) == 1L;
  426. }
  427. /// <summary>获取 Key 过期前的剩余时间。</summary>
  428. /// <remarks>秒数。当 Key 不存在时返回 -1 值。</remarks>
  429. public long TimeToLive(string key)
  430. {
  431. // Exceptions
  432. if (string.IsNullOrEmpty(key)) return -1;
  433. return SendByInt("TTL", key);
  434. }
  435. #endregion
  436. #region Bytes 字节数组
  437. /// <summary>获取值。Key 无效时获取 NULL 值。</summary>
  438. public byte[] GetBytes(string key)
  439. {
  440. // Exceptions
  441. if (string.IsNullOrEmpty(key)) return null;
  442. return SendByData("GET", key);
  443. }
  444. /// <summary>获取多个 Key 值。</summary>
  445. /// <returns>参数无效时返回 NULL 值。</returns>
  446. public byte[][] GetMultipleBytes(params string[] keys)
  447. {
  448. // Exceptions
  449. if (keys == null || keys.Length == 0) return null;
  450. return SendByDataArray("MGET", keys);
  451. }
  452. /// <summary>设置字节数组。</summary>
  453. /// <returns>错误信息。</returns>
  454. public string SetBytes(string key, byte[] bytes, bool replace = true)
  455. {
  456. // Exceptions
  457. if (key == null) return "Key 无效。";
  458. if (bytes == null) return "Bytes 无效";
  459. if (bytes.Length > MaxLength) return "Value[] 长度超出限制。";
  460. var cmd = replace ? "SET" : "SETNX";
  461. var success = SendCommand(bytes, cmd, key);
  462. if (!success) return "设置失败。";
  463. return SendBySuccess();
  464. }
  465. /// <summary>设置多个值。</summary>
  466. /// <returns>错误信息。</returns>
  467. public string SetBytes(IEnumerable<KeyValuePair<string, byte[]>> pairs)
  468. {
  469. // Exceptions
  470. if (pairs == null) return "键值对无效。";
  471. var bytes = null as byte[];
  472. var keys = new List<string>();
  473. using (var memory = new MemoryStream())
  474. {
  475. foreach (var pair in pairs)
  476. {
  477. if (string.IsNullOrEmpty(pair.Key)) continue;
  478. if (keys.Contains(pair.Key)) continue;
  479. var keyBytes = ToBytes(pair.Key);
  480. var keyLength = ToBytes("$" + keyBytes.Length.ToString() + "\r\n");
  481. memory.Write(keyLength, 0, keyLength.Length);
  482. memory.Write(keyBytes, 0, keyBytes.Length);
  483. memory.Write(CRLF, 0, CRLF.Length);
  484. var valueBytes = pair.Value ?? new byte[0];
  485. var valueLength = ToBytes("$" + valueBytes.Length + "\r\n");
  486. memory.Write(valueLength, 0, valueLength.Length);
  487. memory.Write(valueBytes, 0, valueBytes.Length);
  488. memory.Write(CRLF, 0, CRLF.Length);
  489. keys.Add(pair.Key);
  490. }
  491. if (keys.Count > 0) bytes = memory.ToArray();
  492. }
  493. if (keys.Count < 1 || bytes == null || bytes.Length < 1) return "无法生成指令。";
  494. var head = "*" + (keys.Count * 2 + 1).ToString() + "\r\n$4\r\nMSET\r\n";
  495. var sent = Send(head, bytes);
  496. if (!sent) return "发送指令失败。";
  497. return SendBySuccess();
  498. }
  499. #endregion
  500. #region Text
  501. /// <summary>获取值。Key 无效时获取 NULL 值。</summary>
  502. public string GetText(string key)
  503. {
  504. // Exceptions
  505. if (key == null) return null;
  506. var bytes = GetBytes(key);
  507. if (bytes == null) return null;
  508. return ToText(bytes);
  509. }
  510. /// <summary>设置 UTF-8 文本。返回错误信息。</summary>
  511. public string SetText(string key, string value, bool replace = true)
  512. {
  513. // Exceptions
  514. if (key == null) return "Key 无效。";
  515. if (value == null) return "Value 无效";
  516. var bytes = value.Length < 1 ? new byte[0] : ToBytes(value);
  517. return SetBytes(key, bytes, replace);
  518. }
  519. /// <summary>设置多个值。返回错误信息。</summary>
  520. public string SetText(IEnumerable<KeyValuePair<string, string>> pairs)
  521. {
  522. // Exceptions
  523. if (pairs == null) return "键值对无效。";
  524. var list = new List<KeyValuePair<string, byte[]>>();
  525. foreach (var pair in pairs)
  526. {
  527. var value = ToBytes(pair.Value);
  528. list.Add(new KeyValuePair<string, byte[]>(pair.Key, value));
  529. }
  530. return SetBytes(list);
  531. }
  532. #endregion
  533. #region Value 数值
  534. /// <summary>增大 Key 中储存的数值,当 Key 不存在,则先初始化为 0,再执行操作。</summary>
  535. /// <remarks>参数 By 为 1 时执行 INCR,否则执行 INCRBY。</remarks>
  536. /// <returns>执行操作后的值。发生错误时返回 0 值。</returns>
  537. public long Increment(string key, long by = 1)
  538. {
  539. // Exceptions
  540. if (string.IsNullOrEmpty(key)) return 0;
  541. if (by == 1) return SendByInt("INCR", key);
  542. else return SendByInt("INCRBY", key, by);
  543. }
  544. /// <summary>减小 Key 中储存的数值,当 Key 不存在,则先初始化为 0,再执行操作。</summary>
  545. /// <remarks>参数 By 为 1 时执行 DECR,否则执行 DECRBY。</remarks>
  546. /// <returns>执行操作后的值。发生错误时返回 0 值。</returns>
  547. public long Decrement(string key, long by = 1)
  548. {
  549. // Exceptions
  550. if (string.IsNullOrEmpty(key)) return 0;
  551. if (by == 1) return SendByInt("DECR", key);
  552. return SendByInt("DECRBY", key, by);
  553. }
  554. #endregion
  555. #region Set 集合。
  556. /// <summary>返回 list, set 或 sorted set 中的元素。</summary>
  557. /// <returns>排序后的元素列表。</returns>
  558. public byte[][] Sort(string key, int skip = 0, int count = 0, bool alpha = false, bool desc = false, string by = null, string get = null)
  559. {
  560. // Exceptions
  561. if (string.IsNullOrEmpty(key)) return null;
  562. var array = new ArrayList();
  563. array.Add(key);
  564. if (skip > 0 || count > 0)
  565. {
  566. array.Add("LIMIT");
  567. array.Add(skip > 0 ? skip : 0);
  568. array.Add(count > 0 ? count : 0);
  569. }
  570. if (alpha) array.Add("ALPHA");
  571. if (desc) array.Add("DESC");
  572. if (!string.IsNullOrEmpty(by))
  573. {
  574. array.Add("BY");
  575. array.Add(by);
  576. }
  577. if (!string.IsNullOrEmpty(get))
  578. {
  579. array.Add("GET");
  580. array.Add(get);
  581. }
  582. var options = array.ToArray();
  583. return SendByDataArray("SORT", options);
  584. }
  585. /// <summary>存储 list, set 或 sorted set 中的元素。</summary>
  586. /// <returns>存储在目的列表中元素个数。</returns>
  587. public long Sort(string key, string store, int skip = 0, int count = 0, bool alpha = false, bool desc = false, string by = null, string get = null)
  588. {
  589. // Exceptions
  590. if (string.IsNullOrEmpty(key)) return -1;
  591. if (string.IsNullOrEmpty(store)) return -1;
  592. var array = new ArrayList();
  593. array.Add(key);
  594. array.Add("STORE");
  595. array.Add(store);
  596. if (skip > 0 || count > 0)
  597. {
  598. array.Add("LIMIT");
  599. array.Add(skip > 0 ? skip : 0);
  600. array.Add(count > 0 ? count : 0);
  601. }
  602. if (alpha) array.Add("ALPHA");
  603. if (desc) array.Add("DESC");
  604. if (!string.IsNullOrEmpty(by))
  605. {
  606. array.Add("BY");
  607. array.Add(by);
  608. }
  609. if (!string.IsNullOrEmpty(get))
  610. {
  611. array.Add("GET");
  612. array.Add(get);
  613. }
  614. var options = array.ToArray();
  615. return SendByInt("SORT", options);
  616. }
  617. #endregion
  618. }
  619. }