using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Net.Sockets; using System.Text; namespace Apewer.Source { /// public sealed class Redis : IDisposable { /// 最大长度。 public const int MaxLength = 1073741824; private const int BufferSize = 1048576; #region Instance string _host = null; string _pass = null; int _port = 6379; int _db = 0; int _timeout = 1000; // 1000 毫秒。 Socket socket; BufferedStream buffered; /// 连接指定主机。 public Redis(string host, int port = 6379, string password = null) { _host = string.IsNullOrEmpty(host) ? "localhost" : host; _port = (port < 1 || port > 65535) ? 6379 : port; _pass = password; _timeout = -1; } /// 连接 127.0.0.1 的指定端口。 public Redis(int port, string host = "127.0.0.1", string password = null) : this(host, port, password) { } /// 连接 127.0.0.1 的 6379 端口。 public Redis() : this("127.0.0.1", 6379, null) { } /// 获取或设置文本。 public string this[string key] { get { return GetText(key); } set { SetText(key, value); } } /// 获取错误信息。 private Action Error { get; set; } /// ~Redis() { Dispose(false); } /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } void Dispose(bool disposing) { if (disposing) { if (socket != null) { SendCommand("QUIT"); SendBySuccess(); socket.Close(); socket = null; } } } /// 连接 Redis 服务。 public string Connect() { try { if (socket != null && socket.Connected) return null; socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); socket.NoDelay = true; socket.SendTimeout = _timeout; socket.Connect(_host, _port); if (!socket.Connected) { socket.Close(); socket = null; return "Socket 连接失败。"; } buffered = new BufferedStream(new NetworkStream(socket), BufferSize); if (_pass != null) return SendBySuccess("AUTH", _pass); return null; } catch (Exception ex) { var error = ex.GetType().Name + ": " + ex.Message; return error; } } #endregion #region Static // byte[] end_data = new byte[] { (byte)'\r', (byte)'\n' }; byte[] CRLF = new byte[] { 13, 10 }; static byte[] ToBytes(string text) { if (string.IsNullOrEmpty(text)) return new byte[0]; return Encoding.UTF8.GetBytes(text); } static int Length(string text) { return ToBytes(text).Length; } static string ToText(byte[] bytes) { if (bytes != null && bytes.Length > 0) { try { return Encoding.UTF8.GetString(bytes); } catch { } } return ""; } #endregion #region Common T OnError(string message, T @return) { Error?.Invoke(message); return @return; } object OnError(string message) => OnError(message, null); string ReadLine() { var sb = new StringBuilder(); int c; while ((c = buffered.ReadByte()) != -1) { if (c == '\r') continue; if (c == '\n') break; sb.Append((char)c); } return sb.ToString(); } byte[] ReadData() { string line = ReadLine(); if (line.Length < 1) return null; if (line == "$-1") return null; char flag = line[0]; if (flag == '-') return OnError(line.Substring(1), null as byte[]); if (flag == '$') { int length; if (!int.TryParse(line.Substring(1), out length)) return null; var buffer = new byte[length]; int writted = 0; while (writted < length) { int read = buffered.Read(buffer, writted, length - writted); if (read < 1) return null; writted += read; } if (buffered.ReadByte() != '\r' || buffered.ReadByte() != '\n') return null; return buffer; } return null; } bool Send(string head, byte[] data) { if (string.IsNullOrEmpty(head)) return false; if (!string.IsNullOrEmpty(Connect())) return false; byte[] HeadBytes = ToBytes(head); try { socket.Send(HeadBytes); if (!head.EndsWith("\r\n")) socket.Send(CRLF); if (data != null) { if (data.Length > 0) socket.Send(data); socket.Send(CRLF); } return true; } catch (Exception) { // SocketException 已超时。 socket.Close(); socket = null; return false; } } /// /// bool SendCommand(byte[] data, string cmd, params object[] args) { if (string.IsNullOrEmpty(cmd)) throw new ArgumentException(nameof(cmd)); var argsLength = (args != null || args is object[]) ? args.Length : 0; argsLength += 1; // cmd if (data != null) argsLength += 1; // data var sb = new StringBuilder(); sb.Append("*"); sb.Append(argsLength.ToString()); sb.Append("\r\n"); sb.Append("$"); sb.Append(cmd.Length); sb.Append("\r\n"); sb.Append(cmd); sb.Append("\r\n"); if (args != null) { foreach (var arg in args) { var line = arg == null ? "" : arg.ToString(); sb.Append("$"); sb.Append(Length(line).ToString()); sb.Append("\r\n"); sb.Append(line); sb.Append("\r\n"); } } if (data != null) { sb.Append("$"); sb.Append(data.Length.ToString()); sb.Append("\r\n"); } return Send(sb.ToString(), data); } /// /// bool SendCommand(string cmd, params object[] args) => SendCommand(null, cmd, args); string SendBySuccess(string cmd = null, params object[] args) { if (!string.IsNullOrEmpty(cmd)) { if (!SendCommand(cmd, args)) return "连接 Redis 服务器失败。"; } // 获取错误信息。 int c = buffered.ReadByte(); if (c == -1) return "没有更多字节。"; string s = ReadLine(); if (c == '-') return s.StartsWith("ERR ") ? s.Substring(4) : s; return null; } long SendByInt(byte[] data, string cmd, params object[] args) { if (!SendCommand(data, cmd, args)) return 0; int c = buffered.ReadByte(); if (c == -1) return 0; string line = ReadLine(); if (c == '-') return 0; if (c == ':') { if (long.TryParse(line, out long value)) return value; } return 0; } long SendByInt(string cmd, params object[] args) => SendByInt(null, cmd, args); string SendByString(string cmd, params object[] args) { if (!SendCommand(cmd, args)) return null; int c = buffered.ReadByte(); if (c == -1) return null; var line = ReadLine(); if (line.Length > 0) { if (line[0] == '-') return OnError(line, line); if (line[0] == '+') return OnError(line, line); } return line; } byte[] SendByData(string cmd, params object[] args) => SendCommand(cmd, args) ? ReadData() : null; /// byte[][] SendByDataArray(string cmd, params object[] args) { if (!SendCommand(cmd, args)) return null; int c = buffered.ReadByte(); if (c == -1) return null; string s = ReadLine(); if (c == '-') return OnError(s, null as byte[][]); if (c == '*') { int count; if (int.TryParse(s, out count)) { byte[][] result = new byte[count][]; for (int i = 0; i < count; i++) result[i] = ReadData(); return result; } } return null; } /// string[] SendByStringArray(string cmd, params object[] args) { byte[][] reply = SendByDataArray(cmd, args); if (reply == null) return null; string[] keys = new string[reply.Length]; for (int i = 0; i < reply.Length; i++) keys[i] = ToText(reply[i]); return keys; } #endregion #region DB /// 获取服务器信息。 public Dictionary Info() { byte[] r = SendByData("INFO"); var dict = new Dictionary(); foreach (var line in ToText(r).Split('\n')) { int p = line.IndexOf(':'); if (p < 0) continue; dict.Add(line.Substring(0, p), line.Substring(p + 1)); } return dict; } /// 选择数据库,默认为 0。 /// 错误信息。 public string SelectDB(int db = 0) { _db = db; return SendBySuccess("SELECT", _db); } /// 清空 DB。 /// 错误信息。 public string Flush(bool allDB = false) { return allDB ? SendBySuccess("FLUSHALL") : SendBySuccess("FLUSHDB"); } /// 执行同步保存操作,将所有数据的快照以 RDB 文件的形式保存到磁盘上。 /// 错误信息。 public string Save(bool background = true) { return background ? SendBySuccess("BGSAVE") : SendBySuccess("SAVE"); } /// 以 UNIX 时间戳格式返回最近一次 Redis 成功将数据保存到磁盘上的时间。 public DateTime LastSave() { const long UnixEpoch = 621355968000000000L; long t = SendByInt("LASTSAVE"); return new DateTime(UnixEpoch) + TimeSpan.FromSeconds(t); } /// 断开所有客户端,并关闭 Redis 服务进程。 public void Shutdown() { SendCommand("SHUTDOWN"); try { string s = ReadLine(); // 服务器可能会返回错误。 if (s.Length == 0) return; // throw new ResponseException("Zero length respose"); // throw new ResponseException(s.StartsWith("-ERR ") ? s.Substring(5) : s.Substring(1)); } catch (IOException) { // this is the expected good result socket.Close(); socket = null; } } #endregion #region Key /// 获取 DB 中 Keys 的数量。 /// 执行 DBSIZE 命令。 public long CountKeys() { return SendByInt("DBSIZE"); } /// 列出 Keys。 /// 执行 KEYS 命令。 /// 发生错误时返回 NULL 值。 public string[] ListKeys(string pattern = "*") { var p = string.IsNullOrEmpty(pattern) ? "*" : pattern; return SendByStringArray("KEYS", p); } /// 包含指定 Key。 public bool ContainsKey(string key) { if (key == null) return false; return SendByInt("EXISTS", key) == 1; } /// 删除指定 Key。 public bool Delete(string key) { if (key == null) return false; // throw new ArgumentNullException("key"); return SendByInt("DEL", key) == 1; } /// 删除指定的多个 Key。不存在的 key 会被忽略。 /// 删除的 Key 的数量。 public long Delete(params string[] keys) { if (keys == null || keys.Length < 1) return 0; return SendByInt("DEL", keys); } /// 获取 Key 的值类型,正确类型为 none | string | set | list。 public string TypeOf(string key) { if (key == null) return ""; return SendByString("TYPE", key); } /// 随机返回一个已有的 Key。 public string RandomKey() { return SendByString("RANDOMKEY"); } /// 对 Key 重命名。 /// 已经存在的 newKey 会被覆盖。 public bool Rename(string oldKey, string newKey) { // Exceptions if (string.IsNullOrEmpty(oldKey)) return false; if (string.IsNullOrEmpty(newKey)) return false; return SendByString("RENAME", oldKey, newKey)[0] == '+'; } /// 设置 Key 的过期时间,过期后 Key 会被自动删除。 /// 在 Redis 2.1.3 之前的版本中,修改已经设置过生存时间的 Key,将会和删除 Key 有同样的效果。 public bool Expire(string key, int seconds) { // Exceptions if (string.IsNullOrEmpty(key)) return false; if (seconds < 1) return false; return SendByInt("EXPIRE", key, seconds) == 1L; } /// 设置 Key 的过期时间,过期后 Key 会被自动删除。 public bool ExpireAt(string key, DateTime time) { // Exceptions if (string.IsNullOrEmpty(key)) return false; var stamp = Convert.ToInt64((time - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds); return SendByInt("EXPIREAT", key, stamp) == 1L; } /// 获取 Key 过期前的剩余时间。 /// 秒数。当 Key 不存在时返回 -1 值。 public long TimeToLive(string key) { // Exceptions if (string.IsNullOrEmpty(key)) return -1; return SendByInt("TTL", key); } #endregion #region Bytes 字节数组 /// 获取值。Key 无效时获取 NULL 值。 public byte[] GetBytes(string key) { // Exceptions if (string.IsNullOrEmpty(key)) return null; return SendByData("GET", key); } /// 获取多个 Key 值。 /// 参数无效时返回 NULL 值。 public byte[][] GetMultipleBytes(params string[] keys) { // Exceptions if (keys == null || keys.Length == 0) return null; return SendByDataArray("MGET", keys); } /// 设置字节数组。 /// 错误信息。 public string SetBytes(string key, byte[] bytes, bool replace = true) { // Exceptions if (key == null) return "Key 无效。"; if (bytes == null) return "Bytes 无效"; if (bytes.Length > MaxLength) return "Value[] 长度超出限制。"; var cmd = replace ? "SET" : "SETNX"; var success = SendCommand(bytes, cmd, key); if (!success) return "设置失败。"; return SendBySuccess(); } /// 设置多个值。 /// 错误信息。 public string SetBytes(IEnumerable> pairs) { // Exceptions if (pairs == null) return "键值对无效。"; var bytes = null as byte[]; var keys = new List(); using (var memory = new MemoryStream()) { foreach (var pair in pairs) { if (string.IsNullOrEmpty(pair.Key)) continue; if (keys.Contains(pair.Key)) continue; var keyBytes = ToBytes(pair.Key); var keyLength = ToBytes("$" + keyBytes.Length.ToString() + "\r\n"); memory.Write(keyLength, 0, keyLength.Length); memory.Write(keyBytes, 0, keyBytes.Length); memory.Write(CRLF, 0, CRLF.Length); var valueBytes = pair.Value ?? new byte[0]; var valueLength = ToBytes("$" + valueBytes.Length + "\r\n"); memory.Write(valueLength, 0, valueLength.Length); memory.Write(valueBytes, 0, valueBytes.Length); memory.Write(CRLF, 0, CRLF.Length); keys.Add(pair.Key); } if (keys.Count > 0) bytes = memory.ToArray(); } if (keys.Count < 1 || bytes == null || bytes.Length < 1) return "无法生成指令。"; var head = "*" + (keys.Count * 2 + 1).ToString() + "\r\n$4\r\nMSET\r\n"; var sent = Send(head, bytes); if (!sent) return "发送指令失败。"; return SendBySuccess(); } #endregion #region Text /// 获取值。Key 无效时获取 NULL 值。 public string GetText(string key) { // Exceptions if (key == null) return null; var bytes = GetBytes(key); if (bytes == null) return null; return ToText(bytes); } /// 设置 UTF-8 文本。返回错误信息。 public string SetText(string key, string value, bool replace = true) { // Exceptions if (key == null) return "Key 无效。"; if (value == null) return "Value 无效"; var bytes = value.Length < 1 ? new byte[0] : ToBytes(value); return SetBytes(key, bytes, replace); } /// 设置多个值。返回错误信息。 public string SetText(IEnumerable> pairs) { // Exceptions if (pairs == null) return "键值对无效。"; var list = new List>(); foreach (var pair in pairs) { var value = ToBytes(pair.Value); list.Add(new KeyValuePair(pair.Key, value)); } return SetBytes(list); } #endregion #region Value 数值 /// 增大 Key 中储存的数值,当 Key 不存在,则先初始化为 0,再执行操作。 /// 参数 By 为 1 时执行 INCR,否则执行 INCRBY。 /// 执行操作后的值。发生错误时返回 0 值。 public long Increment(string key, long by = 1) { // Exceptions if (string.IsNullOrEmpty(key)) return 0; if (by == 1) return SendByInt("INCR", key); else return SendByInt("INCRBY", key, by); } /// 减小 Key 中储存的数值,当 Key 不存在,则先初始化为 0,再执行操作。 /// 参数 By 为 1 时执行 DECR,否则执行 DECRBY。 /// 执行操作后的值。发生错误时返回 0 值。 public long Decrement(string key, long by = 1) { // Exceptions if (string.IsNullOrEmpty(key)) return 0; if (by == 1) return SendByInt("DECR", key); return SendByInt("DECRBY", key, by); } #endregion #region Set 集合。 /// 返回 list, set 或 sorted set 中的元素。 /// 排序后的元素列表。 public byte[][] Sort(string key, int skip = 0, int count = 0, bool alpha = false, bool desc = false, string by = null, string get = null) { // Exceptions if (string.IsNullOrEmpty(key)) return null; var array = new ArrayList(); array.Add(key); if (skip > 0 || count > 0) { array.Add("LIMIT"); array.Add(skip > 0 ? skip : 0); array.Add(count > 0 ? count : 0); } if (alpha) array.Add("ALPHA"); if (desc) array.Add("DESC"); if (!string.IsNullOrEmpty(by)) { array.Add("BY"); array.Add(by); } if (!string.IsNullOrEmpty(get)) { array.Add("GET"); array.Add(get); } var options = array.ToArray(); return SendByDataArray("SORT", options); } /// 存储 list, set 或 sorted set 中的元素。 /// 存储在目的列表中元素个数。 public long Sort(string key, string store, int skip = 0, int count = 0, bool alpha = false, bool desc = false, string by = null, string get = null) { // Exceptions if (string.IsNullOrEmpty(key)) return -1; if (string.IsNullOrEmpty(store)) return -1; var array = new ArrayList(); array.Add(key); array.Add("STORE"); array.Add(store); if (skip > 0 || count > 0) { array.Add("LIMIT"); array.Add(skip > 0 ? skip : 0); array.Add(count > 0 ? count : 0); } if (alpha) array.Add("ALPHA"); if (desc) array.Add("DESC"); if (!string.IsNullOrEmpty(by)) { array.Add("BY"); array.Add(by); } if (!string.IsNullOrEmpty(get)) { array.Add("GET"); array.Add(get); } var options = array.ToArray(); return SendByInt("SORT", options); } #endregion } }