400 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			400 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using System.Text.Json;
 | |
| using System.Text.Json.Serialization;
 | |
| using System.Text.Json.Serialization.Metadata;
 | |
| using DysonNetwork.Shared.Data;
 | |
| using NodaTime;
 | |
| using NodaTime.Serialization.SystemTextJson;
 | |
| using StackExchange.Redis;
 | |
| 
 | |
| namespace DysonNetwork.Shared.Cache;
 | |
| 
 | |
/// <summary>
/// A handle to a lock held on a named resource, intended for coordinating work between
/// multiple processes. Disposing the handle asynchronously is part of the contract
/// (<see cref="IAsyncDisposable"/>).
/// </summary>
public interface IDistributedLock : IAsyncDisposable
{
    /// <summary>
    /// Identifier of the resource guarded by this lock.
    /// </summary>
    string Resource { get; }

    /// <summary>
    /// Identifier that distinguishes this particular lock instance from other holders.
    /// </summary>
    string LockId { get; }

    /// <summary>
    /// Pushes the lock's expiration further out.
    /// </summary>
    /// <param name="timeSpan">The new time-to-live to apply to the lock.</param>
    /// <returns>True if the expiration was updated, false otherwise.</returns>
    Task<bool> ExtendAsync(TimeSpan timeSpan);

    /// <summary>
    /// Gives up the lock right away instead of waiting for it to expire.
    /// </summary>
    Task ReleaseAsync();
}
 | |
| 
 | |
/// <summary>
/// Abstraction over a key/value cache that also supports grouping keys for bulk
/// invalidation and acquiring distributed locks.
/// </summary>
public interface ICacheService
{
    /// <summary>
    /// Stores a value under the given key.
    /// </summary>
    /// <typeparam name="T">Type of the value being cached.</typeparam>
    /// <param name="key">Cache key.</param>
    /// <param name="value">Value to store.</param>
    /// <param name="expiry">Optional time-to-live for the entry.</param>
    /// <returns>True if the value was stored.</returns>
    Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null);

    /// <summary>
    /// Reads the value stored under the given key.
    /// </summary>
    /// <typeparam name="T">Type to read the cached value as.</typeparam>
    /// <param name="key">Cache key.</param>
    /// <returns>The cached value, or the default of <typeparamref name="T"/> when nothing is stored.</returns>
    Task<T?> GetAsync<T>(string key);

    /// <summary>
    /// Reads the value stored under the given key and also reports whether an entry was found.
    /// </summary>
    /// <typeparam name="T">Type to read the cached value as.</typeparam>
    /// <param name="key">Cache key.</param>
    /// <returns>A tuple of the found flag and the value read from the cache.</returns>
    Task<(bool found, T? value)> GetAsyncWithStatus<T>(string key);

    /// <summary>
    /// Deletes a single key from the cache.
    /// </summary>
    /// <param name="key">Cache key to delete.</param>
    /// <returns>True if an entry was removed.</returns>
    Task<bool> RemoveAsync(string key);

    /// <summary>
    /// Registers a key as a member of a group so it can take part in group-wide operations.
    /// </summary>
    /// <param name="key">Cache key to register.</param>
    /// <param name="group">Name of the group.</param>
    Task AddToGroupAsync(string key, string group);

    /// <summary>
    /// Deletes every key registered in the given group.
    /// </summary>
    /// <param name="group">Name of the group.</param>
    Task RemoveGroupAsync(string group);

    /// <summary>
    /// Lists the keys registered in the given group.
    /// </summary>
    /// <param name="group">Name of the group.</param>
    /// <returns>The keys that belong to the group.</returns>
    Task<IEnumerable<string>> GetGroupKeysAsync(string group);

    /// <summary>
    /// Convenience operation that stores a value and registers its key in several groups.
    /// </summary>
    /// <typeparam name="T">Type of the value being cached.</typeparam>
    /// <param name="key">Cache key.</param>
    /// <param name="value">Value to store.</param>
    /// <param name="groups">Optional group names the key should be registered in.</param>
    /// <param name="expiry">Optional time-to-live for the cached entry.</param>
    /// <returns>True if the value was stored.</returns>
    Task<bool> SetWithGroupsAsync<T>(string key, T value, IEnumerable<string>? groups = null, TimeSpan? expiry = null);

    /// <summary>
    /// Tries to take a distributed lock on a resource.
    /// </summary>
    /// <param name="resource">Identifier of the resource to lock.</param>
    /// <param name="expiry">Lifetime of the lock before it expires automatically.</param>
    /// <param name="waitTime">Optional maximum time to keep trying before giving up.</param>
    /// <param name="retryInterval">Optional pause between attempts while waiting.</param>
    /// <returns>The lock handle when acquired; null when it could not be acquired.</returns>
    Task<IDistributedLock?> AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null,
        TimeSpan? retryInterval = null);

    /// <summary>
    /// Runs an action while holding a distributed lock and releases the lock afterwards.
    /// </summary>
    /// <param name="resource">Identifier of the resource to lock.</param>
    /// <param name="action">Work to perform while the lock is held.</param>
    /// <param name="expiry">Lifetime of the lock before it expires automatically.</param>
    /// <param name="waitTime">Optional maximum time to keep trying before giving up.</param>
    /// <param name="retryInterval">Optional pause between attempts while waiting.</param>
    /// <returns>True if the lock was acquired and the action ran; false otherwise.</returns>
    Task<bool> ExecuteWithLockAsync(string resource, Func<Task> action, TimeSpan expiry, TimeSpan? waitTime = null,
        TimeSpan? retryInterval = null);

    /// <summary>
    /// Runs a function while holding a distributed lock and releases the lock afterwards.
    /// </summary>
    /// <typeparam name="T">Return type of the function.</typeparam>
    /// <param name="resource">Identifier of the resource to lock.</param>
    /// <param name="func">Work to perform while the lock is held.</param>
    /// <param name="expiry">Lifetime of the lock before it expires automatically.</param>
    /// <param name="waitTime">Optional maximum time to keep trying before giving up.</param>
    /// <param name="retryInterval">Optional pause between attempts while waiting.</param>
    /// <returns>
    /// A tuple: <c>Acquired</c> tells whether the lock was obtained and the function ran;
    /// <c>Result</c> carries the function's return value, or the default of
    /// <typeparamref name="T"/> when the lock was not acquired.
    /// </returns>
    Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(string resource, Func<Task<T>> func, TimeSpan expiry,
        TimeSpan? waitTime = null, TimeSpan? retryInterval = null);
}
 | |
| 
 | |
/// <summary>
/// <see cref="IDistributedLock"/> backed by a single Redis string key. The key is
/// <see cref="CacheServiceRedis.LockKeyPrefix"/> followed by <see cref="Resource"/>, and its
/// value is <see cref="LockId"/>. Both extend and release compare the stored value with
/// <see cref="LockId"/> inside a Lua script, so they only act on a key this instance owns.
/// </summary>
public class RedisDistributedLock : IDistributedLock
{
    private readonly IDatabase _database;

    // Set once ReleaseAsync has run its script; later releases become no-ops.
    private bool _disposed;

    /// <summary>Identifier of the resource guarded by this lock.</summary>
    public string Resource { get; }

    /// <summary>Value stored in the Redis key, used to prove ownership.</summary>
    public string LockId { get; }

    // Redis key that holds the lock for this resource.
    private string LockKey => $"{CacheServiceRedis.LockKeyPrefix}{Resource}";

    internal RedisDistributedLock(IDatabase database, string resource, string lockId)
    {
        (_database, Resource, LockId) = (database, resource, lockId);
    }

    /// <summary>
    /// Applies a new time-to-live to the lock key, provided the key still holds this
    /// instance's <see cref="LockId"/>.
    /// </summary>
    /// <param name="timeSpan">New time-to-live; passed to Redis as whole milliseconds.</param>
    /// <returns>True when the script reports 1, false otherwise.</returns>
    /// <exception cref="ObjectDisposedException">The lock has already been released.</exception>
    public async Task<bool> ExtendAsync(TimeSpan timeSpan)
    {
        if (_disposed)
            throw new ObjectDisposedException(nameof(RedisDistributedLock));

        // Compare-and-expire: only touch the key when its value matches our lock id.
        const string extendScript = @"
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('pexpire', KEYS[1], ARGV[2])
            else
                return 0
            end
        ";

        RedisKey[] scriptKeys = [LockKey];
        RedisValue[] scriptArgs = [LockId, (long)timeSpan.TotalMilliseconds];

        var reply = await _database.ScriptEvaluateAsync(extendScript, scriptKeys, scriptArgs);
        return (long)reply! == 1;
    }

    /// <summary>
    /// Deletes the lock key if it still holds this instance's <see cref="LockId"/>, then
    /// marks the instance as disposed. Does nothing when already disposed.
    /// </summary>
    public async Task ReleaseAsync()
    {
        if (_disposed)
            return;

        // Compare-and-delete: never remove a key whose value is not our lock id.
        const string releaseScript = @"
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        ";

        RedisKey[] scriptKeys = [LockKey];
        RedisValue[] scriptArgs = [LockId];

        await _database.ScriptEvaluateAsync(releaseScript, scriptKeys, scriptArgs);

        _disposed = true;
    }

    /// <summary>
    /// Releases the lock via <see cref="ReleaseAsync"/>.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await ReleaseAsync();
        GC.SuppressFinalize(this);
    }
}
 | |
| 
 | |
/// <summary>
/// Redis implementation of <see cref="ICacheService"/>. Values are stored as JSON strings
/// under keys prefixed with <see cref="GlobalKeyPrefix"/>; groups are Redis sets under
/// <see cref="GroupKeyPrefix"/> whose members are the already-prefixed cache keys; locks are
/// string keys under <see cref="LockKeyPrefix"/>.
/// </summary>
public class CacheServiceRedis : ICacheService
{
    private readonly IDatabase _database;
    private readonly JsonSerializerOptions _jsonOptions;

    // Global prefix for all cache keys
    public const string GlobalKeyPrefix = "dyson:";

    // Using prefixes for different types of keys
    public const string GroupKeyPrefix = GlobalKeyPrefix + "cg:";
    public const string LockKeyPrefix = GlobalKeyPrefix + "lock:";

    /// <summary>
    /// Creates the service on top of the multiplexer's default database and prepares the
    /// JSON options used for every (de)serialization.
    /// </summary>
    /// <param name="redis">Redis connection multiplexer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="redis"/> is null.</exception>
    public CacheServiceRedis(IConnectionMultiplexer redis)
    {
        var rds = redis ?? throw new ArgumentNullException(nameof(redis));
        _database = rds.GetDatabase();

        // Configure System.Text.Json with proper NodaTime serialization
        _jsonOptions = new JsonSerializerOptions
        {
            TypeInfoResolver = new DefaultJsonTypeInfoResolver
            {
                Modifiers = { JsonExtensions.UnignoreAllProperties() },
            },
            ReferenceHandler = ReferenceHandler.Preserve,
            NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
        };
        _jsonOptions.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
        _jsonOptions.PropertyNameCaseInsensitive = true;
    }

    /// <summary>
    /// Validates a caller-supplied key and returns it with <see cref="GlobalKeyPrefix"/> applied.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
    private static string ToCacheKey(string key)
    {
        // Validate BEFORE prefixing: once the prefix is prepended the string can never be
        // empty, so a check placed afterwards would be dead code.
        if (string.IsNullOrEmpty(key))
            throw new ArgumentException("Key cannot be null or empty", nameof(key));

        return $"{GlobalKeyPrefix}{key}";
    }

    /// <summary>
    /// Serializes <paramref name="value"/> to JSON and stores it under the prefixed key.
    /// </summary>
    /// <param name="key">Cache key (without prefix).</param>
    /// <param name="value">Value to store.</param>
    /// <param name="expiry">Optional time-to-live for the entry.</param>
    /// <returns>The result reported by the Redis string set.</returns>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
    public async Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        var cacheKey = ToCacheKey(key);

        var serializedValue = JsonSerializer.Serialize(value, _jsonOptions);
        return await _database.StringSetAsync(cacheKey, serializedValue, expiry);
    }

    /// <summary>
    /// Reads and deserializes the value stored under the prefixed key.
    /// </summary>
    /// <param name="key">Cache key (without prefix).</param>
    /// <returns>The deserialized value, or default when the entry is missing or empty.</returns>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
    public async Task<T?> GetAsync<T>(string key)
    {
        var (_, value) = await GetAsyncWithStatus<T>(key);
        return value;
    }

    /// <summary>
    /// Reads and deserializes the value stored under the prefixed key, reporting whether a
    /// non-empty entry existed.
    /// </summary>
    /// <param name="key">Cache key (without prefix).</param>
    /// <returns>
    /// <c>(false, default)</c> when the entry is missing or empty; otherwise <c>true</c> and
    /// the deserialized value.
    /// </returns>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
    public async Task<(bool found, T? value)> GetAsyncWithStatus<T>(string key)
    {
        var cacheKey = ToCacheKey(key);

        var value = await _database.StringGetAsync(cacheKey);

        if (value.IsNullOrEmpty)
            return (false, default);

        // For NodaTime serialization, use the configured JSON options
        return (true, JsonSerializer.Deserialize<T>(value!, _jsonOptions));
    }

    /// <summary>
    /// Deletes the prefixed key and removes it from every group set.
    /// </summary>
    /// <param name="key">Cache key (without prefix).</param>
    /// <returns>True when the DEL call removed at least one key.</returns>
    /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
    public async Task<bool> RemoveAsync(string key)
    {
        var cacheKey = ToCacheKey(key);

        // Before removing the key, find all groups it belongs to and remove it from them.
        // NOTE(review): KEYS walks the whole keyspace on every removal and the script names
        // its keys through ARGV rather than KEYS. Consider a reverse index (key -> groups)
        // if this shows up as a hot spot or the deployment moves to Redis Cluster.
        var script = @"
            local groups = redis.call('KEYS', ARGV[1])
            for _, group in ipairs(groups) do
                redis.call('SREM', group, ARGV[2])
            end
            return redis.call('DEL', ARGV[2])
        ";

        var result = await _database.ScriptEvaluateAsync(
            script,
            values: [$"{GroupKeyPrefix}*", cacheKey]
        );

        return (long)result! > 0;
    }

    /// <summary>
    /// Adds the prefixed key to the set that represents <paramref name="group"/>.
    /// </summary>
    /// <param name="key">Cache key (without prefix).</param>
    /// <param name="group">Group name (without prefix).</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="key"/> or <paramref name="group"/> is null or empty.
    /// </exception>
    public async Task AddToGroupAsync(string key, string group)
    {
        if (string.IsNullOrEmpty(key))
            throw new ArgumentException(@"Key cannot be null or empty.", nameof(key));

        if (string.IsNullOrEmpty(group))
            throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));

        var groupKey = $"{GroupKeyPrefix}{group}";
        key = $"{GlobalKeyPrefix}{key}";
        await _database.SetAddAsync(groupKey, key);
    }

    /// <summary>
    /// Deletes every key recorded in the group's set, then deletes the set itself.
    /// </summary>
    /// <param name="group">Group name (without prefix).</param>
    /// <exception cref="ArgumentException"><paramref name="group"/> is null or empty.</exception>
    public async Task RemoveGroupAsync(string group)
    {
        if (string.IsNullOrEmpty(group))
            throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));

        var groupKey = $"{GroupKeyPrefix}{group}";

        // Members were stored already prefixed by AddToGroupAsync, so they are deleted as-is.
        var keys = await _database.SetMembersAsync(groupKey);

        if (keys.Length > 0)
        {
            // Delete all the keys
            var keysTasks = keys.Select(key => _database.KeyDeleteAsync(key.ToString()));
            await Task.WhenAll(keysTasks);
        }

        // Delete the group itself
        await _database.KeyDeleteAsync(groupKey);
    }

    /// <summary>
    /// Returns the members of the group's set; these are the prefixed cache keys.
    /// </summary>
    /// <param name="group">Group name (without prefix).</param>
    /// <exception cref="ArgumentException"><paramref name="group"/> is null or empty.</exception>
    public async Task<IEnumerable<string>> GetGroupKeysAsync(string group)
    {
        if (string.IsNullOrEmpty(group))
            throw new ArgumentException("Group cannot be null or empty.", nameof(group));

        var groupKey = string.Concat(GroupKeyPrefix, group);
        var members = await _database.SetMembersAsync(groupKey);

        return members.Select(m => m.ToString());
    }

    /// <summary>
    /// Stores a value and, when that succeeds, registers the key in each non-empty group name.
    /// </summary>
    /// <param name="key">Cache key (without prefix).</param>
    /// <param name="value">Value to store.</param>
    /// <param name="groups">Optional group names; null or empty names are skipped.</param>
    /// <param name="expiry">Optional time-to-live for the cached entry.</param>
    /// <returns>The result of the underlying <see cref="SetAsync{T}"/> call.</returns>
    public async Task<bool> SetWithGroupsAsync<T>(string key, T value, IEnumerable<string>? groups = null,
        TimeSpan? expiry = null)
    {
        // First, set the value in the cache
        var setResult = await SetAsync(key, value, expiry);

        // If successful and there are groups to associate, add the key to each group
        if (!setResult || groups == null) return setResult;
        var groupsArray = groups.Where(g => !string.IsNullOrEmpty(g)).ToArray();
        if (groupsArray.Length == 0) return setResult;
        var tasks = groupsArray.Select(group => AddToGroupAsync(key, group));
        await Task.WhenAll(tasks);

        return setResult;
    }

    /// <summary>
    /// Tries to create the lock key with SET-if-not-exists. At least one attempt is always
    /// made; when <paramref name="waitTime"/> is positive, further attempts are made until
    /// that much time has elapsed.
    /// </summary>
    /// <param name="resource">Identifier of the resource to lock.</param>
    /// <param name="expiry">Time-to-live applied to the lock key.</param>
    /// <param name="waitTime">Maximum time to keep retrying; defaults to zero (single attempt).</param>
    /// <param name="retryInterval">Pause between attempts; defaults to 100 ms.</param>
    /// <returns>The lock handle when acquired; null otherwise.</returns>
    /// <exception cref="ArgumentException"><paramref name="resource"/> is null or empty.</exception>
    public async Task<IDistributedLock?> AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null,
        TimeSpan? retryInterval = null)
    {
        if (string.IsNullOrEmpty(resource))
            throw new ArgumentException("Resource cannot be null or empty", nameof(resource));

        var lockKey = $"{LockKeyPrefix}{resource}";
        var lockId = Guid.NewGuid().ToString("N");
        var waitTimeSpan = waitTime ?? TimeSpan.Zero;
        var retryIntervalSpan = retryInterval ?? TimeSpan.FromMilliseconds(100);

        // Monotonic timer: unaffected by wall-clock adjustments during the wait.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();

        while (true)
        {
            // Attempt first, check the deadline second, so a zero wait time still gets
            // exactly one try instead of none.
            if (await _database.StringSetAsync(lockKey, lockId, expiry, When.NotExists))
                return new RedisDistributedLock(_database, resource, lockId);

            var remaining = waitTimeSpan - elapsed.Elapsed;
            if (remaining <= TimeSpan.Zero)
                return null; // Could not acquire the lock within the wait time

            // Never sleep past the deadline.
            await Task.Delay(remaining < retryIntervalSpan ? remaining : retryIntervalSpan);
        }
    }

    /// <summary>
    /// Acquires the lock, runs <paramref name="action"/>, and releases the lock when the
    /// method exits (including when the action throws).
    /// </summary>
    /// <returns>True if the lock was acquired and the action ran; false otherwise.</returns>
    public async Task<bool> ExecuteWithLockAsync(string resource, Func<Task> action, TimeSpan expiry,
        TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
    {
        await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);

        if (lockObj == null)
            return false; // Could not acquire the lock

        await action();
        return true;
    }

    /// <summary>
    /// Acquires the lock, runs <paramref name="func"/>, and releases the lock when the
    /// method exits (including when the function throws).
    /// </summary>
    /// <returns>
    /// <c>(false, default)</c> when the lock could not be acquired; otherwise <c>true</c>
    /// and the function's result.
    /// </returns>
    public async Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(string resource, Func<Task<T>> func,
        TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
    {
        await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);

        if (lockObj == null)
            return (false, default); // Could not acquire the lock

        var result = await func();
        return (true, result);
    }
}
 |