|
|
using System; using System.IO; using System.Collections.Generic; using System.Net.Sockets; using System.Diagnostics;
namespace Apewer.Source {
/// <summary>
/// Minimal synchronous Redis client that speaks the RESP protocol over one TCP socket.
/// The connection is opened lazily by the first command that needs it. Most members report
/// failure through their return value (null, 0, false or an error message) instead of throwing.
/// </summary>
public class Redis : IDisposable
{
    /// <summary>Largest payload accepted for a single value (1 GiB).</summary>
    const int MaxValueLength = 1073741824;

    /// <summary>Ticks of 1970-01-01 00:00:00, the origin of Unix timestamps.</summary>
    const long UnixEpoch = 621355968000000000L;

    /// <summary>CRLF terminator appended after a binary payload.</summary>
    static readonly byte[] end_data = new byte[] { (byte)'\r', (byte)'\n' };

    Socket socket;
    BufferedStream bstream;
    int db = 0;

    #region Construction and configuration

    /// <summary>Creates a client for the given host. Nothing is connected until a command is sent.</summary>
    /// <param name="host">Host name; null or empty falls back to "localhost".</param>
    /// <param name="port">TCP port; values outside 1..65535 fall back to 6379.</param>
    /// <param name="password">Password sent with AUTH after connecting, or null for none.</param>
    public Redis(string host, int port = 6379, string password = null)
    {
        Host = string.IsNullOrEmpty(host) ? "localhost" : host;
        Port = (port < 1 || port > 65535) ? 6379 : port;
        Password = password;
        SendTimeout = -1;
    }

    /// <summary>Creates a client for the given port, by default on localhost.</summary>
    public Redis(int port, string host = "localhost", string password = null) : this(host, port, password) { }

    /// <summary>Creates a client for localhost:6379 without a password.</summary>
    public Redis() : this("localhost", 6379, null) { }

    /// <summary>Host name used when connecting.</summary>
    public string Host { get; private set; }

    /// <summary>TCP port used when connecting.</summary>
    public int Port { get; private set; }

    /// <summary>Retry timeout. Stored only; this class does not read it.</summary>
    public int RetryTimeout { get; set; }

    /// <summary>Retry count. Stored only; this class does not read it.</summary>
    public int RetryCount { get; set; }

    /// <summary>Value assigned to <see cref="Socket.SendTimeout"/> when the socket is created.</summary>
    public int SendTimeout { get; set; }

    /// <summary>Password sent with AUTH right after connecting; null disables AUTH.</summary>
    public string Password { get; set; }

    /// <summary>Receives protocol traces. Only invoked in builds that define DEBUG.</summary>
    public Action<string> Log { get; set; }

    /// <summary>Database index, 0 by default. Setting it sends SELECT; it is re-selected after a reconnect.</summary>
    public int Db
    {
        get { return db; }
        set { db = value; SendExpectSuccess("SELECT", db); }
    }

    #endregion

    #region String values

    /// <summary>Gets or sets the UTF-8 text stored at <paramref name="key"/>.</summary>
    public string this[string key]
    {
        get { return GetString(key); }
        set { Set(key, value); }
    }

    /// <summary>Stores UTF-8 text. Returns an error message, or null when no error was reported.</summary>
    /// <param name="key">Key to write; must not be null.</param>
    /// <param name="value">Text to store; must not be null.</param>
    /// <param name="ifNotExist">Uses SETNX instead of SET. An existing key is not reported as an error.</param>
    public string Set(string key, string value, bool ifNotExist = false)
    {
        if (key == null) return "Key 无效。";
        if (value == null) return "Value 无效";
        return Set(key, GetBytes(value), ifNotExist);
    }

    /// <summary>Stores a byte array of at most 1073741824 bytes. Returns an error message, or null when no error was reported.</summary>
    /// <param name="key">Key to write; must not be null.</param>
    /// <param name="bytes">Payload; must not be null.</param>
    /// <param name="ifNotExist">Uses SETNX instead of SET. An existing key is not reported as an error.</param>
    public string Set(string key, byte[] bytes, bool ifNotExist = false)
    {
        if (key == null) return "Key 无效。";
        if (bytes == null) return "Bytes 无效";
        if (bytes.Length > MaxValueLength) return "Value[] 长度超出限制。";

        string cmd = ifNotExist ? "SETNX" : "SET";
        if (!SendDataCommand(bytes, cmd, key)) return "设置失败。";
        return ExpectSuccess();
    }

    /// <summary>Stores several UTF-8 texts with one MSET. Returns an error message, or null on success.</summary>
    public string Set(IDictionary<string, string> dict)
    {
        if (dict == null) return "字典无效。";
        var encoded = new Dictionary<string, byte[]>();
        foreach (var pair in dict) encoded.Add(pair.Key, GetBytes(pair.Value));
        return Set(encoded);
    }

    /// <summary>Stores several byte arrays with one MSET. Returns an error message, or null on success.</summary>
    public string Set(IDictionary<string, byte[]> dict)
    {
        if (dict == null) return "字典无效。";
        var keys = new List<string>();
        var values = new List<byte[]>();
        foreach (var pair in dict)
        {
            keys.Add(pair.Key);
            values.Add(pair.Value);
        }
        return Set(keys.ToArray(), values.ToArray());
    }

    /// <summary>Stores keys[i] = values[i] with one MSET. Returns an error message, or null on success.</summary>
    /// <param name="keys">Keys; must be non-empty and contain no null element.</param>
    /// <param name="values">Values, same length as <paramref name="keys"/>, no null element.</param>
    public string Set(string[] keys, byte[][] values)
    {
        if (keys == null) return "Keys 无效。";
        if (values == null) return "Values 无效";
        if (keys.Length != values.Length) return "Keys 和 Values 的长度不相等";
        if (keys.Length == 0) return "Keys 无效。";

        // Validate everything before a single byte is sent, so a bad element
        // cannot leave a half-written request on the wire.
        for (int i = 0; i < keys.Length; i++)
        {
            if (keys[i] == null) return "Keys 无效。";
            if (values[i] == null) return "Values 无效";
            if (values[i].Length > MaxValueLength) return "Value[] 长度超出限制。";
        }

        byte[] body;
        using (var ms = new MemoryStream())
        {
            for (int i = 0; i < keys.Length; i++)
            {
                WriteBulk(ms, GetBytes(keys[i]));
                WriteBulk(ms, values[i]);
            }
            body = ms.ToArray();
        }

        // The body already ends with the CRLF of its last bulk string, so no terminator is appended.
        string header = "*" + (keys.Length * 2 + 1) + "\r\n$4\r\nMSET\r\n";
        if (!SendRequest(header, body, false)) return "设置失败。";
        return ExpectSuccess();
    }

    /// <summary>Gets the raw value of a key. Returns null for a null key, a missing key or any failure.</summary>
    public byte[] Get(string key)
    {
        if (key == null) return null;
        return SendExpectData("GET", key);
    }

    /// <summary>Gets the value of a key as UTF-8 text. Returns null for a null key, a missing key or any failure.</summary>
    public string GetString(string key)
    {
        if (key == null) return null;
        byte[] bytes = Get(key);
        if (bytes == null) return null;
        return GetString(bytes);
    }

    /// <summary>Sends GETSET: stores <paramref name="value"/> and returns the previous raw value. Returns null on invalid input or failure.</summary>
    public byte[] GetSet(string key, byte[] value)
    {
        if (key == null) return null;
        if (value == null) return null;
        if (value.Length > MaxValueLength) return null;
        if (!SendDataCommand(value, "GETSET", key)) return null;
        return ReadData();
    }

    /// <summary>Sends GETSET with UTF-8 text. Returns null on invalid input; a missing previous value decodes to "".</summary>
    public string GetSet(string key, string value)
    {
        if (key == null) return null;
        if (value == null) return null;
        return GetString(GetSet(key, GetBytes(value)));
    }

    #endregion

    #region Sorting

    /// <summary>Builds a SORT command from the given options and executes it.</summary>
    /// <param name="key">Key to sort; null makes the call return null.</param>
    /// <param name="storeInKey">When not empty, the result is stored there with STORE.</param>
    /// <param name="descending">Adds DESC.</param>
    /// <param name="lexographically">Adds ALPHA.</param>
    /// <param name="lowerLimit">First LIMIT argument; LIMIT is only sent when either limit is non-zero.</param>
    /// <param name="upperLimit">Second LIMIT argument.</param>
    /// <param name="by">Pattern for BY, ignored when empty.</param>
    /// <param name="get">Pattern for GET, ignored when empty.</param>
    public byte[][] Sort(string key = null, string storeInKey = null, bool descending = false, bool lexographically = false, int lowerLimit = 0, int upperLimit = 0, string by = null, string get = null)
    {
        var args = new List<object>();
        if (lowerLimit != 0 || upperLimit != 0)
        {
            args.Add("LIMIT");
            args.Add(lowerLimit);
            args.Add(upperLimit);
        }
        if (descending) args.Add("DESC");
        if (lexographically) args.Add("ALPHA");
        if (!string.IsNullOrEmpty(by))
        {
            args.Add("BY");
            args.Add(by);
        }
        if (!string.IsNullOrEmpty(get))
        {
            args.Add("GET");
            args.Add(get);
        }
        return Sort(key, storeInKey, args.ToArray());
    }

    /// <summary>
    /// Executes SORT with raw option tokens. Without a destination the sorted elements are returned.
    /// With a destination the result is stored and an array of that length with null elements is returned.
    /// Returns null when <paramref name="key"/> is null or the command fails.
    /// </summary>
    public byte[][] Sort(string key, string destination, params object[] options)
    {
        if (key == null) return null;
        if (options == null) options = new object[0];

        // Slot 0 holds the key; "STORE destination" takes slots 1 and 2 when requested.
        int offset = string.IsNullOrEmpty(destination) ? 1 : 3;
        object[] args = new object[offset + options.Length];
        args[0] = key;
        Array.Copy(options, 0, args, offset, options.Length);

        if (offset == 1) return SendExpectDataArray("SORT", args);

        args[1] = "STORE";
        args[2] = destination;
        int stored = SendExpectInt("SORT", args);
        return new byte[Math.Max(stored, 0)][];
    }

    #endregion

    #region Connection and request plumbing

    /// <summary>Reads one reply line as single-byte characters, dropping CR and stopping at LF or end of stream.</summary>
    string ReadLine()
    {
        var sb = new System.Text.StringBuilder();
        int c;
        while ((c = bstream.ReadByte()) != -1)
        {
            if (c == '\r') continue;
            if (c == '\n') break;
            sb.Append((char)c);
        }
        return sb.ToString();
    }

    /// <summary>Releases the stream and socket. Afterwards both fields are null, so the next command reconnects.</summary>
    void CloseConnection()
    {
        if (bstream != null)
        {
            try { bstream.Close(); }
            catch (Exception) { /* best effort: the connection is being discarded anyway */ }
            bstream = null;
        }
        if (socket != null)
        {
            try { socket.Close(); }
            catch (Exception) { /* best effort: the connection is being discarded anyway */ }
            socket = null;
        }
    }

    /// <summary>
    /// Opens the connection, then sends AUTH (when a password is set) and SELECT (when a non-zero
    /// database is chosen). Returns an error message, or null on success. When the TCP connection
    /// cannot be established both <c>socket</c> and <c>bstream</c> are left null.
    /// </summary>
    string Connect()
    {
        CloseConnection();
        try
        {
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            socket.NoDelay = true;
            socket.SendTimeout = SendTimeout;
            socket.Connect(Host, Port);
            if (!socket.Connected)
            {
                CloseConnection();
                return "连接失败。";
            }
            bstream = new BufferedStream(new NetworkStream(socket), 16 * 1024);
        }
        catch (Exception ex)
        {
            CloseConnection();
            return ex.GetType().Name + ": " + ex.Message;
        }

        try
        {
            if (Password != null)
            {
                string authError = SendExpectSuccess("AUTH", Password);
                if (authError != null) return authError;
            }
            if (db != 0)
            {
                // A fresh connection starts on database 0; restore the caller's selection.
                string selectError = SendExpectSuccess("SELECT", db);
                if (selectError != null) return selectError;
            }
            return null;
        }
        catch (Exception ex)
        {
            return ex.GetType().Name + ": " + ex.Message;
        }
    }

    /// <summary>Converts a command argument to text; null becomes "" and numbers use the invariant culture.</summary>
    static string FormatArgument(object arg)
    {
        return Convert.ToString(arg, System.Globalization.CultureInfo.InvariantCulture) ?? "";
    }

    /// <summary>Parses a protocol integer independently of the current culture.</summary>
    static bool TryParseInt(string text, out int value)
    {
        return int.TryParse(text, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out value);
    }

    /// <summary>Appends one bulk string ("$len CRLF text CRLF") with the length counted in UTF-8 bytes.</summary>
    static void AppendBulk(System.Text.StringBuilder sb, string text)
    {
        sb.Append('$').Append(GetByteCount(text)).Append("\r\n").Append(text).Append("\r\n");
    }

    /// <summary>Writes one binary bulk string ("$len CRLF bytes CRLF") to a stream.</summary>
    static void WriteBulk(Stream stream, byte[] bytes)
    {
        byte[] length = GetBytes("$" + bytes.Length + "\r\n");
        stream.Write(length, 0, length.Length);
        stream.Write(bytes, 0, bytes.Length);
        stream.Write(end_data, 0, end_data.Length);
    }

    /// <summary>Builds the array header, the command and its text arguments. <paramref name="extraArgs"/> counts arguments the caller appends itself.</summary>
    static string BuildHeader(string cmd, object[] args, int extraArgs)
    {
        int argCount = args == null ? 0 : args.Length;
        var sb = new System.Text.StringBuilder();
        sb.Append('*').Append(1 + argCount + extraArgs).Append("\r\n");
        AppendBulk(sb, cmd);
        for (int i = 0; i < argCount; i++) AppendBulk(sb, FormatArgument(args[i]));
        return sb.ToString();
    }

    /// <summary>
    /// Sends a textual header followed by an optional binary payload, connecting first when needed.
    /// Returns false when no connection is available or the send fails; a failed send drops the connection.
    /// </summary>
    bool SendRequest(string header, byte[] data, bool appendCrlf)
    {
        if (socket == null) Connect();
        if (socket == null) return false;

        byte[] head = GetBytes(header);
        try
        {
            ShowLog("C", header);
            socket.Send(head);
            if (data != null)
            {
                socket.Send(data);
                if (appendCrlf) socket.Send(end_data);
            }
        }
        catch (SocketException)
        {
            // Typically a timeout; discard the connection so the next command reconnects.
            CloseConnection();
            return false;
        }
        return true;
    }

    /// <summary>Sends a command whose last argument is the binary <paramref name="data"/>. Returns false when nothing could be sent.</summary>
    bool SendDataCommand(byte[] data, string cmd, params object[] args)
    {
        if (data == null || string.IsNullOrEmpty(cmd)) return false;
        string header = BuildHeader(cmd, args, 1) + "$" + data.Length + "\r\n";
        return SendRequest(header, data, true);
    }

    /// <summary>Sends a command with text arguments only. Returns false when nothing could be sent.</summary>
    bool SendCommand(string cmd, params object[] args)
    {
        if (string.IsNullOrEmpty(cmd)) return false;
        return SendRequest(BuildHeader(cmd, args, 0), null, false);
    }

    /// <summary>Forwards a protocol trace to <see cref="Log"/>; calls are compiled away unless DEBUG is defined.</summary>
    [Conditional("DEBUG")]
    void ShowLog(string id, string message)
    {
        Log?.Invoke(id + ": " + message.Trim().Replace("\r\n", " "));
    }

    #endregion

    #region Reply readers

    /// <summary>Reads the type byte and the rest of the first reply line. Returns false at end of stream.</summary>
    bool TryReadReply(out int type, out string text)
    {
        text = null;
        type = bstream.ReadByte();
        if (type == -1) return false;
        text = ReadLine();
        ShowLog("S", (char)type + text);
        return true;
    }

    /// <summary>Reads one reply and returns the server's error message, or null for any non-error reply.</summary>
    string ExpectSuccess()
    {
        int type;
        string text;
        if (!TryReadReply(out type, out text)) return "没有更多字节。";
        if (type == '-') return text.StartsWith("ERR ", StringComparison.Ordinal) ? text.Substring(4) : text;
        return null;
    }

    /// <summary>Reads an integer reply; anything else (error, end of stream, other type) yields 0.</summary>
    int ExpectInt()
    {
        int type;
        string text;
        if (!TryReadReply(out type, out text)) return 0;
        int value;
        if (type == ':' && TryParseInt(text, out value)) return value;
        return 0;
    }

    /// <summary>Reads a status ("+") or bulk ("$") reply as text; anything else yields null.</summary>
    string ExpectString()
    {
        int type;
        string text;
        if (!TryReadReply(out type, out text)) return null;
        if (type == '+') return text;
        if (type == '$')
        {
            byte[] data = ReadBulkBody(text);
            return data == null ? null : GetString(data);
        }
        return null;
    }

    /// <summary>Reads the payload and trailing CRLF of a bulk string whose announced length is <paramref name="lengthText"/>.</summary>
    /// <returns>The payload (possibly empty), or null for a nil reply, an invalid length or a truncated stream.</returns>
    byte[] ReadBulkBody(string lengthText)
    {
        int length;
        if (!TryParseInt(lengthText, out length)) return null;
        if (length < 0) return null; // "$-1" is the nil reply

        byte[] buffer = new byte[length];
        int offset = 0;
        // A while loop (not do/while) so that a zero-length payload skips straight to the CRLF.
        while (offset < length)
        {
            int read = bstream.Read(buffer, offset, length - offset);
            if (read < 1) return null;
            offset += read;
        }
        if (bstream.ReadByte() != '\r' || bstream.ReadByte() != '\n') return null;
        return buffer;
    }

    /// <summary>Reads one bulk reply. Returns null for nil, error or unexpected replies.</summary>
    byte[] ReadData()
    {
        string line = ReadLine();
        ShowLog("S", line);
        if (line.Length == 0) return null;
        if (line[0] != '$') return null; // error ("-") or a reply type that is not handled here
        return ReadBulkBody(line.Substring(1));
    }

    /// <summary>Sends a command and returns the server's error message, or null when none was reported.</summary>
    string SendExpectSuccess(string cmd, params object[] args)
    {
        if (!SendCommand(cmd, args)) return "无法连接。";
        return ExpectSuccess();
    }

    /// <summary>Sends a command with a binary last argument and reads an integer reply (0 on any failure).</summary>
    int SendDataExpectInt(byte[] data, string cmd, params object[] args)
    {
        if (!SendDataCommand(data, cmd, args)) return 0;
        return ExpectInt();
    }

    /// <summary>Sends a command and reads an integer reply (0 on any failure).</summary>
    int SendExpectInt(string cmd, params object[] args)
    {
        if (!SendCommand(cmd, args)) return 0;
        return ExpectInt();
    }

    /// <summary>Sends a command and reads a status or bulk reply as text (null on any failure).</summary>
    string SendExpectString(string cmd, params object[] args)
    {
        if (!SendCommand(cmd, args)) return null;
        return ExpectString();
    }

    /// <summary>Sends a command and returns the raw first reply line including its type byte (null when sending failed).</summary>
    string SendGetString(string cmd, params object[] args)
    {
        if (!SendCommand(cmd, args)) return null;
        return ReadLine();
    }

    /// <summary>Sends a command and reads one bulk reply (null on any failure).</summary>
    byte[] SendExpectData(string cmd, params object[] args)
    {
        if (!SendCommand(cmd, args)) return null;
        return ReadData();
    }

    #endregion

    #region Key commands

    /// <summary>Returns true when EXISTS replies 1 for the key.</summary>
    public bool ContainsKey(string key)
    {
        if (key == null) return false;
        return SendExpectInt("EXISTS", key) == 1;
    }

    /// <summary>Deletes one key with DEL. Returns true when exactly one key was removed.</summary>
    public bool Remove(string key)
    {
        if (key == null) return false;
        return SendExpectInt("DEL", key) == 1;
    }

    /// <summary>Deletes several keys with DEL. Returns the integer reply, or 0 on failure.</summary>
    public int Remove(params string[] args)
    {
        if (args == null) return 0;
        return SendExpectInt("DEL", args);
    }

    /// <summary>Sends INCR. Returns the integer reply, or 0 on failure.</summary>
    public int Increment(string key)
    {
        if (key == null) return 0;
        return SendExpectInt("INCR", key);
    }

    /// <summary>Sends INCRBY with <paramref name="count"/>. Returns the integer reply, or 0 on failure.</summary>
    public int Increment(string key, int count)
    {
        if (key == null) return 0;
        return SendExpectInt("INCRBY", key, count);
    }

    /// <summary>Sends DECR. Returns the integer reply, or 0 on failure.</summary>
    public int Decrement(string key)
    {
        if (key == null) return 0;
        return SendExpectInt("DECR", key);
    }

    /// <summary>Sends DECRBY with <paramref name="count"/>. Returns the integer reply, or 0 on failure.</summary>
    public int Decrement(string key, int count)
    {
        if (key == null) return 0;
        return SendExpectInt("DECRBY", key, count);
    }

    /// <summary>Sends TYPE and returns the type name the server reports. Returns "" for a null key and null on failure.</summary>
    public string TypeOf(string key)
    {
        if (key == null) return "";
        return SendExpectString("TYPE", key);
    }

    /// <summary>Sends RANDOMKEY and returns the key name, or null when none is returned.</summary>
    public string RandomKey()
    {
        return SendExpectString("RANDOMKEY");
    }

    /// <summary>Sends RENAME. Returns true when the server answers with a status ("+") reply.</summary>
    public bool Rename(string oldKeyname, string newKeyname)
    {
        if (oldKeyname == null) return false;
        if (newKeyname == null) return false;
        string reply = SendGetString("RENAME", oldKeyname, newKeyname);
        return !string.IsNullOrEmpty(reply) && reply[0] == '+';
    }

    /// <summary>Sends EXPIRE with a lifetime in seconds. Returns true when the reply is 1.</summary>
    public bool Expire(string key, int seconds)
    {
        if (key == null) return false;
        return SendExpectInt("EXPIRE", key, seconds) == 1;
    }

    /// <summary>Sends EXPIREAT with the given timestamp. Returns true when the reply is 1.</summary>
    public bool ExpireAt(string key, int time)
    {
        if (key == null) return false;
        return SendExpectInt("EXPIREAT", key, time) == 1;
    }

    /// <summary>Sends TTL. Returns the integer reply; 0 for a null key or on failure.</summary>
    public int TimeToLive(string key)
    {
        if (key == null) return 0;
        return SendExpectInt("TTL", key);
    }

    /// <summary>All keys, i.e. the result of KEYS *. Null on failure.</summary>
    public string[] Keys
    {
        get { return GetKeys("*"); }
    }

    /// <summary>Sends KEYS with the given pattern. Returns null for a null pattern or on failure.</summary>
    public string[] GetKeys(string pattern)
    {
        if (pattern == null) return null;
        return SendExpectStringArray("KEYS", pattern);
    }

    /// <summary>Sends MGET. Returns one entry per key (null for missing keys), or null on invalid input or failure.</summary>
    public byte[][] MGet(params string[] keys)
    {
        if (keys == null) return null;
        if (keys.Length == 0) return null;
        return SendExpectDataArray("MGET", keys);
    }

    #endregion

    #region Server commands

    /// <summary>Integer reply of DBSIZE, or 0 on failure.</summary>
    public int DbSize
    {
        get { return SendExpectInt("DBSIZE"); }
    }

    /// <summary>Sends SAVE; the outcome is not reported.</summary>
    public void Save()
    {
        SendExpectSuccess("SAVE");
    }

    /// <summary>Sends BGSAVE; the outcome is not reported.</summary>
    public void BackgroundSave()
    {
        SendExpectSuccess("BGSAVE");
    }

    /// <summary>Sends SHUTDOWN. When the server closes the connection without replying, the local connection is released too.</summary>
    public void Shutdown()
    {
        if (!SendCommand("SHUTDOWN")) return;
        try
        {
            // A reply line means the server answered (possibly with an error) and is still up;
            // an empty line means the stream ended, which is the expected result of a shutdown.
            string s = ReadLine();
            ShowLog("S", s);
            if (s.Length == 0) CloseConnection();
        }
        catch (IOException)
        {
            // The server dropped the connection: also an expected result.
            CloseConnection();
        }
    }

    /// <summary>Sends FLUSHALL; the outcome is not reported.</summary>
    public void FlushAll()
    {
        SendExpectSuccess("FLUSHALL");
    }

    /// <summary>Sends FLUSHDB; the outcome is not reported.</summary>
    public void FlushDb()
    {
        SendExpectSuccess("FLUSHDB");
    }

    /// <summary>LASTSAVE reply converted from Unix seconds to a <see cref="DateTime"/>. A failure yields 1970-01-01.</summary>
    public DateTime LastSave
    {
        get
        {
            int t = SendExpectInt("LASTSAVE");
            return new DateTime(UnixEpoch) + TimeSpan.FromSeconds(t);
        }
    }

    /// <summary>Sends INFO and returns its "name:value" lines as a dictionary. Empty when the command fails.</summary>
    public Dictionary<string, string> GetInfo()
    {
        byte[] r = SendExpectData("INFO");
        var dict = new Dictionary<string, string>();
        foreach (var rawLine in GetString(r).Split('\n'))
        {
            // Lines end with CRLF; splitting on LF leaves the CR behind.
            string line = rawLine.TrimEnd('\r');
            if (line.Length == 0 || line[0] == '#') continue;
            int p = line.IndexOf(':');
            if (p == -1) continue;
            // Indexer instead of Add: a repeated field name must not throw.
            dict[line.Substring(0, p)] = line.Substring(p + 1);
        }
        return dict;
    }

    #endregion

    #region Array replies

    /// <summary>Sends a command and decodes every element of the array reply as UTF-8 text. Returns null on failure.</summary>
    public string[] SendExpectStringArray(string cmd, params object[] args)
    {
        byte[][] reply = SendExpectDataArray(cmd, args);
        if (reply == null) return null;
        string[] texts = new string[reply.Length];
        for (int i = 0; i < reply.Length; i++) texts[i] = GetString(reply[i]);
        return texts;
    }

    /// <summary>Sends a command and reads an array ("*") reply of bulk strings. Returns null on failure or for a nil array.</summary>
    public byte[][] SendExpectDataArray(string cmd, params object[] args)
    {
        if (!SendCommand(cmd, args)) return null;
        int type;
        string text;
        if (!TryReadReply(out type, out text)) return null;
        if (type != '*') return null;

        int count;
        if (!TryParseInt(text, out count) || count < 0) return null;
        byte[][] result = new byte[count][];
        for (int i = 0; i < count; i++) result[i] = ReadData();
        return result;
    }

    #endregion

    #region List commands

    /// <summary>Sends LRANGE and returns the elements, or null on failure.</summary>
    public byte[][] ListRange(string key, int start, int end)
    {
        return SendExpectDataArray("LRANGE", key, start, end);
    }

    /// <summary>Sends LPUSH with UTF-8 text; the outcome is not reported.</summary>
    public void LeftPush(string key, string value)
    {
        LeftPush(key, GetBytes(value));
    }

    /// <summary>Sends LPUSH with a binary value; the outcome is not reported.</summary>
    public void LeftPush(string key, byte[] value)
    {
        if (SendDataCommand(value, "LPUSH", key)) ExpectSuccess();
    }

    /// <summary>Sends RPUSH with UTF-8 text; the outcome is not reported.</summary>
    public void RightPush(string key, string value)
    {
        RightPush(key, GetBytes(value));
    }

    /// <summary>Sends RPUSH with a binary value; the outcome is not reported.</summary>
    public void RightPush(string key, byte[] value)
    {
        if (SendDataCommand(value, "RPUSH", key)) ExpectSuccess();
    }

    /// <summary>Integer reply of LLEN, or 0 on failure.</summary>
    public int ListLength(string key)
    {
        return SendExpectInt("LLEN", key);
    }

    /// <summary>Sends LINDEX and returns the element, or null when there is none or on failure.</summary>
    public byte[] ListIndex(string key, int index)
    {
        return SendExpectData("LINDEX", key, index);
    }

    /// <summary>Sends LPOP and returns the element, or null when there is none or on failure.</summary>
    public byte[] LeftPop(string key)
    {
        return SendExpectData("LPOP", key);
    }

    /// <summary>Sends RPOP and returns the element, or null when there is none or on failure.</summary>
    public byte[] RightPop(string key)
    {
        return SendExpectData("RPOP", key);
    }

    #endregion

    #region Set commands

    /// <summary>Sends SADD. Returns true when the integer reply is positive.</summary>
    public bool AddToSet(string key, byte[] member)
    {
        return SendDataExpectInt(member, "SADD", key) > 0;
    }

    /// <summary>Sends SADD with a UTF-8 member. Returns true when the integer reply is positive.</summary>
    public bool AddToSet(string key, string member)
    {
        return AddToSet(key, GetBytes(member));
    }

    /// <summary>Integer reply of SCARD, or 0 on failure.</summary>
    public int CardinalityOfSet(string key)
    {
        return SendExpectInt("SCARD", key);
    }

    /// <summary>Sends SISMEMBER. Returns true when the integer reply is positive.</summary>
    public bool IsMemberOfSet(string key, byte[] member)
    {
        return SendDataExpectInt(member, "SISMEMBER", key) > 0;
    }

    /// <summary>Sends SISMEMBER with a UTF-8 member. Returns true when the integer reply is positive.</summary>
    public bool IsMemberOfSet(string key, string member)
    {
        return IsMemberOfSet(key, GetBytes(member));
    }

    /// <summary>Sends SMEMBERS and returns the members, or null on failure.</summary>
    public byte[][] GetMembersOfSet(string key)
    {
        return SendExpectDataArray("SMEMBERS", key);
    }

    /// <summary>Sends SRANDMEMBER and returns the member, or null when there is none or on failure.</summary>
    public byte[] GetRandomMemberOfSet(string key)
    {
        return SendExpectData("SRANDMEMBER", key);
    }

    /// <summary>Sends SPOP and returns the member, or null when there is none or on failure.</summary>
    public byte[] PopRandomMemberOfSet(string key)
    {
        return SendExpectData("SPOP", key);
    }

    /// <summary>Sends SREM. Returns true when the integer reply is positive.</summary>
    public bool RemoveFromSet(string key, byte[] member)
    {
        return SendDataExpectInt(member, "SREM", key) > 0;
    }

    /// <summary>Sends SREM with a UTF-8 member. Returns true when the integer reply is positive.</summary>
    public bool RemoveFromSet(string key, string member)
    {
        return RemoveFromSet(key, GetBytes(member));
    }

    /// <summary>Sends SUNION with the given keys. Returns the members, or null on invalid input or failure.</summary>
    public byte[][] GetUnionOfSets(params string[] keys)
    {
        if (keys == null) return null;
        return SendExpectDataArray("SUNION", keys);
    }

    /// <summary>Sends one of the *STORE set commands with the given arguments; the outcome is not reported.</summary>
    void StoreSetCommands(string cmd, params string[] keys)
    {
        if (String.IsNullOrEmpty(cmd)) return;
        if (keys == null) return;
        SendExpectSuccess(cmd, keys);
    }

    /// <summary>Sends SUNIONSTORE with the given arguments, passed through unchanged.</summary>
    public void StoreUnionOfSets(params string[] keys)
    {
        StoreSetCommands("SUNIONSTORE", keys);
    }

    /// <summary>Sends SINTER with the given keys. Returns the members, or null on invalid input or failure.</summary>
    public byte[][] GetIntersectionOfSets(params string[] keys)
    {
        if (keys == null) return null;
        return SendExpectDataArray("SINTER", keys);
    }

    /// <summary>Sends SINTERSTORE with the given arguments, passed through unchanged.</summary>
    public void StoreIntersectionOfSets(params string[] keys)
    {
        StoreSetCommands("SINTERSTORE", keys);
    }

    /// <summary>Sends SDIFF with the given keys. Returns the members, or null on invalid input or failure.</summary>
    public byte[][] GetDifferenceOfSets(params string[] keys)
    {
        if (keys == null) return null;
        return SendExpectDataArray("SDIFF", keys);
    }

    /// <summary>Sends SDIFFSTORE with the given arguments, passed through unchanged.</summary>
    public void StoreDifferenceOfSets(params string[] keys)
    {
        StoreSetCommands("SDIFFSTORE", keys);
    }

    /// <summary>Sends SMOVE. Returns true when the integer reply is positive.</summary>
    public bool MoveMemberToSet(string srcKey, string destKey, byte[] member)
    {
        return SendDataExpectInt(member, "SMOVE", srcKey, destKey) > 0;
    }

    #endregion

    #region Disposal and encoding helpers

    /// <summary>Sends QUIT when a connection is open, then releases it. Safe to call repeatedly.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Finalizer; forwards to <see cref="Dispose(bool)"/> with false, which does nothing.</summary>
    ~Redis()
    {
        Dispose(false);
    }

    /// <summary>Releases the connection when <paramref name="disposing"/> is true; does nothing otherwise.</summary>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing) return;

        // Only say goodbye over an existing connection; never connect just to QUIT.
        if (socket != null)
        {
            try
            {
                if (SendCommand("QUIT")) ExpectSuccess();
            }
            catch (Exception)
            {
                // Dispose must not throw; the connection is closed below regardless.
            }
        }
        CloseConnection();
    }

    /// <summary>Encodes text as UTF-8; null or empty text gives an empty array.</summary>
    private static byte[] GetBytes(string text)
    {
        if (string.IsNullOrEmpty(text)) return new byte[0];
        return System.Text.Encoding.UTF8.GetBytes(text);
    }

    /// <summary>Decodes UTF-8 bytes; null or empty input gives "".</summary>
    private static string GetString(byte[] bytes)
    {
        if (bytes == null || bytes.Length == 0) return "";
        return System.Text.Encoding.UTF8.GetString(bytes);
    }

    /// <summary>Number of UTF-8 bytes needed for the text; 0 for null or empty text.</summary>
    private static int GetByteCount(string text)
    {
        if (string.IsNullOrEmpty(text)) return 0;
        return System.Text.Encoding.UTF8.GetByteCount(text);
    }

    #endregion
}
}
|