When working with web applications and external APIs, we’re very often using async code — it lets us send and receive multiple requests at the same time without blocking. Just as often, we’re sharing state between different parts of our app, which is where things like caching come in.
In this article, we’ll show how to create a Redis cache using async code, and how to set values with an expiry time in seconds. Since Redis stores everything in memory, we’ll also look at how to compress and decompress larger responses to save space. As a bonus, we’ll show how to run the deserialization step in an async-friendly way — keeping the whole flow non-blocking from start to finish.
This task is best explained with a mini-project. So here we will give a real example of a time where this method came in very handy.
To follow along with this post, you should have redis, aiohttp, and msgpack installed in your virtual environment.
Setting Up the Problem to Solve
To illustrate when and why we might want to use caching, we will show a real example: requesting exchange info from Bybit. This is something we probably don't want to call very often, and it also has many different use cases, each with a different tolerance for how fresh the data needs to be.
Notice in the script below that this response is often paginated, meaning we have multiple requests to make each time we want the exchange information.
import asyncio
from typing import List
import aiohttp
async def get_exchange_info(category: str = "linear") -> List[dict]:
    """Fetch every instrument record for a Bybit category.

    Follows the ``nextPageCursor`` value in each response until no cursor
    is returned. A non-200 response stops the loop, and whatever has been
    collected so far is returned.

    Args:
        category: Value sent as the ``category`` query parameter.

    Returns:
        The concatenated ``result.list`` entries of all pages fetched.
    """
    endpoint = "https://api.bybit.com/v5/market/instruments-info"
    query = {"category": category, "limit": 1000}
    collected = []
    async with aiohttp.ClientSession() as session:
        has_more = True
        while has_more:
            async with session.get(endpoint, params=query) as resp:
                if resp.status != 200:
                    break
                payload = await resp.json()
            page = payload.get("result", {})
            collected.extend(page.get("list", []))
            # Handle pagination: keep going only while a cursor is returned
            cursor = page.get("nextPageCursor")
            has_more = bool(cursor)
            if has_more:
                query["cursor"] = cursor
    return collected
if __name__ == "__main__":
    # Run the coroutine to completion and print every record it returned.
    print(asyncio.run(get_exchange_info()))
The printout below shows a fraction of the actual data retrieved from this API call; at present there are in excess of 500 symbols, each with a range of columns to parse.
So the problem here is that we don't want to be spending a lot of time making this API call unless we absolutely have to. Say, for example, we just want the symbols for Bybit USDT futures contracts. We know these change very infrequently (i.e. new listings and delistings), so we probably don't want to be calling this fresh each time. On the other hand, the funding interval is dynamic information that is important to us, so we probably want to be calling that at least once an hour.
Creating the Cache Object
First we will create a helper function that decompresses and deserializes data. This is very useful for working with Redis; my personal preference is msgpack along with zlib, which achieves high compression levels and doesn't add too much overhead. Notice below that we run the decompression in a thread so as not to block the event loop.
import asyncio
import json
import zlib
from typing import Any, Optional
import msgpack
import redis.asyncio as redis
async def deserialize_data_async(compressed_data) -> list:
    """Decompress and unpack a cached payload off the event loop.

    The zlib decompression and msgpack unpacking run in the loop's default
    executor so the event loop is not blocked while they execute.

    Args:
        compressed_data: zlib-compressed msgpack bytes, or ``None``.

    Returns:
        The unpacked object, or an empty list when the input is ``None``
        or cannot be decoded.
    """
    if compressed_data is None:
        return []

    def _deserialize():
        try:
            serialized_data = zlib.decompress(compressed_data)
            return msgpack.unpackb(serialized_data, raw=False)
        except (zlib.error, ValueError) as e:
            # msgpack's ExtraData, FormatError and StackError all derive from
            # ValueError, and truncated input raises a plain ValueError, so
            # catching ValueError keeps a corrupted entry from crashing.
            print(f"Error decoding = {e}")
            return []

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _deserialize)
Below is the cache object we will use to set temporary keys in Redis. Notice that we use the setex() method to set a key with an expiry, which is similar to the hget() method we covered previously.
class AsyncRedisCache:
    """Small async wrapper for storing expiring values in Redis.

    Values are stored either as JSON text or as zlib-compressed msgpack.
    """

    def __init__(self, redis_url: str):
        """Create the Redis client from a connection URL."""
        self.client = redis.from_url(redis_url)

    async def get(self, key: str, compressed: bool = False) -> Optional[Any]:
        """
        Read a key and decode it
        Args:
            key (str): Redis key name
            compressed (bool): Whether the value was stored with msgpack + zlib
        Returns:
            The decoded value, or None when the key is missing/empty or the
            stored JSON cannot be parsed
        """
        raw = await self.client.get(key)
        if raw:
            if compressed:
                return await deserialize_data_async(raw)
            try:
                decoded = json.loads(raw)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                decoded = None
            return decoded
        return None

    async def set(self, key: str, values: Any, ttl: int, compress: bool = False):
        """
        Store a value under a key that expires after ttl seconds
        Args:
            key (str): Redis key name
            values (Any): Value to store (can be list, dict, etc.)
            ttl (int): Time-to-live in seconds
            compress (bool): Whether to compress using msgpack + zlib
        """
        payload = (
            zlib.compress(msgpack.packb(values, use_bin_type=True))
            if compress
            else json.dumps(values)
        )
        await self.client.setex(key, ttl, payload)
After that we can rewrite our API method to handle caching. We will add another method in order to get Bybit Symbols which we only want to update once a day.
def get_redis_url() -> str:
    """Build the connection URL for the Redis server on localhost:6379, db 0.

    The password is read from the ``REDIS_PASSWORD`` environment variable
    (empty when unset). It is percent-encoded so that characters such as
    "@", "/", ":" or "#" in the password cannot break the URL structure.

    Returns:
        A ``redis://`` URL string.
    """
    from urllib.parse import quote

    password = os.getenv("REDIS_PASSWORD", "")
    return f"redis://:{quote(password, safe='')}@localhost:6379/0"
def get_redis_cache() -> AsyncRedisCache:
    """Return a new AsyncRedisCache built from the URL of get_redis_url()."""
    redis_url = get_redis_url()
    return AsyncRedisCache(redis_url)
async def get_exchange_info(
    category: str = "linear", ttl: int = 60, compress: bool = True
) -> List[dict]:
    """Return Bybit instrument records for a category, using the Redis cache.

    A non-empty cached value under ``bybitInfo:{category}`` is returned
    directly. Otherwise every page is requested from the API, following
    ``nextPageCursor`` until no cursor is returned.

    The result is written to the cache only when pagination ran to the end
    and produced at least one record, so a request that fails part-way
    through is never cached as if it were the complete list.

    Args:
        category: Value sent as the ``category`` query parameter.
        ttl: Cache expiry in seconds.
        compress: Whether the cached value is stored with msgpack + zlib.

    Returns:
        The cached list, or the records fetched from the API (possibly
        partial or empty if a request returned a non-200 status).
    """
    cache = get_redis_cache()
    cache_key = f"bybitInfo:{category}"
    res = await cache.get(key=cache_key, compressed=compress)
    if res:
        print("Returning values from cache")
        return res

    url = "https://api.bybit.com/v5/market/instruments-info"
    params = {"category": category, "limit": 1000}
    tickers = []
    complete = False
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(url, params=params) as resp:
                if resp.status != 200:
                    # Stop here; `complete` stays False so nothing is cached
                    break
                data = await resp.json()
            result = data.get("result", {})
            tickers.extend(result.get("list", []))
            next_cursor = result.get("nextPageCursor")
            if not next_cursor:
                complete = True
                break
            params["cursor"] = next_cursor

    # Only cache a full, non-empty listing
    if complete and tickers:
        await cache.set(key=cache_key, values=tickers, ttl=ttl, compress=compress)
    return tickers
async def get_available_symbols(category: str, ttl: int = 60 * 60 * 24) -> list:
    """Return the symbol names for a Bybit category, using the Redis cache.

    A non-empty cached value under ``bybitSymbols:{category}`` is returned
    directly. Otherwise the symbols are taken from get_exchange_info() and,
    when non-empty, stored uncompressed for ``ttl`` seconds.

    Args:
        category: Category passed on to get_exchange_info().
        ttl: Cache expiry in seconds (defaults to one day).

    Returns:
        List of the ``symbol`` value of every instrument record.
    """
    cache = get_redis_cache()
    cache_key = f"bybitSymbols:{category}"

    cached = await cache.get(key=cache_key)
    if cached:
        print("Returning symbol values from cache")
        return cached

    instruments = await get_exchange_info(category=category)
    symbols = [item["symbol"] for item in instruments]
    if not symbols:
        return symbols

    await cache.set(key=cache_key, values=symbols, ttl=ttl, compress=False)
    return symbols
OK, great. If we run this, the first call to get the Bybit linear symbols will make a call to the API; the next time, it will return from the cache.
if __name__ == "__main__":
    # Resolve the linear-category symbols (from cache when available).
    symbols = asyncio.run(get_available_symbols(category="linear"))
If we run this twice, we will see the output below on the second run.
Returning symbol values from cache
Full script
import asyncio
import json
import os
import zlib
from typing import Any, List, Optional
import aiohttp
import msgpack
import redis.asyncio as redis
async def deserialize_data_async(compressed_data) -> list:
    """Decompress and unpack a cached payload off the event loop.

    The zlib decompression and msgpack unpacking run in the loop's default
    executor so the event loop is not blocked while they execute.

    Args:
        compressed_data: zlib-compressed msgpack bytes, or ``None``.

    Returns:
        The unpacked object, or an empty list when the input is ``None``
        or cannot be decoded.
    """
    if compressed_data is None:
        return []

    def _deserialize():
        try:
            serialized_data = zlib.decompress(compressed_data)
            return msgpack.unpackb(serialized_data, raw=False)
        except (zlib.error, ValueError) as e:
            # msgpack's ExtraData, FormatError and StackError all derive from
            # ValueError, and truncated input raises a plain ValueError, so
            # catching ValueError keeps a corrupted entry from crashing.
            print(f"Error decoding = {e}")
            return []

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _deserialize)
class AsyncRedisCache:
    """Small async wrapper for storing expiring values in Redis.

    Values are stored either as JSON text or as zlib-compressed msgpack.
    """

    def __init__(self, redis_url: str):
        """Create the Redis client from a connection URL."""
        self.client = redis.from_url(redis_url)

    async def get(self, key: str, compressed: bool = False) -> Optional[Any]:
        """
        Read a key and decode it
        Args:
            key (str): Redis key name
            compressed (bool): Whether the value was stored with msgpack + zlib
        Returns:
            The decoded value, or None when the key is missing/empty or the
            stored JSON cannot be parsed
        """
        raw = await self.client.get(key)
        if raw:
            if compressed:
                return await deserialize_data_async(raw)
            try:
                decoded = json.loads(raw)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                decoded = None
            return decoded
        return None

    async def set(self, key: str, values: Any, ttl: int, compress: bool = False):
        """
        Store a value under a key that expires after ttl seconds
        Args:
            key (str): Redis key name
            values (Any): Value to store (can be list, dict, etc.)
            ttl (int): Time-to-live in seconds
            compress (bool): Whether to compress using msgpack + zlib
        """
        payload = (
            zlib.compress(msgpack.packb(values, use_bin_type=True))
            if compress
            else json.dumps(values)
        )
        await self.client.setex(key, ttl, payload)
def get_redis_url() -> str:
    """Build the connection URL for the Redis server on localhost:6379, db 0.

    The password is read from the ``REDIS_PASSWORD`` environment variable
    (empty when unset). It is percent-encoded so that characters such as
    "@", "/", ":" or "#" in the password cannot break the URL structure.

    Returns:
        A ``redis://`` URL string.
    """
    from urllib.parse import quote

    password = os.getenv("REDIS_PASSWORD", "")
    return f"redis://:{quote(password, safe='')}@localhost:6379/0"
def get_redis_cache() -> AsyncRedisCache:
    """Return a new AsyncRedisCache built from the URL of get_redis_url()."""
    redis_url = get_redis_url()
    return AsyncRedisCache(redis_url)
async def get_exchange_info(
    category: str = "linear", ttl: int = 60, compress: bool = True
) -> List[dict]:
    """Return Bybit instrument records for a category, using the Redis cache.

    A non-empty cached value under ``bybitInfo:{category}`` is returned
    directly. Otherwise every page is requested from the API, following
    ``nextPageCursor`` until no cursor is returned.

    The result is written to the cache only when pagination ran to the end
    and produced at least one record, so a request that fails part-way
    through is never cached as if it were the complete list.

    Args:
        category: Value sent as the ``category`` query parameter.
        ttl: Cache expiry in seconds.
        compress: Whether the cached value is stored with msgpack + zlib.

    Returns:
        The cached list, or the records fetched from the API (possibly
        partial or empty if a request returned a non-200 status).
    """
    cache = get_redis_cache()
    cache_key = f"bybitInfo:{category}"
    res = await cache.get(key=cache_key, compressed=compress)
    if res:
        print("Returning values from cache")
        return res

    url = "https://api.bybit.com/v5/market/instruments-info"
    params = {"category": category, "limit": 1000}
    tickers = []
    complete = False
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(url, params=params) as resp:
                if resp.status != 200:
                    # Stop here; `complete` stays False so nothing is cached
                    break
                data = await resp.json()
            result = data.get("result", {})
            tickers.extend(result.get("list", []))
            next_cursor = result.get("nextPageCursor")
            if not next_cursor:
                complete = True
                break
            params["cursor"] = next_cursor

    # Only cache a full, non-empty listing
    if complete and tickers:
        await cache.set(key=cache_key, values=tickers, ttl=ttl, compress=compress)
    return tickers
async def get_available_symbols(category: str, ttl: int = 60 * 60 * 24) -> list:
    """Return the symbol names for a Bybit category, using the Redis cache.

    A non-empty cached value under ``bybitSymbols:{category}`` is returned
    directly. Otherwise the symbols are taken from get_exchange_info() and,
    when non-empty, stored uncompressed for ``ttl`` seconds.

    Args:
        category: Category passed on to get_exchange_info().
        ttl: Cache expiry in seconds (defaults to one day).

    Returns:
        List of the ``symbol`` value of every instrument record.
    """
    cache = get_redis_cache()
    cache_key = f"bybitSymbols:{category}"

    cached = await cache.get(key=cache_key)
    if cached:
        print("Returning symbol values from cache")
        return cached

    instruments = await get_exchange_info(category=category)
    symbols = [item["symbol"] for item in instruments]
    if not symbols:
        return symbols

    await cache.set(key=cache_key, values=symbols, ttl=ttl, compress=False)
    return symbols
if __name__ == "__main__":
    # Resolve the linear-category symbols (from cache when available).
    symbols = asyncio.run(get_available_symbols(category="linear"))