ソース|Python上でのSocket.IOのテスト
Python 3.10.8で実行できることを確認している
code:requirements.txt
aiohttp==3.8.1
python-socketio==5.7.1
code:client.py
import asyncio
import socketio
sio = socketio.AsyncClient()
@sio.event
async def connect():
print('connection established')
@sio.event
async def my_message(data):
print('message received with ', data)
await sio.emit('my response', {'response': 'my response'})
@sio.event
async def disconnect():
print('disconnected from server')
async def main():
await sio.wait()
if __name__ == '__main__':
asyncio.run(main())
code:server.py
from aiohttp import web
import socketio
sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)
async def index(request):
"""Serve the client-side application."""
with open('index.html') as f:
return web.Response(text=f.read(), content_type='text/html')
@sio.event
def connect(sid, environ, auth):
print("connect ", sid)
# print(f"environ: {environ}")
print(f"auth: {auth}")
@sio.event
async def message(sid, data):
print("message ", data)
@sio.event
def disconnect(sid):
print('disconnect ', sid)
app.router.add_static('/static', 'static')
app.router.add_get('/', index)
if __name__ == '__main__':
web.run_app(app, port=5000)