using Newtonsoft.Json; using Newtonsoft.Json.Serialization; using NodaTime; using NodaTime.Serialization.JsonNet; using StackExchange.Redis; namespace DysonNetwork.Sphere.Storage; /// /// Represents a distributed lock that can be used to synchronize access across multiple processes /// public interface IDistributedLock : IAsyncDisposable { /// /// The resource identifier this lock is protecting /// string Resource { get; } /// /// Unique identifier for this lock instance /// string LockId { get; } /// /// Extends the lock's expiration time /// Task ExtendAsync(TimeSpan timeSpan); /// /// Releases the lock immediately /// Task ReleaseAsync(); } public interface ICacheService { /// /// Sets a value in the cache with an optional expiration time /// Task SetAsync(string key, T value, TimeSpan? expiry = null); /// /// Gets a value from the cache /// Task GetAsync(string key); /// /// Removes a specific key from the cache /// Task RemoveAsync(string key); /// /// Adds a key to a group for group-based operations /// Task AddToGroupAsync(string key, string group); /// /// Removes all keys associated with a specific group /// Task RemoveGroupAsync(string group); /// /// Gets all keys belonging to a specific group /// Task> GetGroupKeysAsync(string group); /// /// Helper method to set a value in cache and associate it with multiple groups in one operation /// /// The type of value being cached /// Cache key /// The value to cache /// Optional collection of group names to associate the key with /// Optional expiration time for the cached item /// True if the set operation was successful Task SetWithGroupsAsync(string key, T value, IEnumerable? groups = null, TimeSpan? expiry = null); /// /// Acquires a distributed lock on the specified resource /// /// The resource identifier to lock /// How long the lock should be held before automatically expiring /// How long to wait for the lock before giving up /// How often to retry acquiring the lock during the wait time /// A distributed lock instance if acquired, null otherwise Task AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null); /// /// Executes an action with a distributed lock, ensuring the lock is properly released afterwards /// /// The resource identifier to lock /// The action to execute while holding the lock /// How long the lock should be held before automatically expiring /// How long to wait for the lock before giving up /// How often to retry acquiring the lock during the wait time /// True if the lock was acquired and the action was executed, false otherwise Task ExecuteWithLockAsync(string resource, Func action, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null); /// /// Executes a function with a distributed lock, ensuring the lock is properly released afterwards /// /// The return type of the function /// The resource identifier to lock /// The function to execute while holding the lock /// How long the lock should be held before automatically expiring /// How long to wait for the lock before giving up /// How often to retry acquiring the lock during the wait time /// The result of the function if the lock was acquired, default(T) otherwise Task<(bool Acquired, T? Result)> ExecuteWithLockAsync(string resource, Func> func, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null); } public class RedisDistributedLock : IDistributedLock { private readonly IDatabase _database; private bool _disposed; private const string LockKeyPrefix = "Lock_"; public string Resource { get; } public string LockId { get; } internal RedisDistributedLock(IDatabase database, string resource, string lockId) { _database = database; Resource = resource; LockId = lockId; } public async Task ExtendAsync(TimeSpan timeSpan) { if (_disposed) throw new ObjectDisposedException(nameof(RedisDistributedLock)); var script = @" if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end "; var result = await _database.ScriptEvaluateAsync( script, [$"{LockKeyPrefix}{Resource}"], [LockId, (long)timeSpan.TotalMilliseconds] ); return (long)result! == 1; } public async Task ReleaseAsync() { if (_disposed) return; var script = @" if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end "; await _database.ScriptEvaluateAsync( script, [$"{LockKeyPrefix}{Resource}"], [LockId] ); _disposed = true; } public async ValueTask DisposeAsync() { await ReleaseAsync(); GC.SuppressFinalize(this); } } public class CacheServiceRedis : ICacheService { private readonly IDatabase _database; private readonly JsonSerializerSettings _serializerSettings; // Global prefix for all cache keys private const string GlobalKeyPrefix = "dyson:"; // Using prefixes for different types of keys private const string GroupKeyPrefix = GlobalKeyPrefix + "cg:"; private const string LockKeyPrefix = GlobalKeyPrefix + "lock:"; public CacheServiceRedis(IConnectionMultiplexer redis) { var rds = redis ?? throw new ArgumentNullException(nameof(redis)); _database = rds.GetDatabase(); _serializerSettings = new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver(), PreserveReferencesHandling = PreserveReferencesHandling.Objects, NullValueHandling = NullValueHandling.Include, }.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb); } public async Task SetAsync(string key, T value, TimeSpan? expiry = null) { if (string.IsNullOrEmpty(key)) throw new ArgumentException("Key cannot be null or empty", nameof(key)); var serializedValue = JsonConvert.SerializeObject(value, _serializerSettings); return await _database.StringSetAsync(key, serializedValue, expiry); } public async Task GetAsync(string key) { if (string.IsNullOrEmpty(key)) throw new ArgumentException("Key cannot be null or empty", nameof(key)); var value = await _database.StringGetAsync(key); if (value.IsNullOrEmpty) return default; return JsonConvert.DeserializeObject(value!, _serializerSettings); } public async Task RemoveAsync(string key) { if (string.IsNullOrEmpty(key)) throw new ArgumentException("Key cannot be null or empty", nameof(key)); // Before removing the key, find all groups it belongs to and remove it from them var script = @" local groups = redis.call('KEYS', ARGV[1]) for _, group in ipairs(groups) do redis.call('SREM', group, ARGV[2]) end return redis.call('DEL', ARGV[2]) "; var result = await _database.ScriptEvaluateAsync( script, values: [$"{GroupKeyPrefix}*", key] ); return (long)result! > 0; } public async Task AddToGroupAsync(string key, string group) { if (string.IsNullOrEmpty(key)) throw new ArgumentException(@"Key cannot be null or empty.", nameof(key)); if (string.IsNullOrEmpty(group)) throw new ArgumentException(@"Group cannot be null or empty.", nameof(group)); var groupKey = $"{GroupKeyPrefix}{group}"; await _database.SetAddAsync(groupKey, key); } public async Task RemoveGroupAsync(string group) { if (string.IsNullOrEmpty(group)) throw new ArgumentException(@"Group cannot be null or empty.", nameof(group)); var groupKey = $"{GroupKeyPrefix}{group}"; // Get all keys in the group var keys = await _database.SetMembersAsync(groupKey); if (keys.Length > 0) { // Delete all the keys var keysTasks = keys.Select(key => _database.KeyDeleteAsync(key.ToString())); await Task.WhenAll(keysTasks); } // Delete the group itself await _database.KeyDeleteAsync(groupKey); } public async Task> GetGroupKeysAsync(string group) { if (string.IsNullOrEmpty(group)) throw new ArgumentException(@"Group cannot be null or empty.", nameof(group)); var groupKey = $"{GroupKeyPrefix}{group}"; var members = await _database.SetMembersAsync(groupKey); return members.Select(m => m.ToString()); } public async Task SetWithGroupsAsync(string key, T value, IEnumerable? groups = null, TimeSpan? expiry = null) { // First, set the value in the cache var setResult = await SetAsync(key, value, expiry); // If successful and there are groups to associate, add the key to each group if (!setResult || groups == null) return setResult; var groupsArray = groups.Where(g => !string.IsNullOrEmpty(g)).ToArray(); if (groupsArray.Length <= 0) return setResult; var tasks = groupsArray.Select(group => AddToGroupAsync(key, group)); await Task.WhenAll(tasks); return setResult; } public async Task AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null) { if (string.IsNullOrEmpty(resource)) throw new ArgumentException("Resource cannot be null or empty", nameof(resource)); var lockKey = $"{LockKeyPrefix}{resource}"; var lockId = Guid.NewGuid().ToString("N"); var waitTimeSpan = waitTime ?? TimeSpan.Zero; var retryIntervalSpan = retryInterval ?? TimeSpan.FromMilliseconds(100); var startTime = DateTime.UtcNow; var acquired = false; // Try to acquire the lock, retry until waitTime is exceeded while (!acquired && (DateTime.UtcNow - startTime) < waitTimeSpan) { acquired = await _database.StringSetAsync(lockKey, lockId, expiry, When.NotExists); if (!acquired) { await Task.Delay(retryIntervalSpan); } } if (!acquired) { return null; // Could not acquire the lock within the wait time } return new RedisDistributedLock(_database, resource, lockId); } public async Task ExecuteWithLockAsync(string resource, Func action, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null) { await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval); if (lockObj == null) return false; // Could not acquire the lock await action(); return true; } public async Task<(bool Acquired, T? Result)> ExecuteWithLockAsync(string resource, Func> func, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null) { await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval); if (lockObj == null) return (false, default); // Could not acquire the lock var result = await func(); return (true, result); } }