♻️ Refactored the cache service
This commit is contained in:
@@ -1,396 +1,201 @@
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using System.Text.Json.Serialization.Metadata;
|
||||
using DysonNetwork.Shared.Data;
|
||||
using NodaTime;
|
||||
using NodaTime.Serialization.SystemTextJson;
|
||||
using Microsoft.Extensions.Caching.Distributed;
|
||||
using RedLockNet;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace DysonNetwork.Shared.Cache;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a distributed lock that can be used to synchronize access across multiple processes
|
||||
/// </summary>
|
||||
public interface IDistributedLock : IAsyncDisposable
|
||||
public class CacheServiceRedis(
|
||||
IDistributedCache cache,
|
||||
IConnectionMultiplexer redis,
|
||||
ICacheSerializer serializer,
|
||||
IDistributedLockFactory lockFactory
|
||||
)
|
||||
: ICacheService
|
||||
{
|
||||
/// <summary>
|
||||
/// The resource identifier this lock is protecting
|
||||
/// </summary>
|
||||
string Resource { get; }
|
||||
private const string GlobalKeyPrefix = "dyson:";
|
||||
private const string GroupKeyPrefix = GlobalKeyPrefix + "cg:";
|
||||
private const string LockKeyPrefix = GlobalKeyPrefix + "lock:";
|
||||
|
||||
/// <summary>
|
||||
/// Unique identifier for this lock instance
|
||||
/// </summary>
|
||||
string LockId { get; }
|
||||
private static string Normalize(string key) => $"{GlobalKeyPrefix}{key}";
|
||||
|
||||
/// <summary>
|
||||
/// Extends the lock's expiration time
|
||||
/// </summary>
|
||||
Task<bool> ExtendAsync(TimeSpan timeSpan);
|
||||
|
||||
/// <summary>
|
||||
/// Releases the lock immediately
|
||||
/// </summary>
|
||||
Task ReleaseAsync();
|
||||
}
|
||||
|
||||
public interface ICacheService
|
||||
{
|
||||
/// <summary>
|
||||
/// Sets a value in the cache with an optional expiration time
|
||||
/// </summary>
|
||||
Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null);
|
||||
|
||||
/// <summary>
|
||||
/// Gets a value from the cache
|
||||
/// </summary>
|
||||
Task<T?> GetAsync<T>(string key);
|
||||
|
||||
/// <summary>
|
||||
/// Get a value from the cache with the found status
|
||||
/// </summary>
|
||||
Task<(bool found, T? value)> GetAsyncWithStatus<T>(string key);
|
||||
|
||||
/// <summary>
|
||||
/// Removes a specific key from the cache
|
||||
/// </summary>
|
||||
Task<bool> RemoveAsync(string key);
|
||||
|
||||
/// <summary>
|
||||
/// Adds a key to a group for group-based operations
|
||||
/// </summary>
|
||||
Task AddToGroupAsync(string key, string group);
|
||||
|
||||
/// <summary>
|
||||
/// Removes all keys associated with a specific group
|
||||
/// </summary>
|
||||
Task RemoveGroupAsync(string group);
|
||||
|
||||
/// <summary>
|
||||
/// Gets all keys belonging to a specific group
|
||||
/// </summary>
|
||||
Task<IEnumerable<string>> GetGroupKeysAsync(string group);
|
||||
|
||||
/// <summary>
|
||||
/// Helper method to set a value in cache and associate it with multiple groups in one operation
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of value being cached</typeparam>
|
||||
/// <param name="key">Cache key</param>
|
||||
/// <param name="value">The value to cache</param>
|
||||
/// <param name="groups">Optional collection of group names to associate the key with</param>
|
||||
/// <param name="expiry">Optional expiration time for the cached item</param>
|
||||
/// <returns>True if the set operation was successful</returns>
|
||||
Task<bool> SetWithGroupsAsync<T>(string key, T value, IEnumerable<string>? groups = null, TimeSpan? expiry = null);
|
||||
|
||||
/// <summary>
|
||||
/// Acquires a distributed lock on the specified resource
|
||||
/// </summary>
|
||||
/// <param name="resource">The resource identifier to lock</param>
|
||||
/// <param name="expiry">How long the lock should be held before automatically expiring</param>
|
||||
/// <param name="waitTime">How long to wait for the lock before giving up</param>
|
||||
/// <param name="retryInterval">How often to retry acquiring the lock during the wait time</param>
|
||||
/// <returns>A distributed lock instance if acquired, null otherwise</returns>
|
||||
Task<IDistributedLock?> AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null,
|
||||
TimeSpan? retryInterval = null);
|
||||
|
||||
/// <summary>
|
||||
/// Executes an action with a distributed lock, ensuring the lock is properly released afterwards
|
||||
/// </summary>
|
||||
/// <param name="resource">The resource identifier to lock</param>
|
||||
/// <param name="action">The action to execute while holding the lock</param>
|
||||
/// <param name="expiry">How long the lock should be held before automatically expiring</param>
|
||||
/// <param name="waitTime">How long to wait for the lock before giving up</param>
|
||||
/// <param name="retryInterval">How often to retry acquiring the lock during the wait time</param>
|
||||
/// <returns>True if the lock was acquired and the action was executed, false otherwise</returns>
|
||||
Task<bool> ExecuteWithLockAsync(string resource, Func<Task> action, TimeSpan expiry, TimeSpan? waitTime = null,
|
||||
TimeSpan? retryInterval = null);
|
||||
|
||||
/// <summary>
|
||||
/// Executes a function with a distributed lock, ensuring the lock is properly released afterwards
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The return type of the function</typeparam>
|
||||
/// <param name="resource">The resource identifier to lock</param>
|
||||
/// <param name="func">The function to execute while holding the lock</param>
|
||||
/// <param name="expiry">How long the lock should be held before automatically expiring</param>
|
||||
/// <param name="waitTime">How long to wait for the lock before giving up</param>
|
||||
/// <param name="retryInterval">How often to retry acquiring the lock during the wait time</param>
|
||||
/// <returns>The result of the function if the lock was acquired, default(T) otherwise</returns>
|
||||
Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(string resource, Func<Task<T>> func, TimeSpan expiry,
|
||||
TimeSpan? waitTime = null, TimeSpan? retryInterval = null);
|
||||
}
|
||||
|
||||
public class RedisDistributedLock : IDistributedLock
|
||||
{
|
||||
private readonly IDatabase _database;
|
||||
private bool _disposed;
|
||||
|
||||
public string Resource { get; }
|
||||
public string LockId { get; }
|
||||
|
||||
internal RedisDistributedLock(IDatabase database, string resource, string lockId)
|
||||
{
|
||||
_database = database;
|
||||
Resource = resource;
|
||||
LockId = lockId;
|
||||
}
|
||||
|
||||
public async Task<bool> ExtendAsync(TimeSpan timeSpan)
|
||||
{
|
||||
if (_disposed)
|
||||
throw new ObjectDisposedException(nameof(RedisDistributedLock));
|
||||
|
||||
var script = @"
|
||||
if redis.call('get', KEYS[1]) == ARGV[1] then
|
||||
return redis.call('pexpire', KEYS[1], ARGV[2])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
";
|
||||
|
||||
var result = await _database.ScriptEvaluateAsync(
|
||||
script,
|
||||
[$"{CacheServiceRedis.LockKeyPrefix}{Resource}"],
|
||||
[LockId, (long)timeSpan.TotalMilliseconds]
|
||||
);
|
||||
|
||||
return (long)result! == 1;
|
||||
}
|
||||
|
||||
public async Task ReleaseAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
return;
|
||||
|
||||
var script = @"
|
||||
if redis.call('get', KEYS[1]) == ARGV[1] then
|
||||
return redis.call('del', KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
";
|
||||
|
||||
await _database.ScriptEvaluateAsync(
|
||||
script,
|
||||
[$"{CacheServiceRedis.LockKeyPrefix}{Resource}"],
|
||||
[LockId]
|
||||
);
|
||||
|
||||
_disposed = true;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await ReleaseAsync();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
|
||||
public class CacheServiceRedis : ICacheService
|
||||
{
|
||||
private readonly IDatabase _database;
|
||||
private readonly JsonSerializerOptions _jsonOptions;
|
||||
|
||||
// Global prefix for all cache keys
|
||||
public const string GlobalKeyPrefix = "dyson:";
|
||||
|
||||
// Using prefixes for different types of keys
|
||||
public const string GroupKeyPrefix = GlobalKeyPrefix + "cg:";
|
||||
public const string LockKeyPrefix = GlobalKeyPrefix + "lock:";
|
||||
|
||||
public CacheServiceRedis(IConnectionMultiplexer redis)
|
||||
{
|
||||
var rds = redis ?? throw new ArgumentNullException(nameof(redis));
|
||||
_database = rds.GetDatabase();
|
||||
|
||||
// Configure System.Text.Json with proper NodaTime serialization
|
||||
_jsonOptions = new JsonSerializerOptions
|
||||
{
|
||||
TypeInfoResolver = new DefaultJsonTypeInfoResolver
|
||||
{
|
||||
Modifiers = { JsonExtensions.UnignoreAllProperties() },
|
||||
},
|
||||
ReferenceHandler = ReferenceHandler.Preserve,
|
||||
NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
|
||||
Converters = { new ByteStringConverter() }
|
||||
};
|
||||
_jsonOptions.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
|
||||
_jsonOptions.PropertyNameCaseInsensitive = true;
|
||||
}
|
||||
// -----------------------------------------------------
|
||||
// BASIC OPERATIONS
|
||||
// -----------------------------------------------------
|
||||
|
||||
public async Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null)
|
||||
{
|
||||
key = $"{GlobalKeyPrefix}{key}";
|
||||
if (string.IsNullOrEmpty(key))
|
||||
throw new ArgumentException("Key cannot be null or empty", nameof(key));
|
||||
key = Normalize(key);
|
||||
|
||||
var serializedValue = JsonSerializer.Serialize(value, _jsonOptions);
|
||||
return await _database.StringSetAsync(key, serializedValue, expiry);
|
||||
var json = serializer.Serialize(value);
|
||||
|
||||
var options = new DistributedCacheEntryOptions();
|
||||
if (expiry.HasValue)
|
||||
options.SetAbsoluteExpiration(expiry.Value);
|
||||
|
||||
await cache.SetStringAsync(key, json, options);
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<T?> GetAsync<T>(string key)
|
||||
{
|
||||
key = $"{GlobalKeyPrefix}{key}";
|
||||
if (string.IsNullOrEmpty(key))
|
||||
throw new ArgumentException("Key cannot be null or empty", nameof(key));
|
||||
key = Normalize(key);
|
||||
|
||||
var value = await _database.StringGetAsync(key);
|
||||
var json = await cache.GetStringAsync(key);
|
||||
if (json is null)
|
||||
return default;
|
||||
|
||||
return value.IsNullOrEmpty ? default :
|
||||
// For NodaTime serialization, use the configured JSON options
|
||||
JsonSerializer.Deserialize<T>(value.ToString(), _jsonOptions);
|
||||
return serializer.Deserialize<T>(json);
|
||||
}
|
||||
|
||||
public async Task<(bool found, T? value)> GetAsyncWithStatus<T>(string key)
|
||||
{
|
||||
key = $"{GlobalKeyPrefix}{key}";
|
||||
if (string.IsNullOrEmpty(key))
|
||||
throw new ArgumentException("Key cannot be null or empty", nameof(key));
|
||||
key = Normalize(key);
|
||||
|
||||
var value = await _database.StringGetAsync(key);
|
||||
var json = await cache.GetStringAsync(key);
|
||||
if (json is null)
|
||||
return (false, default);
|
||||
|
||||
return value.IsNullOrEmpty ? (false, default) :
|
||||
// For NodaTime serialization, use the configured JSON options
|
||||
(true, JsonSerializer.Deserialize<T>(value!.ToString(), _jsonOptions));
|
||||
return (true, serializer.Deserialize<T>(json));
|
||||
}
|
||||
|
||||
public async Task<bool> RemoveAsync(string key)
|
||||
{
|
||||
key = $"{GlobalKeyPrefix}{key}";
|
||||
if (string.IsNullOrEmpty(key))
|
||||
throw new ArgumentException("Key cannot be null or empty", nameof(key));
|
||||
key = Normalize(key);
|
||||
|
||||
// Before removing the key, find all groups it belongs to and remove it from them
|
||||
var script = @"
|
||||
local groups = redis.call('KEYS', ARGV[1])
|
||||
for _, group in ipairs(groups) do
|
||||
redis.call('SREM', group, ARGV[2])
|
||||
end
|
||||
return redis.call('DEL', ARGV[2])
|
||||
";
|
||||
// Remove key from all groups
|
||||
var db = redis.GetDatabase();
|
||||
|
||||
var result = await _database.ScriptEvaluateAsync(
|
||||
script,
|
||||
values: [$"{GroupKeyPrefix}*", key]
|
||||
);
|
||||
var groupPattern = $"{GroupKeyPrefix}*";
|
||||
var server = redis.GetServers().First();
|
||||
|
||||
return (long)result! > 0;
|
||||
var groups = server.Keys(pattern: groupPattern);
|
||||
foreach (var group in groups)
|
||||
{
|
||||
await db.SetRemoveAsync(group, key);
|
||||
}
|
||||
|
||||
await cache.RemoveAsync(key);
|
||||
return true;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------
|
||||
// GROUP OPERATIONS
|
||||
// -----------------------------------------------------
|
||||
|
||||
public async Task AddToGroupAsync(string key, string group)
|
||||
{
|
||||
if (string.IsNullOrEmpty(key))
|
||||
throw new ArgumentException(@"Key cannot be null or empty.", nameof(key));
|
||||
|
||||
if (string.IsNullOrEmpty(group))
|
||||
throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));
|
||||
|
||||
key = Normalize(key);
|
||||
var db = redis.GetDatabase();
|
||||
var groupKey = $"{GroupKeyPrefix}{group}";
|
||||
key = $"{GlobalKeyPrefix}{key}";
|
||||
await _database.SetAddAsync(groupKey, key);
|
||||
await db.SetAddAsync(groupKey, key);
|
||||
}
|
||||
|
||||
public async Task RemoveGroupAsync(string group)
|
||||
{
|
||||
if (string.IsNullOrEmpty(group))
|
||||
throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));
|
||||
|
||||
var groupKey = $"{GroupKeyPrefix}{group}";
|
||||
var db = redis.GetDatabase();
|
||||
|
||||
// Get all keys in the group
|
||||
var keys = await _database.SetMembersAsync(groupKey);
|
||||
var keys = await db.SetMembersAsync(groupKey);
|
||||
|
||||
if (keys.Length > 0)
|
||||
{
|
||||
// Delete all the keys
|
||||
var keysTasks = keys.Select(key => _database.KeyDeleteAsync(key.ToString()));
|
||||
await Task.WhenAll(keysTasks);
|
||||
foreach (var key in keys)
|
||||
await cache.RemoveAsync(key.ToString());
|
||||
}
|
||||
|
||||
// Delete the group itself
|
||||
await _database.KeyDeleteAsync(groupKey);
|
||||
await db.KeyDeleteAsync(groupKey);
|
||||
}
|
||||
|
||||
public async Task<IEnumerable<string>> GetGroupKeysAsync(string group)
|
||||
{
|
||||
if (string.IsNullOrEmpty(group))
|
||||
throw new ArgumentException("Group cannot be null or empty.", nameof(group));
|
||||
|
||||
var groupKey = string.Concat(GroupKeyPrefix, group);
|
||||
var members = await _database.SetMembersAsync(groupKey);
|
||||
|
||||
return members.Select(m => m.ToString());
|
||||
var groupKey = $"{GroupKeyPrefix}{group}";
|
||||
var db = redis.GetDatabase();
|
||||
var members = await db.SetMembersAsync(groupKey);
|
||||
return members.Select(x => x.ToString());
|
||||
}
|
||||
|
||||
public async Task<bool> SetWithGroupsAsync<T>(string key, T value, IEnumerable<string>? groups = null,
|
||||
public async Task<bool> SetWithGroupsAsync<T>(
|
||||
string key,
|
||||
T value,
|
||||
IEnumerable<string>? groups = null,
|
||||
TimeSpan? expiry = null)
|
||||
{
|
||||
// First, set the value in the cache
|
||||
var setResult = await SetAsync(key, value, expiry);
|
||||
var result = await SetAsync(key, value, expiry);
|
||||
if (!result || groups == null)
|
||||
return result;
|
||||
|
||||
// If successful and there are groups to associate, add the key to each group
|
||||
if (!setResult || groups == null) return setResult;
|
||||
var groupsArray = groups.Where(g => !string.IsNullOrEmpty(g)).ToArray();
|
||||
if (groupsArray.Length <= 0) return setResult;
|
||||
var tasks = groupsArray.Select(group => AddToGroupAsync(key, group));
|
||||
var tasks = groups.Select(g => AddToGroupAsync(key, g));
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
return setResult;
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<IDistributedLock?> AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null,
|
||||
// -----------------------------------------------------
|
||||
// DISTRIBUTED LOCK (RedLock wrapper)
|
||||
// -----------------------------------------------------
|
||||
|
||||
private readonly TimeSpan _defaultRetry = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
public async Task<IDistributedLock?> AcquireLockAsync(
|
||||
string resource,
|
||||
TimeSpan expiry,
|
||||
TimeSpan? waitTime = null,
|
||||
TimeSpan? retryInterval = null)
|
||||
{
|
||||
if (string.IsNullOrEmpty(resource))
|
||||
throw new ArgumentException("Resource cannot be null or empty", nameof(resource));
|
||||
if (string.IsNullOrWhiteSpace(resource))
|
||||
throw new ArgumentException("Resource cannot be null", nameof(resource));
|
||||
|
||||
var lockKey = $"{LockKeyPrefix}{resource}";
|
||||
var lockId = Guid.NewGuid().ToString("N");
|
||||
var waitTimeSpan = waitTime ?? TimeSpan.Zero;
|
||||
var retryIntervalSpan = retryInterval ?? TimeSpan.FromMilliseconds(100);
|
||||
var redlock = await lockFactory.CreateLockAsync(
|
||||
lockKey,
|
||||
expiry,
|
||||
waitTime ?? TimeSpan.Zero,
|
||||
retryInterval ?? _defaultRetry
|
||||
);
|
||||
|
||||
var startTime = DateTime.UtcNow;
|
||||
var acquired = false;
|
||||
|
||||
// Try to acquire the lock, retry until waitTime is exceeded
|
||||
while (!acquired && (DateTime.UtcNow - startTime) < waitTimeSpan)
|
||||
{
|
||||
acquired = await _database.StringSetAsync(lockKey, lockId, expiry, When.NotExists);
|
||||
|
||||
if (!acquired)
|
||||
{
|
||||
await Task.Delay(retryIntervalSpan);
|
||||
}
|
||||
}
|
||||
|
||||
if (!acquired)
|
||||
{
|
||||
return null; // Could not acquire the lock within the wait time
|
||||
}
|
||||
|
||||
return new RedisDistributedLock(_database, resource, lockId);
|
||||
return !redlock.IsAcquired ? null : new RedLockAdapter(redlock, resource);
|
||||
}
|
||||
|
||||
public async Task<bool> ExecuteWithLockAsync(string resource, Func<Task> action, TimeSpan expiry,
|
||||
TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
|
||||
public async Task<bool> ExecuteWithLockAsync(
|
||||
string resource,
|
||||
Func<Task> action,
|
||||
TimeSpan expiry,
|
||||
TimeSpan? waitTime = null,
|
||||
TimeSpan? retryInterval = null)
|
||||
{
|
||||
await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);
|
||||
|
||||
if (lockObj == null)
|
||||
return false; // Could not acquire the lock
|
||||
await using var l = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);
|
||||
if (l is null)
|
||||
return false;
|
||||
|
||||
await action();
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(string resource, Func<Task<T>> func,
|
||||
TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
|
||||
public async Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(
|
||||
string resource,
|
||||
Func<Task<T>> func,
|
||||
TimeSpan expiry,
|
||||
TimeSpan? waitTime = null,
|
||||
TimeSpan? retryInterval = null)
|
||||
{
|
||||
await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);
|
||||
|
||||
if (lockObj == null)
|
||||
return (false, default); // Could not acquire the lock
|
||||
await using var l = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);
|
||||
if (l is null)
|
||||
return (false, default);
|
||||
|
||||
var result = await func();
|
||||
return (true, result);
|
||||
}
|
||||
}
|
||||
|
||||
public class RedLockAdapter(IRedLock inner, string resource) : IDistributedLock
|
||||
{
|
||||
public string Resource { get; } = resource;
|
||||
public string LockId => inner.LockId;
|
||||
|
||||
public ValueTask ReleaseAsync() => inner.DisposeAsync();
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await inner.DisposeAsync();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user