JWT Authentication in Python and Mongo DB
In the previous tutorial, we created a URL shortener app where user can make their URL short and easily shareable. In this tutorial, we are going to add authentication using JWT (Json Web Token) in Python.
Create a Vuex store
First, we need to create a Vuex store. So run the following command in your “web” folder to install Vuex module:
> npm install vuex
After that, go to your “web/src” folder and create a file named “store.js”. Write the following code in that file.
import { createStore } from "vuex"
export default createStore({
state() {
return {
user: null
}
},
mutations: {
set_user(state, user) {
state.user = user
}
},
getters: {
get_user(state) {
return state.user
}
}
})
This will be used to get and set the global variable “user” throughout the application.
Fetch authenticated user
Then goto your AppHeader component and import the following modules:
import axios from "axios"
import swal from "sweetalert2"
import store from "../../store"
And change your export default object to the following:
export default {
name: "AppHeader",
computed: {
user() {
return store.getters.get_user
}
},
methods: {
async get_data() {
const formData = new FormData()
formData.append("access_token", localStorage.getItem(this.$access_token_key))
formData.append("timezone", Intl.DateTimeFormat().resolvedOptions().timeZone)
try {
const response = await axios.post(
this.$api_url + "/me",
formData
)
if (response.data.status == "success") {
store.commit("set_user", response.data.user)
}
} catch (exp) {
console.log(exp)
}
}
},
mounted() {
this.get_data()
}
}
This will call can AJAX to the Python API to fetch the authenticated user and save them in Vuex store. Now we need to create an API in Python that will handle this request.
Open terminal in your “api” folder and run the following command to install JWT module. It will be used for generating Json Web Tokens.
> pip3 install pyjwt
Open your “api/api.py” file and import this module.
import jwt, time
We will also be needing time module to convert the current time into Unix timestamp. That will be used to expire the JWT after 24 hours.
Then create a variable for JWT secret. It can be any random string.
jwt_secret = "qwertyuiopasdfgfhjklzxcvbnm"
Then create the following API.
@app.post("/me")
def get_user(timezone: Annotated[str, Form()], access_token: Annotated[str, Form()]):
user = None
try:
payload = jwt.decode(access_token, jwt_secret, algorithms = "HS256")
dt1 = time.mktime(datetime.now().timetuple())
dt2 = payload["exp"]
delta = dt2 - dt1
delta = delta / 60 / 60
if delta <= 0:
return {
"status": "error",
"message": "You have been logged-out."
}
user = db["users"].find_one({
"$and": [{
"_id": ObjectId(payload["user_id"])
}, {
"access_token": access_token
}]
})
if user == None:
return {
"status": "error",
"message": "You have been logged-out."
}
return {
"status": "success",
"message": "User has been fetched.",
"user": {
"_id": str(user["_id"]),
"name": user["name"],
"email": user["email"]
}
}
except Exception as error:
return {
"status": "error",
"message": "You have been logged-out.",
"error": str(error)
}
else:
pass
finally:
pass
The API will first decode the JWT. It will return payload containing “user_id” and “exp” in Unix timestamp. “exp” is expiry date of JWT.
Then it takes the difference between current time and the expiry time. Difference “delta” will be in seconds. So we will convert it into hours and check if it is greater than 24.
If JWT is not expired yet, we will check if the user exists using his ID and access token. And return the response to the client.
If you test the app now, and open your browser’s “Network” tab. You will see that right now you are getting “status” as “error”. This is because no user is signed-in yet.
Login/signup
In your AppHeader component, create the following links in navbar.
<template v-if="user == null">
<li class="nav-item">
<router-link class="nav-link" to="/login">Login</router-link>
</li>
<li class="nav-item">
<router-link class="nav-link" to="/signup">Sign up</router-link>
</li>
</template>
<li class="nav-item" v-else>
<a href="javascript:void(0)" v-on:click="logout" class="nav-link">Logout</a>
</li>
Right now, it will show you login and sign up links. We will first do the sign up feature.
Sign up
Create a file named “SignupComponent.vue” in your “web/src/components” folder. Open “web/src/main.js” and import the SignupComponent:
import SignupComponent from "./components/SignupComponent.vue"
And add it in “routes” array.
{ path: "/signup", component: SignupComponent },
Following will be the content of SignupComponent.
<template>
<div class="container" style="margin-top: 50px;">
<div class="row">
<div class="offset-md-3 col-md-6">
<h2 style="margin-bottom: 30px; text-align: center;">Sign up</h2>
<form method="POST" v-on:submit.prevent="signup">
<div class="form-group">
<label>Enter name</label>
<input type="text" class="form-control" name="name" required />
</div>
<br />
<div class="form-group">
<label>Enter email</label>
<input type="email" class="form-control" name="email" required />
</div>
<br />
<div class="form-group">
<label>Enter password</label>
<input type="password" class="form-control" name="password" required />
</div>
<br />
<input type="submit" v-bind:value="isLoading ? 'Loading...' : 'Register'" v-bind:disabled="isLoading" name="submit" class="btn btn-primary" />
</form>
</div>
</div>
</div>
</template>
<script>
import axios from "axios"
import swal from "sweetalert2"
export default {
name: "SignupComponent",
data() {
return {
isLoading: false
}
},
methods: {
signup: async function () {
const form = event.target;
const formData = new FormData(form);
this.isLoading = true;
try {
const response = await axios.post(
this.$api_url + "/signup",
formData
);
swal.fire("Signup", response.data.message, response.data.status)
if (response.data.status == "success") {
form.reset();
}
} catch (exp) {
console.log(exp)
} finally {
this.isLoading = false
}
}
}
}
</script>
Then in your “api/api.py”, we need to create an API in Python that will handle this request. We need to encrypt the passwords before storing them in database. So install the bcrypt module in Python.
> pip3 install bcrypt
Then import it in “api/api.py”.
import bcrypt
And then create an API that will save the user’s data in database.
@app.post("/signup")
def signup(name: Annotated[str, Form()], email: Annotated[str, Form()], password: Annotated[str, Form()]):
user = db["users"].find_one({
"email": email
})
if user != None:
return {
"status": "error",
"message": "Email already exists."
}
db["users"].insert_one({
"name": name,
"email": email,
"password": bcrypt.hashpw(password.encode("UTF-8"), bcrypt.gensalt()),
"created_at": datetime.now(timezone_module.utc)
})
return {
"status": "success",
"message": "Account has been created. Please login now."
}
Login
Create a file named “LoginComponent.vue” in your “web/src/components” folder. Open “web/src/main.js” and import the LoginComponent:
import LoginComponent from "./components/LoginComponent.vue"
And add it in “routes” array.
{ path: "/login", component: LoginComponent },
Following will be the content of LoginComponent:
<template>
<div class="container" style="margin-top: 50px;">
<div class="row">
<div class="offset-md-3 col-md-6">
<h2 style="margin-bottom: 30px; text-align: center;">Login</h2>
<form method="POST" v-on:submit.prevent="login">
<div class="form-group">
<label>Enter email</label>
<input type="email" class="form-control" name="email" required />
</div>
<br />
<div class="form-group">
<label>Enter password</label>
<input type="password" class="form-control" name="password" required />
</div>
<br />
<input type="submit" v-bind:value="isLoading ? 'Loading...' : 'Login'" v-bind:disabled="isLoading" name="submit" class="btn btn-primary" />
</form>
</div>
</div>
</div>
</template>
<script>
import axios from "axios"
import swal from "sweetalert2"
import store from "../store"
export default {
name: "LoginComponent",
methods: {
login: async function () {
const form = event.target;
const formData = new FormData(form);
this.isLoading = true;
try {
const response = await axios.post(
this.$api_url + "/login",
formData
)
if (response.data.status == "success") {
// get access token from server
const access_token = response.data.access_token
// save in local storage
localStorage.setItem(this.$access_token_key, access_token)
store.commit("set_user", response.data.user)
form.reset()
// to go to home page without refreshing
this.$router.push("/")
} else {
swal.fire("Error", response.data.message, "error")
}
} catch (exp) {
console.log(exp)
} finally {
this.isLoading = false
}
}
}
}
</script>
Once user is logged-in, we will save the access token in local storage and redirect the user to home page. In your main.js file, create a variable that will hold the key for local storage value.
app.config.globalProperties.$access_token_key = "access_token"
Now in your “api/api.py” import the timedelta module. We will be using this module to add 24 hours in current date for setting expiry date of JWT.
from datetime import timedelta
Create the following API in Python that will do the authentication using JWT.
@app.post("/login")
def login(email: Annotated[str, Form()], password: Annotated[str, Form()]):
user = db["users"].find_one({
"email": email
})
if user == None:
return {
"status": "error",
"message": "Email does not exists."
}
if bcrypt.checkpw(password.encode("UTF-8"), user["password"]) != True:
return {
"status": "error",
"message": "Password is in-correct."
}
access_token = jwt.encode({
"user_id": str(user["_id"]),
"time": datetime.now(timezone_module.utc).timetuple(),
"exp": datetime.now(timezone_module.utc) + timedelta(hours=24)
}, jwt_secret, algorithm = "HS256")
db["users"].find_one_and_update({
"_id": user["_id"]
}, {
"$set": {
"access_token": access_token
}
})
return {
"status": "success",
"message": "Login successfully.",
"access_token": access_token,
"user": {
"_id": str(user["_id"]),
"name": user["name"],
"email": user["email"]
}
}
It will first check if the email exists. Then it will check the password against the hashed password saved in database. Then it will generate an access token using algorithm “HS256”.
Test the app now. You will be able to login now. And once logged-in, you will no longer see login/signup links in the navbar. Instead, you will see a button for logout.
Logout
Logout button in navbar already have an onclick listener attached to it. We need to create its method in AppHeader component.
async logout() {
const formData = new FormData()
formData.append("access_token", localStorage.getItem(this.$access_token_key))
try {
const response = await axios.post(
this.$api_url + "/logout",
formData
)
if (response.data.status == "success") {
store.commit("set_user", null)
localStorage.removeItem(this.$access_token_key)
this.$router.push("/login")
}
} catch (exp) {
console.log(exp)
}
},
This will call an AJAX to the Python API. Once user is logged-out, it will:
- Set the “user” store value to null.
- Remove access token from local storage.
- Redirect the user to login page.
Last step we need, is to create an API in Python that will handle this AJAX request.
# api/api.py
@app.post("/logout")
def logout(access_token: Annotated[str, Form()]):
try:
payload = jwt.decode(access_token, jwt_secret, algorithms = "HS256")
db["users"].find_one_and_update({
"_id": ObjectId(payload["user_id"])
}, {
"$unset": {
"access_token": 1
}
})
return {
"status": "success",
"message": "User has been logged-out."
}
except Exception as error:
return {
"status": "error",
"message": "You have been logged-out.",
"error": str(error)
}
It simply removes the “access_token” key from that user’s Mongo DB document. Try to logout now, you will be redirected to login page and you will again see the login/signup links in the navbar.
So that’s how you can do the user authentication in Python using JWT.