// ---------------------------------------------------------------------------
// Usage example: run a critical section under the distributed lock.
// ---------------------------------------------------------------------------
await _redisService.ExecuteWithLockAsync(async () =>
{
    // Everything inside this lambda runs while the lock is held.
    Console.WriteLine("執行鎖定的部分");

    // The work that must not run on several instances at the same time.
    Method();

    Console.WriteLine("鎖定部分執行完成");
});

// ---------------------------------------------------------------------------
// Excerpt: the three lock members of RedisService (the full class follows).
// ---------------------------------------------------------------------------

/// <summary>
/// Runs <paramref name="action"/> only if the distributed lock can be taken,
/// and releases the lock afterwards even when the action throws.
/// </summary>
/// <param name="action">The work to run while holding the lock.</param>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public async Task ExecuteWithLockAsync(Func<Task> action)
{
    // Validate before touching Redis so a bad call never takes the lock.
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // Unique token identifying this holder; only the holder may release.
    var lockValue = Guid.NewGuid().ToString();

    if (!await AcquireLockAsync(lockValue))
    {
        _logger.Error(new EventData() { Message = "無法獲取鎖,另一個實例正在進行" });
        return;
    }

    try
    {
        await action();
    }
    finally
    {
        await ReleaseLockAsync(lockValue);
    }
}

/// <summary>
/// Tries to take the lock by writing <paramref name="lockValue"/> to the lock key
/// only when the key does not exist yet; the key expires after the lock timeout.
/// </summary>
/// <param name="lockValue">Token identifying the holder.</param>
/// <returns><c>true</c> when the key was written by this call.</returns>
public async Task<bool> AcquireLockAsync(string lockValue)
{
    var db = GetConnection().GetDatabase();
    return await db.StringSetAsync(_lockKey, lockValue, _lockTimeout, when: When.NotExists);
}

/// <summary>
/// Releases the lock only if it is still held with <paramref name="lockValue"/>.
/// </summary>
/// <param name="lockValue">Token that was used to take the lock.</param>
public async Task ReleaseLockAsync(string lockValue)
{
    var db = GetConnection().GetDatabase();

    // Compare-and-delete must be a single atomic step: with a separate GET and DEL
    // the key could expire and be re-acquired by another instance in between, and
    // we would then delete a lock we no longer own.
    await db.LockReleaseAsync(_lockKey, lockValue);
}

// ---------------------------------------------------------------------------
// Full source: RedisService.cs
// ---------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Options;
using OAPlusCore.Logging;
using StackExchange.Redis;
using SystemSettings.Application.Options;

namespace SystemSettings.Application.Service
{
    /*=================================================================
     * Author:
     * CreatedTime: 2023/12/21 11:01:36
     * Description: Redis cache service.
    ===================================================================*/
    // The Application project references the StackExchange.Redis package (2.5.61).
    public class RedisService
    {
        private readonly IOptions<RedisSettingOption> redis_option;
        private readonly ILogRecorder<RedisService> _logger;

        /// <summary>
        /// Connection to the sentinels. Kept in a field so it can be disposed when the
        /// service reconnects instead of being leaked.
        /// </summary>
        private ConnectionMultiplexer _sentinelConnection;

        /// <summary>
        /// Connection to the current master, obtained through the sentinels.
        /// </summary>
        private ConnectionMultiplexer _connection;

        private ConfigurationOptions _sentinelOptions;

        private readonly string _lockKey = "myDistributedLock";
        private readonly TimeSpan _lockTimeout = TimeSpan.FromMinutes(5); // lock expiry
        private readonly object obj = new object();

        /// <summary>
        /// Creates the service and immediately attempts to connect.
        /// A failed connection attempt is logged, not thrown; the next access retries.
        /// </summary>
        /// <param name="redisOption">Sentinel endpoints, service name, password and database number.</param>
        /// <param name="logger">Logger used for connection and command failures.</param>
        public RedisService(IOptions<RedisSettingOption> redisOption, ILogRecorder<RedisService> logger)
        {
            this.redis_option = redisOption;
            this._logger = logger;
            initRedis();
        }

        /// <summary>
        /// Connects to the sentinels, resolves the master connection and swaps it in.
        /// On failure the previous connection (if any) is left untouched and the error is logged.
        /// </summary>
        private void initRedis()
        {
            ConnectionMultiplexer newSentinel = null;
            try
            {
                _sentinelOptions = new ConfigurationOptions()
                {
                    ConnectTimeout = redis_option.Value.ConnectTimeout ?? 1000
                };

                var IpArray = redis_option.Value.Sentinel;
                foreach (var item in IpArray)
                {
                    _sentinelOptions.EndPoints.Add(item);
                }

                _sentinelOptions.TieBreaker = "";
                _sentinelOptions.CommandMap = CommandMap.Sentinel;
                _sentinelOptions.AbortOnConnectFail = true;
                _sentinelOptions.Password = redis_option.Value.Password;

                // Connect to the sentinels first ...
                newSentinel = ConnectionMultiplexer.Connect(_sentinelOptions);

                // ... then ask them for a connection to the current master.
                ConfigurationOptions redisServiceOptions = new ConfigurationOptions();
                redisServiceOptions.ServiceName = redis_option.Value.ServiceName; // master name
                redisServiceOptions.Password = redis_option.Value.Password;       // master password
                redisServiceOptions.AbortOnConnectFail = true;
                redisServiceOptions.AllowAdmin = true;
                var newMaster = newSentinel.GetSentinelMasterConnection(redisServiceOptions);

                // Swap only after both connections exist, remembering the old pair.
                var oldMaster = _connection;
                var oldSentinel = _sentinelConnection;
                _connection = newMaster;
                _sentinelConnection = newSentinel;
                newSentinel = null; // ownership moved to the field; do not dispose below

                AddRegisterEvent(newMaster);

                // Every reconnect used to leak the previous multiplexers.
                oldMaster?.Dispose();
                oldSentinel?.Dispose();
            }
            catch (Exception ex)
            {
                // The sentinel connection was opened but the master could not be resolved.
                newSentinel?.Dispose();

                _logger.Error(new EventData()
                {
                    Type = $"RedisService",
                    Message = "redis初始化異常,請檢查redis配置",
                    Labels =
                    {
                        ["Error"] = ex.Message,
                        ["StackTrace"] = ex.StackTrace
                    }
                });
            }
        }

        /// <summary>
        /// Returns the master connection, reconnecting once when it is missing or reports
        /// itself as disconnected.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// No connection object exists even after the reconnect attempt.
        /// </exception>
        private ConnectionMultiplexer GetConnection()
        {
            var current = _connection;
            if (current != null && current.IsConnected)
            {
                return current;
            }

            lock (obj)
            {
                // Re-check inside the lock: another thread may have reconnected already.
                current = _connection;
                if (current == null || !current.IsConnected)
                {
                    initRedis();
                    current = _connection;
                }
            }

            if (current == null)
            {
                // initRedis logs and swallows its failure; surface a clear error here
                // instead of a NullReferenceException further down.
                throw new InvalidOperationException(
                    "Redis connection is not available; check the Redis configuration.");
            }

            return current;
        }

        /// <summary>
        /// Gets the configured database (<c>DBNo</c>) on the master connection,
        /// reconnecting first when the connection is down.
        /// </summary>
        /// <exception cref="InvalidOperationException">The connection could not be established.</exception>
        public IDatabase getDatabase
        {
            get { return GetConnection().GetDatabase(redis_option.Value.DBNo); }
        }

        /// <summary>
        /// Stores a string value.
        /// </summary>
        /// <param name="key">Key to write.</param>
        /// <param name="value">Value to store.</param>
        /// <param name="seconds">Lifetime in seconds; zero or negative stores the value without expiry.</param>
        /// <returns><c>true</c> when Redis accepted the write; <c>false</c> when it failed (the error is logged).</returns>
        public bool doSave(String key, String value, int seconds = 0)
        {
            return SaveValue(key, value, seconds);
        }

        /// <summary>
        /// Reads a string value.
        /// </summary>
        /// <param name="key">Key to read.</param>
        /// <returns>
        /// The stored value, <c>null</c> when the key does not exist, or an empty string
        /// when the read failed (the error is logged).
        /// </returns>
        public String getValue(String key)
        {
            try
            {
                string result = getDatabase.StringGet(key);
                return result;
            }
            catch (Exception ex)
            {
                _logger.Error(new EventData() { Message = "Redis獲取數據失敗:" + ex.Message });
                return "";
            }
        }

        /// <summary>
        /// Stores a serialized object.
        /// </summary>
        /// <param name="key">Key to write.</param>
        /// <param name="value">Serialized payload.</param>
        /// <param name="seconds">Lifetime in seconds; zero or negative stores the value without expiry.</param>
        /// <returns><c>true</c> when Redis accepted the write; <c>false</c> when it failed (the error is logged).</returns>
        public bool doSaveObject(String key, byte[] value, int seconds = 300)
        {
            return SaveValue(key, value, seconds);
        }

        /// <summary>
        /// Shared write path for <see cref="doSave"/> and <see cref="doSaveObject"/>.
        /// </summary>
        private bool SaveValue(string key, RedisValue value, int seconds)
        {
            try
            {
                // A non-positive lifetime means "no expiry".
                TimeSpan? expiry = seconds > 0 ? TimeSpan.FromSeconds(seconds) : (TimeSpan?)null;
                return getDatabase.StringSet(key, value, expiry);
            }
            catch (Exception ex)
            {
                _logger.Error(new EventData() { Message = "Redis保存數據失敗:" + ex.Message });
                // Previously the callers reported success even though nothing was stored.
                return false;
            }
        }

        /// <summary>
        /// Deletes every key matching <paramref name="key"/> treated as a prefix pattern:
        /// a trailing <c>*</c> is appended when the argument does not already end with one.
        /// </summary>
        /// <param name="key">Key prefix or pattern. Pass <c>"*"</c> explicitly to match everything.</param>
        /// <returns>
        /// <c>true</c> when the scan and deletes completed without error;
        /// <c>false</c> when the key is null/empty or an error occurred (the error is logged).
        /// </returns>
        public bool KeyDelete(string key)
        {
            // An empty prefix would silently become "*" and wipe the whole database.
            if (string.IsNullOrEmpty(key))
            {
                _logger.Error(new EventData() { Message = "Redis KeyDelete 失敗:" + "key is null or empty" });
                return false;
            }

            try
            {
                if (!key.EndsWith("*", StringComparison.Ordinal))
                {
                    key = key + "*";
                }

                var connection = GetConnection();
                var db = connection.GetDatabase(redis_option.Value.DBNo);

                // Keys are enumerated per server endpoint and deleted through the database.
                foreach (var point in connection.GetEndPoints())
                {
                    var server = connection.GetServer(point);
                    foreach (var k in server.Keys(db.Database, key))
                    {
                        db.KeyDelete(k);
                    }
                }

                // The original never set its result flag, so it always reported failure.
                return true;
            }
            catch (Exception ex)
            {
                _logger.Error(new EventData() { Message = "Redis KeyDelete 失敗:" + ex.Message });
                return false;
            }
        }

        /// <summary>
        /// Reads a binary-serialized object stored under <paramref name="key"/>.
        /// </summary>
        /// <typeparam name="T">Type the payload is cast to.</typeparam>
        /// <param name="t">Fallback returned when the key does not exist.</param>
        /// <param name="key">Key to read.</param>
        /// <returns>The deserialized object, or <paramref name="t"/> when the key is missing.</returns>
        /// <remarks>Redis and deserialization errors are not caught here; they propagate to the caller.</remarks>
        public T getObject<T>(T t, string key)
        {
            var payload = getDatabase.StringGet(key);
            if (payload.IsNull)
            {
                return t;
            }

            // SECURITY NOTE(review): BinaryFormatter is unsafe on untrusted data and is
            // obsolete in current .NET. It is kept so values already stored in this
            // format stay readable; plan a migration to a safe serializer.
            byte[] bytes = payload;
            var formatter = new BinaryFormatter();
            using (var stream = new MemoryStream(bytes))
            {
                return (T)formatter.Deserialize(stream);
            }
        }

        /// <summary>
        /// Increments the numeric value stored at <paramref name="key"/> (Redis INCR)
        /// and returns the new value. Used to generate auto-increment ids.
        /// </summary>
        /// <param name="key">Counter key.</param>
        /// <returns>The value after the increment.</returns>
        public long Increment(string key)
        {
            return getDatabase.StringIncrement(key);
        }

        #region Event registration

        /// <summary>
        /// Subscribes the console-logging handlers to the events of <paramref name="connection"/>.
        /// </summary>
        private void AddRegisterEvent(ConnectionMultiplexer connection)
        {
            connection.ConnectionRestored += ConnMultiplexer_ConnectionRestored;
            connection.ConnectionFailed += ConnMultiplexer_ConnectionFailed;
            connection.ErrorMessage += ConnMultiplexer_ErrorMessage;
            connection.ConfigurationChanged += ConnMultiplexer_ConfigurationChanged;
            connection.HashSlotMoved += ConnMultiplexer_HashSlotMoved;
            connection.InternalError += ConnMultiplexer_InternalError;
            connection.ConfigurationChangedBroadcast += ConnMultiplexer_ConfigurationChangedBroadcast;
        }

        /// <summary>
        /// A reconfiguration was broadcast (usually a master/replica change).
        /// </summary>
        private static void ConnMultiplexer_ConfigurationChangedBroadcast(object sender, EndPointEventArgs e)
        {
            Console.WriteLine($"{nameof(ConnMultiplexer_ConfigurationChangedBroadcast)}: {e.EndPoint}");
        }

        /// <summary>
        /// An internal error occurred (mainly useful for debugging).
        /// </summary>
        private static void ConnMultiplexer_InternalError(object sender, InternalErrorEventArgs e)
        {
            Console.WriteLine($"{nameof(ConnMultiplexer_InternalError)}: {e.Exception}");
        }

        /// <summary>
        /// The cluster layout changed (a hash slot moved to another endpoint).
        /// </summary>
        private static void ConnMultiplexer_HashSlotMoved(object sender, HashSlotMovedEventArgs e)
        {
            Console.WriteLine(
                $"{nameof(ConnMultiplexer_HashSlotMoved)}: {nameof(e.OldEndPoint)}-{e.OldEndPoint} To {nameof(e.NewEndPoint)}-{e.NewEndPoint}, ");
        }

        /// <summary>
        /// The configuration changed.
        /// </summary>
        private static void ConnMultiplexer_ConfigurationChanged(object sender, EndPointEventArgs e)
        {
            Console.WriteLine($"{nameof(ConnMultiplexer_ConfigurationChanged)}: {e.EndPoint}");
        }

        /// <summary>
        /// An error message was reported.
        /// </summary>
        private static void ConnMultiplexer_ErrorMessage(object sender, RedisErrorEventArgs e)
        {
            Console.WriteLine($"{nameof(ConnMultiplexer_ErrorMessage)}: {e.Message}");
        }

        /// <summary>
        /// A physical connection failed.
        /// </summary>
        private static void ConnMultiplexer_ConnectionFailed(object sender, ConnectionFailedEventArgs e)
        {
            Console.WriteLine($"{nameof(ConnMultiplexer_ConnectionFailed)}: {e.Exception}");
        }

        /// <summary>
        /// A physical connection was (re-)established.
        /// </summary>
        private static void ConnMultiplexer_ConnectionRestored(object sender, ConnectionFailedEventArgs e)
        {
            Console.WriteLine($"{nameof(ConnMultiplexer_ConnectionRestored)}: {e.Exception}");
        }

        #endregion Event registration

        /// <summary>
        /// Tries to take the lock by writing <paramref name="lockValue"/> to the lock key
        /// only when the key does not exist yet; the key expires after the lock timeout.
        /// </summary>
        /// <param name="lockValue">Token identifying the holder.</param>
        /// <returns><c>true</c> when the key was written by this call.</returns>
        /// <remarks>Uses the connection's default database, not <c>DBNo</c>, as the original did.</remarks>
        public async Task<bool> AcquireLockAsync(string lockValue)
        {
            var db = GetConnection().GetDatabase();
            return await db.StringSetAsync(_lockKey, lockValue, _lockTimeout, when: When.NotExists);
        }

        /// <summary>
        /// Releases the lock only if it is still held with <paramref name="lockValue"/>.
        /// </summary>
        /// <param name="lockValue">Token that was used to take the lock.</param>
        public async Task ReleaseLockAsync(string lockValue)
        {
            var db = GetConnection().GetDatabase();

            // Compare-and-delete must be a single atomic step: with a separate GET and DEL
            // the key could expire and be re-acquired by another instance in between, and
            // we would then delete a lock we no longer own.
            await db.LockReleaseAsync(_lockKey, lockValue);
        }

        /// <summary>
        /// Runs <paramref name="action"/> only if the distributed lock can be taken,
        /// and releases the lock afterwards even when the action throws.
        /// </summary>
        /// <param name="action">The work to run while holding the lock.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        /// <remarks>
        /// When the lock is already held elsewhere the action is skipped and an error is
        /// logged; the returned task still completes normally. The lock key expires after
        /// the lock timeout and is not renewed while the action runs.
        /// </remarks>
        public async Task ExecuteWithLockAsync(Func<Task> action)
        {
            // Validate before touching Redis so a bad call never takes the lock.
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // Unique token identifying this holder; only the holder may release.
            var lockValue = Guid.NewGuid().ToString();

            if (!await AcquireLockAsync(lockValue))
            {
                _logger.Error(new EventData() { Message = "無法獲取鎖,另一個實例正在進行" });
                return;
            }

            try
            {
                await action();
            }
            finally
            {
                await ReleaseLockAsync(lockValue);
            }
        }
    }

    /// <summary>
    /// Serializable record with a key, a request id and a creation time.
    /// </summary>
    [Serializable]
    public class RedisModel
    {
        public String Key { get; set; }

        public String RequestId { get; set; }

        public DateTime CreateDate { get; set; }
    }
}
|
來自: 修行的嘟嘟 > 《軟件開發》