Implementation of IDistributedCache with MariaDB and Dapper

cache

The code below is an implementation of the IDistributedCache interface backed by a MariaDB/MySQL database.


/// <summary>
/// <see cref="IDistributedCache"/> implementation that keeps its entries in a MariaDB/MySQL
/// table named <c>Cache</c> and talks to the database through Dapper.
/// </summary>
/// <remarks>
/// <para>
/// The SQL used here reads and writes the columns <c>Id</c>, <c>Value</c>,
/// <c>SlidingExpirationInSeconds</c> and <c>AbsoluteExpiration</c>. <c>Id</c> must be a
/// primary or unique key, because writes rely on <c>ON DUPLICATE KEY UPDATE</c>.
/// </para>
/// <para>
/// <c>AbsoluteExpiration</c> holds the moment at which the entry currently expires. An entry
/// stored without any expiration option has a NULL value there and never expires.
/// </para>
/// <para>
/// <see cref="Get"/> and <see cref="GetAsync"/> only read the value; a sliding window is
/// extended solely by <see cref="Refresh"/> / <see cref="RefreshAsync"/>. Because the table
/// has a single expiry column, a refresh cannot honour an absolute cap that was supplied
/// together with a sliding expiration.
/// </para>
/// <para>
/// NOTE(review): expiry instants are produced with <see cref="DateTimeOffset.Now"/> on the
/// application side and compared with <c>CURRENT_TIMESTAMP</c> on the database side. This
/// presumably requires the driver/session time zone handling to line up - confirm.
/// </para>
/// </remarks>
public class DistributedMariaDbCache: IDistributedCache, IDisposable
{
    /// <summary>Name of the connection string looked up in the configuration.</summary>
    private const string ConnectionStringName = "connection";

    // NULL expiry means "never expires", so such rows must still be readable.
    private const string SelectValueSql =
        "SELECT Value FROM Cache WHERE Id=@key AND (AbsoluteExpiration IS NULL OR AbsoluteExpiration > CURRENT_TIMESTAMP)";

    private const string SelectSlidingExpirationSql =
        "SELECT SlidingExpirationInSeconds FROM Cache WHERE Id=@key AND AbsoluteExpiration > CURRENT_TIMESTAMP";

    private const string UpdateExpirationSql =
        "UPDATE Cache SET AbsoluteExpiration = @absoluteExpiration WHERE Id=@key";

    private const string DeleteByKeySql = "DELETE FROM Cache WHERE Id=@key";

    private const string DeleteExpiredSql = "DELETE FROM Cache WHERE AbsoluteExpiration < CURRENT_TIMESTAMP";

    private const string UpsertSql = @"
            INSERT INTO Cache(Id,Value,SlidingExpirationInSeconds,AbsoluteExpiration) 
            VALUES (@key,@value,@slidingExpirationInSeconds, @absoluteExpiration) 
            ON DUPLICATE KEY UPDATE 
                Value=@value,
                SlidingExpirationInSeconds=@slidingExpirationInSeconds,
                AbsoluteExpiration=@absoluteExpiration;";

    /// <summary>Pause between two runs of the expired-entry cleanup.</summary>
    private static readonly TimeSpan CleanupInterval = TimeSpan.FromMilliseconds(5000);

    private readonly IConfiguration _configuration;
    private readonly ILogger<DistributedMariaDbCache> _logger;
    private readonly CancellationTokenSource _cancellationTokenSource = new ();

    private bool _disposed;

    /// <summary>
    /// Creates the cache and starts the background loop that deletes expired entries.
    /// </summary>
    /// <param name="configuration">Configuration providing the connection string named "connection".</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    public DistributedMariaDbCache(
        IConfiguration configuration,
        ILogger<DistributedMariaDbCache> logger)
    {
        _configuration = configuration;
        _logger = logger;

        // Capture the token here: reading it later from inside the task could throw
        // ObjectDisposedException if Dispose() wins the race with the task start.
        CancellationToken token = _cancellationTokenSource.Token;
        _ = Task.Run(() => CleanCache(token), token);
    }

    /// <summary>Creates a new, not yet opened connection from the configured connection string.</summary>
    private MySqlConnection GetConnection()
    {
        return new MySqlConnection(_configuration.GetConnectionString(ConnectionStringName));
    }

    /// <summary>
    /// Periodically deletes expired rows until <paramref name="token"/> is cancelled.
    /// A failing run is logged and retried after the next interval instead of ending the loop.
    /// </summary>
    private async Task CleanCache(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                using MySqlConnection connection = GetConnection();

                await connection.ExecuteAsync(
                    new CommandDefinition(DeleteExpiredSql, cancellationToken: token));
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                return;
            }
            catch (Exception ex)
            {
                // Drivers may surface a cancelled command as a provider-specific exception.
                if (token.IsCancellationRequested)
                {
                    return;
                }

                _logger.LogError(ex, "Removing expired cache entries failed, retrying after the next interval");
            }

            try
            {
                await Task.Delay(CleanupInterval, token);
            }
            catch (OperationCanceledException)
            {
                return;
            }
        }
    }

    /// <summary>
    /// Works out the moment at which a new entry expires: the earlier of the absolute
    /// deadline (explicit, or relative to now) and the end of the first sliding window.
    /// </summary>
    /// <returns>The expiry instant, or <c>null</c> when no expiration option is set.</returns>
    private static DateTimeOffset? ComputeExpiration(DistributedCacheEntryOptions options)
    {
        DateTimeOffset now = DateTimeOffset.Now;

        DateTimeOffset? expiration = options.AbsoluteExpiration;
        if (expiration is null && options.AbsoluteExpirationRelativeToNow is TimeSpan relative)
        {
            expiration = now.Add(relative);
        }

        if (options.SlidingExpiration is TimeSpan sliding)
        {
            DateTimeOffset slidingDeadline = now.Add(sliding);
            if (expiration is null || slidingDeadline < expiration.Value)
            {
                expiration = slidingDeadline;
            }
        }

        return expiration;
    }

    /// <summary>Validates the arguments of a write and builds the parameter object for <see cref="UpsertSql"/>.</summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    private static object CreateUpsertParameters(string key, byte[] value, DistributedCacheEntryOptions options)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        if (value is null)
        {
            throw new ArgumentNullException(nameof(value));
        }

        if (options is null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return new
        {
            key,
            value,
            slidingExpirationInSeconds = options.SlidingExpiration?.TotalSeconds,
            absoluteExpiration = ComputeExpiration(options)
        };
    }

    /// <summary>Returns the stored value for <paramref name="key"/>, or <c>null</c> if it is missing or expired.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public byte[]? Get(string key)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        _logger.LogDebug("Get: {key}", key);

        using MySqlConnection connection = GetConnection();

        return connection.QueryFirstOrDefault<byte[]>(SelectValueSql, new { key });
    }

    /// <summary>Asynchronously returns the stored value for <paramref name="key"/>, or <c>null</c> if it is missing or expired.</summary>
    /// <param name="key">Cache key.</param>
    /// <param name="token">Token that cancels the database command.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public async Task<byte[]?> GetAsync(string key, CancellationToken token = default)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        _logger.LogDebug("GetAsync: {key}", key);

        using MySqlConnection connection = GetConnection();

        return await connection.QueryFirstOrDefaultAsync<byte[]>(
            new CommandDefinition(SelectValueSql, new { key }, cancellationToken: token));
    }

    /// <summary>
    /// Restarts the sliding window of a live entry: if the entry has a positive sliding
    /// expiration, its expiry is moved to now plus that many seconds. Otherwise nothing happens.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public void Refresh(string key)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        _logger.LogDebug("Refresh, key: {key}", key);

        using MySqlConnection connection = GetConnection();

        int? slidingExpiration = connection.ExecuteScalar<int?>(SelectSlidingExpirationSql, new { key });

        if (slidingExpiration is null || slidingExpiration <= 0)
        {
            return;
        }

        DateTimeOffset absoluteExpiration = DateTimeOffset.Now.AddSeconds(slidingExpiration.Value);

        _logger.LogDebug("Refresh, new absolute expiry for key: {key} is {absoluteExpiration}",
            key, absoluteExpiration);

        connection.Execute(UpdateExpirationSql, new { absoluteExpiration, key });
    }

    /// <summary>Asynchronous counterpart of <see cref="Refresh"/>.</summary>
    /// <param name="key">Cache key.</param>
    /// <param name="token">Token that cancels the database commands.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public async Task RefreshAsync(string key, CancellationToken token = default)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        _logger.LogDebug("RefreshAsync, key: {key}", key);

        using MySqlConnection connection = GetConnection();

        int? slidingExpiration = await connection.ExecuteScalarAsync<int?>(
            new CommandDefinition(SelectSlidingExpirationSql, new { key }, cancellationToken: token));

        if (slidingExpiration is null || slidingExpiration <= 0)
        {
            return;
        }

        DateTimeOffset absoluteExpiration = DateTimeOffset.Now.AddSeconds(slidingExpiration.Value);

        _logger.LogDebug("RefreshAsync, new absolute expiry for key: {key} is {absoluteExpiration}",
            key, absoluteExpiration);

        await connection.ExecuteAsync(
            new CommandDefinition(UpdateExpirationSql, new { absoluteExpiration, key }, cancellationToken: token));
    }

    /// <summary>Deletes the entry stored under <paramref name="key"/>, if any.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public void Remove(string key)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        _logger.LogDebug("Remove for key: {key}", key);

        using MySqlConnection connection = GetConnection();
        connection.Execute(DeleteByKeySql, new { key });
    }

    /// <summary>Asynchronously deletes the entry stored under <paramref name="key"/>, if any.</summary>
    /// <param name="key">Cache key.</param>
    /// <param name="token">Token that cancels the database command.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is <c>null</c>.</exception>
    public async Task RemoveAsync(string key, CancellationToken token = default)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }

        _logger.LogDebug("RemoveAsync for key: {key}", key);

        using MySqlConnection connection = GetConnection();
        await connection.ExecuteAsync(
            new CommandDefinition(DeleteByKeySql, new { key }, cancellationToken: token));
    }

    /// <summary>
    /// Inserts or replaces the entry for <paramref name="key"/>. The stored expiry is the
    /// earlier of the absolute deadline and the end of the first sliding window; with no
    /// expiration option the entry never expires.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public void Set(string key, byte[] value, DistributedCacheEntryOptions options)
    {
        object parameters = CreateUpsertParameters(key, value, options);

        _logger.LogDebug("Set for key: {key}", key);

        using MySqlConnection connection = GetConnection();
        connection.Execute(UpsertSql, parameters);
    }

    /// <summary>Asynchronous counterpart of <see cref="Set"/>.</summary>
    /// <param name="key">Cache key.</param>
    /// <param name="value">Payload to store.</param>
    /// <param name="options">Expiration options for the entry.</param>
    /// <param name="token">Token that cancels the database command.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/>, <paramref name="value"/> or <paramref name="options"/> is <c>null</c>.</exception>
    public async Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default)
    {
        object parameters = CreateUpsertParameters(key, value, options);

        _logger.LogDebug("SetAsync for key: {key}", key);

        using MySqlConnection connection = GetConnection();
        await connection.ExecuteAsync(
            new CommandDefinition(UpsertSql, parameters, cancellationToken: token));
    }

    /// <summary>
    /// Stops the background cleanup loop and releases the cancellation source.
    /// Calling it more than once is harmless.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            _logger.LogDebug("Disposing...");

            _cancellationTokenSource.Cancel();
            _cancellationTokenSource.Dispose();
        }

        _disposed = true;
    }

    /// <inheritdoc />
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}

Post a Comment

Previous Post Next Post