Skip to content

Instantly share code, notes, and snippets.

@KShivendu
Created August 7, 2023 15:28
Show Gist options
  • Save KShivendu/feffa6be4806d76fd95c86734545aba4 to your computer and use it in GitHub Desktop.
Save KShivendu/feffa6be4806d76fd95c86734545aba4 to your computer and use it in GitHub Desktop.
Example application demonstrating how to use the SuperTokens Python SDK with Flask-SocketIO
from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit
import json
from backend import config
from supertokens_python import init
from flask import jsonify, g
from supertokens_python.framework.flask import Middleware
from supertokens_python.recipe.session.syncio import get_session, get_session_without_request_response
# Flask application with the Socket.IO server layered on top of it.
# NOTE(review): the hard-coded secret key and the wildcard CORS origin look
# like demo-only settings — confirm before using this outside an example.
app = Flask(__name__)
app.config.update(SECRET_KEY="secret")
Middleware(app)  # attach the SuperTokens Flask middleware to the app
socketio = SocketIO(app, cors_allowed_origins="*")

# Initialise the SuperTokens SDK from the values held in backend.config.
init(
    supertokens_config=config.supertokens_config,
    app_info=config.app_info,
    framework=config.framework,
    recipe_list=config.recipe_list,
    mode="wsgi",
)
# HTTP route for the main page
@app.route("/")
def index():
    """Render and return the ``index.html`` template."""
    page = render_template("index.html")
    return page
# HTTP route to handle a POST request
@app.route("/api/data", methods=["POST"])
def handle_post_request():
    """Read the JSON body of a POST to ``/api/data`` and acknowledge it.

    Returns:
        A ``(body, status)`` tuple: a fixed confirmation message and 200.
    """
    # The body is read via request.json but not used further yet; this is
    # the place to add real processing. Assumes the request carries JSON.
    _received = request.json
    acknowledgement = {"message": "Data received successfully"}
    return acknowledgement, 200
# WebSocket event handler for a client connection
@socketio.on("connect")
def handle_connect():
    """Print a notice to stdout when the ``connect`` event fires."""
    notice = "Client connected."
    print(notice)
# WebSocket event handler for a client disconnection
@socketio.on("disconnect")
def handle_disconnect():
    """Print a notice to stdout when the ``disconnect`` event fires."""
    notice = "Client disconnected."
    print(notice)
# WebSocket event handler for a custom event
@socketio.on("custom_event")
def handle_custom_event(data):
    """Log a ``custom_event`` payload and emit ``custom_response``.

    Args:
        data: Payload delivered with the event; it is only printed.
    """
    print(f"Received custom data: {data}")
    # You can process the data here and send a response back to the client
    reply = {"message": "Custom event received"}
    emit("custom_response", reply, broadcast=True)
@socketio.on("sessioninfo")
def handle_session_info(data):
    """Report the SuperTokens session that belongs to an access token.

    Expected payload, either as JSON text or as an already-decoded mapping::

        {"accessToken": "abc"}

    Exactly one ``sessioninfo`` event is emitted back to the sender:

    * ``{"status": "Please provide access token"}`` when the payload cannot
      be read as an object or carries no usable ``accessToken`` string;
    * ``{"status": "No sesion"}`` when the SDK returns no session;
    * otherwise the session handle, user id and access-token payload.

    Args:
        data: Payload delivered with the ``sessioninfo`` event.
    """
    print("websocket sessioninfo got", data)

    # json.loads only accepts text/bytes. A payload that reaches the handler
    # already decoded (e.g. a dict sent as an object by the client) is used
    # as-is instead of crashing with a TypeError.
    if isinstance(data, (str, bytes, bytearray)):
        try:
            body = json.loads(data)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError, both of
            # which subclass ValueError: malformed input means "no token".
            body = None
    else:
        body = data

    # Only a JSON object can carry the token; anything else means "missing".
    access_token = body.get("accessToken") if isinstance(body, dict) else None
    if not isinstance(access_token, str) or not access_token:
        emit("sessioninfo", {"status": "Please provide access token"})
        return

    # session_required=False mirrors the HTTP /sessioninfo route, so that a
    # missing session can come back as None and reach the branch below.
    # NOTE(review): per the SDK docs some failures (e.g. a token that needs
    # refreshing) may still raise — confirm the desired handling.
    session_ = get_session_without_request_response(
        access_token,
        session_required=False,
    )
    if session_ is None:
        emit("sessioninfo", {"status": "No sesion"})
        return

    emit(
        "sessioninfo",
        {
            "sessionHandle": session_.get_handle(),
            "userId": session_.get_user_id(),
            "accessTokenPayload": session_.get_access_token_payload(),
        },
    )
@socketio.on("echo")
def handle_echo(data):
    """Emit the received payload back unchanged on the ``echo`` event.

    Args:
        data: Payload delivered with the event.
    """
    emit("echo", data)  # broadcast=True
@app.route("/sessioninfo", methods=["GET"])  # type: ignore
def get_session_info():
    """Return details of the current request's session as JSON.

    The session is looked up with ``session_required=False``. When no
    session comes back the response is a ``status`` message; otherwise it
    holds the session handle, user id and access-token payload.
    """
    # session_ = g.supertokens
    current = get_session(request, session_required=False)
    if current is not None:
        details = {
            "sessionHandle": current.get_handle(),
            "userId": current.get_user_id(),
            "accessTokenPayload": current.get_access_token_payload(),
        }
    else:
        details = {"status": "No sesion"}
    return jsonify(details)
if __name__ == "__main__":
    # Executed as a script: run the Flask app through the SocketIO wrapper
    # with debug mode switched on.
    socketio.run(app, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment