Leveraging Async Redis for Caching in Python

by John | March 23, 2025

 

 

When working with web applications and external APIs, we’re very often using async code — it lets us send and receive multiple requests at the same time without blocking. Just as often, we’re sharing state between different parts of our app, which is where things like caching come in.

 

In this article, we’ll show how to create a Redis cache using async code, and how to set values with an expiry time in seconds. Since Redis stores everything in memory, we’ll also look at how to compress and decompress larger responses to save space. As a bonus, we’ll show how to run the deserialization step in an async-friendly way — keeping the whole flow non-blocking from start to finish.

 

This task is best explained with a mini-project, so below we'll walk through a real example where this approach came in very handy.

 

To follow along with this post you should have the redis, aiohttp, and msgpack packages installed in your virtual environment (pip install redis aiohttp msgpack).

 

Setting Up the Problem to Solve

 

To illustrate when and why we might want to use caching, we'll show a real example: requesting exchange info from Bybit. This is something we probably don't want to call very often, but it also has many different use cases, each with a different tolerance for how fresh the data needs to be.

 

Notice in the script below that this response is often paginated, meaning we have to make multiple requests each time we want the exchange information.

 

import asyncio
from typing import List

import aiohttp


async def get_exchange_info(category: str = "linear") -> List[dict]:
    url = "https://api.bybit.com/v5/market/instruments-info"
    params = {"category": category, "limit": 1000}
    tickers = []

    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(url, params=params) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    result = data.get("result", {})
                    new_tickers = result.get("list", [])
                    tickers.extend(new_tickers)

                    # Handle pagination
                    next_cursor = result.get("nextPageCursor")
                    if next_cursor:
                        params["cursor"] = next_cursor
                    else:
                        break
                else:
                    break

    return tickers


if __name__ == "__main__":

    tickers = asyncio.run(get_exchange_info())

    print(tickers)
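
 

As a quick sanity check of the cursor handling, the same loop can be driven with canned pages instead of live HTTP responses. This stub (fetch_all and PAGES are illustrative names, not part of the original script) isolates just the pagination logic:

```python
# Drive the same nextPageCursor loop as get_exchange_info, but against
# canned pages instead of live HTTP responses.
PAGES = {
    None: {"list": [{"symbol": "BTCUSDT"}], "nextPageCursor": "p2"},
    "p2": {"list": [{"symbol": "ETHUSDT"}], "nextPageCursor": None},
}


def fetch_all(pages) -> list:
    tickers, cursor = [], None
    while True:
        result = pages[cursor]
        tickers.extend(result.get("list", []))
        cursor = result.get("nextPageCursor")
        if not cursor:  # a falsy cursor (None or "") means this was the last page
            break
    return tickers


print(fetch_all(PAGES))  # both pages collected into a single list
```

The falsy check matters because the API signals the last page with an empty cursor rather than omitting the field entirely.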

 

The printout below shows a fraction of the actual data retrieved from this API call; at present there are in excess of 500 symbols, each with a range of fields to parse.

 

So the problem here is that we don't want to spend a lot of time making this API call unless we absolutely have to. Say, for example, we just want the symbols for Bybit USDT futures contracts: we know these change very infrequently (i.e. new listings and delistings), so we probably don't want to fetch them fresh each time. The funding interval, on the other hand, we probably want to refresh at least once an hour, as this is dynamic information that is important to us.

 

Creating the Cache Object

 

First we will create a helper function that decompresses data. Compression is very useful when working with Redis, and my personal preference is msgpack along with zlib: this achieves high compression levels without adding too much overhead. Notice below that we run the decompression in a thread so as not to block the event loop.

 

import asyncio
import json
import zlib
from typing import Any, Optional

import msgpack
import redis.asyncio as redis


async def deserialize_data_async(compressed_data) -> list:
    if compressed_data is None:
        return []

    loop = asyncio.get_running_loop()

    def _deserialize():
        try:
            serialized_data = zlib.decompress(compressed_data)
            data = msgpack.unpackb(serialized_data, raw=False)
            return data
        except (
            zlib.error,
            msgpack.ExtraData,
            msgpack.FormatError,
            msgpack.StackError,
        ) as e:
            print(f"Error decoding = {e}")
            return []

    return await loop.run_in_executor(None, _deserialize)
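
 

The helper above only covers the decompression side; the serialization step can be offloaded to a thread in exactly the same way. Below is a sketch of such a counterpart (serialize_data_async is our own name, not something from redis or msgpack), using the same msgpack + zlib calls we will use when setting keys:

```python
import asyncio
import zlib

import msgpack


async def serialize_data_async(values) -> bytes:
    """Pack with msgpack then compress with zlib, in a worker thread
    so the event loop is not blocked by large payloads."""
    loop = asyncio.get_running_loop()

    def _serialize():
        packed = msgpack.packb(values, use_bin_type=True)
        return zlib.compress(packed)

    return await loop.run_in_executor(None, _serialize)


if __name__ == "__main__":
    sample = [{"symbol": "BTCUSDT", "fundingInterval": 480}]
    blob = asyncio.run(serialize_data_async(sample))
    # Round-trip through the inverse calls used by deserialize_data_async
    assert msgpack.unpackb(zlib.decompress(blob), raw=False) == sample
```

For small values the thread handoff costs more than it saves, so this is only worth doing for larger payloads; for typical cache writes, compressing inline (as we do below) is fine.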

 

 

Below is the cache object we will use to set temporary keys into Redis. Notice we use the setex() method to set a key with an expiry, similar to the hget() method we covered previously.

 

class AsyncRedisCache:
    def __init__(self, redis_url: str):
        self.client = redis.from_url(redis_url)

    async def get(self, key: str, compressed: bool = False) -> Optional[Any]:
        data = await self.client.get(key)
        if not data:
            return None

        if compressed:
            return await deserialize_data_async(data)

        try:
            return json.loads(data)
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
            return None

    async def set(self, key: str, values: Any, ttl: int, compress: bool = False):
        """
        Set key with expiry in seconds

        Args:
            key (str): Redis key name
            values (Any): Value to store (can be list, dict, etc.)
            ttl (int): Time-to-live in seconds
            compress (bool): Whether to compress using msgpack + zlib
        """
        if compress:
            packed = msgpack.packb(values, use_bin_type=True)
            values = zlib.compress(packed)
        else:
            values = json.dumps(values)

        await self.client.setex(key, ttl, values)
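
 

To get a feel for what the compress=True branch of set() saves over the plain JSON path, here is a quick size comparison on some synthetic records shaped loosely like exchange info (the field values are made up for illustration, not the real Bybit schema):

```python
import json
import zlib

import msgpack

# Synthetic ticker-like records (illustrative only)
tickers = [
    {"symbol": f"COIN{i}USDT", "status": "Trading", "fundingInterval": 480}
    for i in range(500)
]

# The two encodings the cache's set() method chooses between
as_json = json.dumps(tickers).encode()
as_packed = zlib.compress(msgpack.packb(tickers, use_bin_type=True))

print(f"json bytes:         {len(as_json)}")
print(f"msgpack+zlib bytes: {len(as_packed)}")
```

Repeated key names compress extremely well, so on list-of-dict payloads like this the savings are substantial; the exact ratio will depend on your data.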

 

 

After that we can rewrite our API method to handle caching. We will also add a method to get the Bybit symbols, which we only want to update once a day.

 

import os


def get_redis_url() -> str:
    password = os.getenv("REDIS_PASSWORD", "")
    return f"redis://:{password}@localhost:6379/0"


def get_redis_cache() -> AsyncRedisCache:
    return AsyncRedisCache(get_redis_url())


async def get_exchange_info(
    category: str = "linear", ttl: int = 60, compress: bool = True
) -> List[dict]:
    cache = get_redis_cache()

    res = await cache.get(key=f"bybitInfo:{category}", compressed=compress)
    if res:
        print("Returning values from cache")
        return res

    url = "https://api.bybit.com/v5/market/instruments-info"
    params = {"category": category, "limit": 1000}
    tickers = []

    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(url, params=params) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    result = data.get("result", {})
                    tickers.extend(result.get("list", []))

                    if result.get("nextPageCursor"):
                        params["cursor"] = result["nextPageCursor"]
                    else:
                        break
                else:
                    break

    await cache.set(
        key=f"bybitInfo:{category}", values=tickers, ttl=ttl, compress=compress
    )
    return tickers


async def get_available_symbols(category: str, ttl: int = 60 * 60 * 24) -> list:
    cache = get_redis_cache()

    res = await cache.get(key=f"bybitSymbols:{category}")
    if res:
        print("Returning symbol values from cache")
        return res

    info = await get_exchange_info(category=category)
    symbols = [x["symbol"] for x in info]

    if symbols:
        await cache.set(
            key=f"bybitSymbols:{category}", values=symbols, ttl=ttl, compress=False
        )

    return symbols

 

Ok great: if we run this, the first call to get the Bybit linear symbols will hit the API, and subsequent calls will return from the cache.

 

if __name__ == "__main__":

    s = asyncio.run(get_available_symbols(category="linear"))

 

If we run the script twice, on the second run we will see the output below.

 

Returning symbol values from cache

 

Full script

 

import asyncio
import json
import os
import zlib
from typing import Any, List, Optional

import aiohttp
import msgpack
import redis.asyncio as redis


async def deserialize_data_async(compressed_data) -> list:
    if compressed_data is None:
        return []

    loop = asyncio.get_running_loop()

    def _deserialize():
        try:
            serialized_data = zlib.decompress(compressed_data)
            data = msgpack.unpackb(serialized_data, raw=False)
            return data
        except (
            zlib.error,
            msgpack.ExtraData,
            msgpack.FormatError,
            msgpack.StackError,
        ) as e:
            print(f"Error decoding = {e}")
            return []

    return await loop.run_in_executor(None, _deserialize)


class AsyncRedisCache:
    def __init__(self, redis_url: str):
        self.client = redis.from_url(redis_url)

    async def get(self, key: str, compressed: bool = False) -> Optional[Any]:
        data = await self.client.get(key)
        if not data:
            return None

        if compressed:
            return await deserialize_data_async(data)

        try:
            return json.loads(data)
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
            return None

    async def set(self, key: str, values: Any, ttl: int, compress: bool = False):
        """
        Set key with expiry in seconds

        Args:
            key (str): Redis key name
            values (Any): Value to store (can be list, dict, etc.)
            ttl (int): Time-to-live in seconds
            compress (bool): Whether to compress using msgpack + zlib
        """
        if compress:
            packed = msgpack.packb(values, use_bin_type=True)
            values = zlib.compress(packed)
        else:
            values = json.dumps(values)

        await self.client.setex(key, ttl, values)


def get_redis_url() -> str:
    password = os.getenv("REDIS_PASSWORD", "")
    return f"redis://:{password}@localhost:6379/0"


def get_redis_cache() -> AsyncRedisCache:
    return AsyncRedisCache(get_redis_url())


async def get_exchange_info(
    category: str = "linear", ttl: int = 60, compress: bool = True
) -> List[dict]:
    cache = get_redis_cache()

    res = await cache.get(key=f"bybitInfo:{category}", compressed=compress)
    if res:
        print("Returning values from cache")
        return res

    url = "https://api.bybit.com/v5/market/instruments-info"
    params = {"category": category, "limit": 1000}
    tickers = []

    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(url, params=params) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    result = data.get("result", {})
                    tickers.extend(result.get("list", []))

                    if result.get("nextPageCursor"):
                        params["cursor"] = result["nextPageCursor"]
                    else:
                        break
                else:
                    break

    await cache.set(
        key=f"bybitInfo:{category}", values=tickers, ttl=ttl, compress=compress
    )
    return tickers


async def get_available_symbols(category: str, ttl: int = 60 * 60 * 24) -> list:
    cache = get_redis_cache()

    res = await cache.get(key=f"bybitSymbols:{category}")
    if res:
        print("Returning symbol values from cache")
        return res

    info = await get_exchange_info(category=category)
    symbols = [x["symbol"] for x in info]

    if symbols:
        await cache.set(
            key=f"bybitSymbols:{category}", values=symbols, ttl=ttl, compress=False
        )

    return symbols


if __name__ == "__main__":

    s = asyncio.run(get_available_symbols(category="linear"))