109 lines
4.0 KiB
Python
109 lines
4.0 KiB
Python
import asyncio
|
|
import pickle
|
|
from urllib.parse import urlparse
|
|
|
|
try:
|
|
import aioredis
|
|
except ImportError:
|
|
aioredis = None
|
|
|
|
from .asyncio_pubsub_manager import AsyncPubSubManager
|
|
|
|
|
|
def _parse_redis_url(url):
|
|
p = urlparse(url)
|
|
if p.scheme not in {'redis', 'rediss'}:
|
|
raise ValueError('Invalid redis url')
|
|
ssl = p.scheme == 'rediss'
|
|
host = p.hostname or 'localhost'
|
|
port = p.port or 6379
|
|
password = p.password
|
|
if p.path:
|
|
db = int(p.path[1:])
|
|
else:
|
|
db = 0
|
|
return host, port, password, db, ssl
|
|
|
|
|
|
class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
|
|
"""Redis based client manager for asyncio servers.
|
|
|
|
This class implements a Redis backend for event sharing across multiple
|
|
processes. Only kept here as one more example of how to build a custom
|
|
backend, since the kombu backend is perfectly adequate to support a Redis
|
|
message queue.
|
|
|
|
To use a Redis backend, initialize the :class:`Server` instance as
|
|
follows::
|
|
|
|
server = socketio.Server(client_manager=socketio.AsyncRedisManager(
|
|
'redis://hostname:port/0'))
|
|
|
|
:param url: The connection URL for the Redis server. For a default Redis
|
|
store running on the same host, use ``redis://``. To use an
|
|
SSL connection, use ``rediss://``.
|
|
:param channel: The channel name on which the server sends and receives
|
|
notifications. Must be the same in all the servers.
|
|
:param write_only: If set to ``True``, only initialize to emit events. The
|
|
default of ``False`` initializes the class for emitting
|
|
and receiving.
|
|
"""
|
|
name = 'aioredis'
|
|
|
|
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
|
|
write_only=False, logger=None):
|
|
if aioredis is None:
|
|
raise RuntimeError('Redis package is not installed '
|
|
'(Run "pip install aioredis" in your '
|
|
'virtualenv).')
|
|
(
|
|
self.host, self.port, self.password, self.db, self.ssl
|
|
) = _parse_redis_url(url)
|
|
self.pub = None
|
|
self.sub = None
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger)
|
|
|
|
async def _publish(self, data):
|
|
retry = True
|
|
while True:
|
|
try:
|
|
if self.pub is None:
|
|
self.pub = await aioredis.create_redis(
|
|
(self.host, self.port), db=self.db,
|
|
password=self.password, ssl=self.ssl
|
|
)
|
|
return await self.pub.publish(self.channel,
|
|
pickle.dumps(data))
|
|
except (aioredis.RedisError, OSError):
|
|
if retry:
|
|
self._get_logger().error('Cannot publish to redis... '
|
|
'retrying')
|
|
self.pub = None
|
|
retry = False
|
|
else:
|
|
self._get_logger().error('Cannot publish to redis... '
|
|
'giving up')
|
|
break
|
|
|
|
async def _listen(self):
|
|
retry_sleep = 1
|
|
while True:
|
|
try:
|
|
if self.sub is None:
|
|
self.sub = await aioredis.create_redis(
|
|
(self.host, self.port), db=self.db,
|
|
password=self.password, ssl=self.ssl
|
|
)
|
|
self.ch = (await self.sub.subscribe(self.channel))[0]
|
|
retry_sleep = 1
|
|
return await self.ch.get()
|
|
except (aioredis.RedisError, OSError):
|
|
self._get_logger().error('Cannot receive from redis... '
|
|
'retrying in '
|
|
'{} secs'.format(retry_sleep))
|
|
self.sub = None
|
|
await asyncio.sleep(retry_sleep)
|
|
retry_sleep *= 2
|
|
if retry_sleep > 60:
|
|
retry_sleep = 60
|