How to write a simple chat in aiohttp websocket (Example)

05 Apr 2016
How to write a simple chat in aiohttp websocket (Example)

Now there is a huge number of different frameworks, each of them with its own philosophy, syntax and realization of patterns common for web. We hope that eventually this diversity will be on a single basis - aiohttp.

Structure

To test all aiohttp capabilities to a maximum, we tried to develop a simple chat on websockets. An infinite loop, in which handlers are turning around, is a basis for aiohttp. Handler is a so-called coroutine - an object, which does not block input/output (I/O). Given type of objects appeared in python 3.4 in asyncio library. Until all the calculations are fulfilled in this object, it kind of ‘falls asleep’, while the interpreter can process other objects. To illustrate this, we will give an example. Quite often, all server delays happen when it is waiting for database response and until this response is given and processed, the rest id the objects are waiting for their turn. In this case, the other objects will be processed before the database response is given. But you need an asynchronous driver for a realization of this. For now asynchronous drivers and wrappers are realized for aiohttp for most of popular databases (postgresql, mysql, redis). There is Motor for mongodb, which is used in our chat.

App.py file serves as an entry point. Object app is created in it.

import asyncio

from aiohttp import web

loop = asyncio.get_event_loop()

app = web.Application(loop=loop, middlewares=[

session_middleware(EncryptedCookieStorage(SECRET_KEY)),

authorize,

db_handler,

])

As you can see, ‘app’ is shared to loop in initialization, as well as ‘middleware’ list, about which we will tell later.

Routes

Unlike flask, which aiohttp is very similar to, routes are added to already initialized app application.

app.router.add_route('GET', '/{name}', handler)

Here is Svetlov’s explanation for this way of realization.

Routes filling is extracted to a separate routes.py file.

from chat.views import ChatList, WebSocket

from auth.views import Login, SignIn, SignOut

routes = [

('GET', '/', ChatList, 'main'),

('GET', '/ws', WebSocket, 'chat'),

('*', '/login', Login, 'login'),

('*', '/signin', SignIn, 'signin'),

('*', '/signout', SignOut, 'signout'),

]

The first element is http method, url goes further, handler is the third in the chain and route name is the last one, to be easily called in a code.

Then routes list is imported to app.py and they are filled with a simple loop to an application.

from routes import routes

for route in routes:

app.router.add_route(route[0], route[1], route[2], name=route[3])

Everything is simple and logical.

Handlers, Request and Response

We decided to follow the Django framework example for request processing. Everything, that deals with users, authorization, user-creation processing, and their login, is stored in auth folder. Chat folder contains chat work logic correrspondently. In aiohttp you can realize handler as a function, as well as a class. We chose a class realization.

class Login(web.View):

async def get(self):

session = await get_session(self.request)

if session.get('user'):

url = request.app.router['main'].url()

raise web.HTTPFound(url)

return b'Please enter login or email'

Sessions will be described below, and the rest is clear. It should be pointed out, that forwarding is carried out through either return or exception emission in the form web.HTTPFound() object, which is give the path via parameter. Http methods are realized in a class via asynchronous functions get, post and so on. There are certain specific features if you need to work with query parameters.

data = await self.request.post()

Configuration settings

All the settings are stored in settings.py file. To store secret data we use envparse. This utility allows to read data from environment variables, and to parse a particular file with this variables.

if isfile('.env'):

env.read_envfile('.env')

Firstly, it was necessary for project processing in Heroku, and secondly, it turned out to be very convenient. At first, we used a local basis, and then tested using a remote one, and switching consisted in changing only one line in .env. file.

Middlewares

When initializing an application you can set middlewares. Here they are extracted to a separate file. The realization is standard - a decorator function, which can make verifications or any other actions with a request.

Autorization verification sample

async def authorize(app, handler):

async def middleware(request):

def check_path(path):

result = True

for r in ['/login', '/static/', '/signin', '/signout', '/_debugtoolbar/']:

if path.startswith(r):

result = False

return result

session = await get_session(request)

if session.get("user"):

return await handler(request)

elif check_path(request.path):

url = request.app.router['login'].url()

raise web.HTTPFound(url)

return handler(request)

else:

return await handler(request)

return middleware

We also created a middleware for database connection.

async def db_handler(app, handler):

async def middleware(request):

if request.path.startswith('/static/') or request.path.startswith('/_debugtoolbar'):

response = await handler(request)

return response

request.db = app.db

response = await handler(request)

return response

return middleware

Connection details below.

Databases

For the chat we used Mongodb and asynchronous driver Motor. Connection to the base is carried out via application initialization.

app.client = ma.AsyncIOMotorClient(MONGO_HOST)

app.db = app.client[MONGO_DB_NAME]

And connection closing is carried out in a special shutdown function.

async def shutdown(server, app, handler):

server.close()

await server.wait_closed()

app.client.close() # database connection close

await app.shutdown()

await handler.finish_connections(10.0)

await app.cleanup(

It should be mentioned, that it is necessary to finish all the tasks correctly in case of an asynchronous server. Now some more details about loop event creation.

loop = asyncio.get_event_loop()

serv_generator, handler, app = loop.run_until_complete(init(loop))

serv = loop.run_until_complete(serv_generator)

log.debug('start server', serv.sockets[0].getsockname())

try:

loop.run_forever()

except KeyboardInterrupt:

log.debug(' Stop server begin')

finally:

loop.run_until_complete(shutdown(serv, app, handler))

loop.close()

log.debug('Stop server end')

The loop itself is made from asyncio.

serv_generator, handler, app = loop.run_until_complete(init(loop))

Run_until_complete method adds coriutines to loop. In this case it adds application initialization.

try:

loop.run_forever()

except KeyboardInterrupt:

log.debug(' Stop server begin')

finally:

loop.run_until_complete(shutdown(serv, app, handler))

loop.close()

The realization of an infinite cycle, which is interrupted in case of exception. Before closing shutdown function should be called to finish all the connections and to stop the server correctly.

Now we should understand how to make queries, extract and change data.

class Message():

def __init__(self, db, **kwargs):

self.collection = db[MESSAGE_COLLECTION]

async def save(self, user, msg, **kw):

result = await self.collection.insert({'user': user, 'msg': msg, 'time': datetime.now()})

return result

async def get_messages(self):

messages = self.collection.find().sort([('time', 1)])

return await messages.to_list(length=None)

Although ORM was not involved in our work, it is better to make database queries in separate classes. Models.py file, containing Message class, was created in chat folder. In get_messages method we create a query, which extracts all the stored messages, sorted by time. In save method we create a query for saving messages in a database.

Templates

There are several asynchronous wrappers created for aiohttp for popular template processors, namely aiohttp_jinja2 and aiohttp_mako. For chat we use jinja2.

aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))

This is how templates support is initialized in application. FileSystemLoader(‘templates’) shows jinja2 that our templates are stored in templates folder.

class ChatList(web.View):

@aiohttp_jinja2.template('chat/index.html')

async def get(self):

message = Message(self.request.db)

messages = await message.get_messages()

return {'messages': messages}

We specify via decorator which template we are going to use in views; and for context filling we return the dictionary with variables for further work in a template.

Sessions, autorization

To work with sessions there is aiohttp_session library. You can store sessions in Redis or in cookies in an encrypted form using cryptography. A storage method is specified at the stage of library installation.

aiohttp_session[secure]

To initialize a session we add it to middleware.

session_middleware(EncryptedCookieStorage(SECRET_KEY)),

To extract or put values to the session, you first need to extract it from the request.

session = await get_session(request)

For user automatization, we add his id to the session, and then check its existence in middleware. For security, you need more verifications, of course, but to test a concept this is enough.

Static

A folder with static content is connected by a separate route when application is initialized.

app.router.add_static('/static', 'static', name='static')

To use it in a template, we should extract it from the app.

<script src="{{ app.router.static.url(filename='js/main.js') }}" loading="lazy" loading="lazy"></script>

Just as easy as that.

WebSocket

Finally, we have reached the most interesting part of aiohttp. Socket realization is very simple. In JavaScript, we added functionality essential for its work.

try{

var sock = new WebSocket('ws://' + window.location.host + '/ws');

}

catch(err){

var sock = new WebSocket('wss://' + window.location.host + '/ws');

}

// show message in div#subscribe

function showMessage(message) {

var messageElem = $('#subscribe'),

height = 0,

date = new Date();

options = {hour12: false};

messageElem.append($('<p>').html('[' + date.toLocaleTimeString('en-US', options) + '] ' + message + '\n'));

messageElem.find('p').each(function(i, value){

height += parseInt($(this).height());

});

messageElem.animate({scrollTop: height});

}

function sendMessage(){

var msg = $('#message');

sock.send(msg.val());

msg.val('').focus();

}

sock.onopen = function(){

showMessage('Connection to server started')

}

// send message from form

$('#submit').click(function() {

sendMessage();

});

$('#message').keyup(function(e){

if(e.keyCode == 13){

sendMessage();

}

});

// income message handler

sock.onmessage = function(event) {

showMessage(event.data);

};

$('#signout').click(function(){

window.location.href = "signout"

});

sock.onclose = function(event){

if(event.wasClean){

showMessage('Clean connection end')

}else{

showMessage('Connection broken')

}

};

sock.onerror = function(error){

showMessage(error);

}

For server part realization we use WebSocket class.

lass WebSocket(web.View):

async def get(self):

ws = web.WebSocketResponse()

await ws.prepare(self.request)

session = await get_session(self.request)

user = User(self.request.db, {'id': session.get('user')})

login = await user.get_login()

for _ws in self.request.app['websockets']:

_ws.send_str('%s joined' % login)

self.request.app['websockets'].append(ws)

async for msg in ws:

if msg.tp == MsgType.text:

if msg.data == 'close':

await ws.close()

else:

message = Message(self.request.db)

result = await message.save(user=login, msg=msg.data)

log.debug(result)

for _ws in self.request.app['websockets']:

_ws.send_str('(%s) %s' % (login, msg.data))

elif msg.tp == MsgType.error:

log.debug('ws connection closed with exception %s' % ws.exception())

self.request.app['websockets'].remove(ws)

for _ws in self.request.app['websockets']:

_ws.send_str('%s disconected' % login)

log.debug('websocket connection closed')

return ws

The socket itself is created with the WebSocketResponse() function. You must necessarily “prepare” it before using it. The list of open sockets is stored in application (for correct finishing when closing a server).

When a new user is logged in, all participants of the chat get a notification that a new participant has joined in. Then we wait for user's message. If it is valid, we save it to a database and send to other chat participants. When socket is closed we delete it from the list and notify the chat that one of the participants has left it. The realization is quite simple, visually in a synchronous style, without many callbacks, as in Tornado, for example. Take it and use).

Heroku unloading

We loaded a test chat on Heroku for demonstartion. Several problems appeared at installation, namely to use their internal base mongodb credit card data was required, which I was very reluctant to do, that is why we used MongoLab service and created a base there. Theb there were some problems with application installation. To install cryptography we had to specify it in requirements.txt. Also, to specify python version you need to create a runtime.txt. file in the project root.

Conclusions

In general, chat development, aiohttp studying, understanding sockets and some other technologies, which we have not worked with before, took me approximately 3 weeks of evening woork and rarely at weekends. Aiohttp documentation is quite good, many asynchronous drivers and wrappers are ready for testing. Probably not everything is ready for production yet, but it is being actively developed (in 3 weeks aiohttp updated from version 0.19 to 0.21). If you need to add sockets to a project, this variant will perfectly serve the purpose, in order not to add a heavy Tornado in project requirements.

Useful links

  1. Aiohttp official documentation
  2. Asynchronous drivers and aiohttp libraries available on Github
  3. Mongodb official documentation
  4. Motor official documentation (asynchronous driver for Mongodb)
  5. Envparse - a library source code for environmet variables management