UD - FLASK - vanilla SQL
Last updated
Was this helpful?
Last updated
Was this helpful?
Declaring a column as `INTEGER PRIMARY KEY` makes SQLite assign an automatically incrementing id to each new row.
To create the tables, run `$ python create_table.py` once before starting the app.
# create_table.py
import sqlite3
# Open (or create) the SQLite database file that the app reads and writes.
connection = sqlite3.connect('data.db')
try:
    cursor = connection.cursor()

    # INTEGER PRIMARY KEY lets SQLite assign the id automatically on insert.
    create_table = "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username text, password text)"
    cursor.execute(create_table)

    # Items are keyed by their name, so a name can only be stored once.
    create_table = "CREATE TABLE IF NOT EXISTS items (name text PRIMARY KEY, price real)"
    cursor.execute(create_table)

    connection.commit()
finally:
    # Release the database handle even if one of the statements above fails.
    connection.close()
The `UserRegister` class exposes a `POST` method that handles signup.
# user.py
class User:
    """Plain object holding one user's id, username and password."""

    TABLE_NAME = 'users'

    def __init__(self, _id, username, password):
        # Store the three constructor arguments unchanged on the instance.
        self.id, self.username, self.password = _id, username, password

    # Placeholder kept from the original excerpt.
    ...
class UserRegister(Resource):
    """REST resource whose ``POST`` handler registers a new user."""

    TABLE_NAME = 'users'

    # Request parser: both fields are declared with required=True.
    parser = reqparse.RequestParser()
    parser.add_argument(
        'username',
        type=str,
        required=True,
        help="This field cannot be left blank!",
    )
    parser.add_argument(
        'password',
        type=str,
        required=True,
        help="This field cannot be left blank!",
    )

    def post(self):
        """Insert a new row into the users table from the parsed request.

        Returns:
            A ``(body, status)`` tuple: status 400 with an error message when
            ``User.find_by_username`` already finds the username, otherwise
            status 201 after the row has been committed.
        """
        data = UserRegister.parser.parse_args()

        if User.find_by_username(data['username']):  # avoid duplicate signup.
            return {"message": "User with that username already exists."}, 400

        connection = sqlite3.connect('data.db')
        try:
            cursor = connection.cursor()
            # NULL for the id column lets SQLite choose the id itself.
            query = "INSERT INTO {table} VALUES (NULL, ?, ?)".format(table=self.TABLE_NAME)
            cursor.execute(query, (data['username'], data['password']))
            connection.commit()
        finally:
            # Always release the connection, even when the INSERT fails.
            connection.close()

        return {"message": "User created successfully."}, 201
Create the endpoint:
# app.py
# Register the UserRegister resource under the '/register' URL.
api.add_resource(UserRegister, '/register')
For a `GET` method we don't need `connection.commit()`, because reading rows does not change the database.
# user.py
import sqlite3
from flask_restful import Resource, reqparse
class User:
    """A user loaded from the ``users`` table of ``data.db``."""

    TABLE_NAME = 'users'

    def __init__(self, _id, username, password):
        self.id = _id
        self.username = username
        self.password = password

    @classmethod
    def _find_one(cls, column, value):
        """Return the first user whose ``column`` equals ``value``, or ``None``.

        ``column`` is interpolated into the SQL text, so it must only ever be
        a hard-coded column name supplied by the finders below; ``value`` is
        always bound as a query parameter.
        """
        connection = sqlite3.connect('data.db')
        try:
            query = "SELECT * FROM {table} WHERE {column}=?".format(
                table=cls.TABLE_NAME, column=column
            )
            row = connection.cursor().execute(query, (value,)).fetchone()
        finally:
            # A read needs no commit, but the connection must always be closed.
            connection.close()

        # The selected row's values are passed positionally to the constructor.
        return cls(*row) if row else None

    @classmethod
    def find_by_username(cls, username):
        """Return the ``User`` with the given username, or ``None`` if absent."""
        return cls._find_one('username', username)

    @classmethod
    def find_by_id(cls, _id):
        """Return the ``User`` with the given id, or ``None`` if absent."""
        return cls._find_one('id', _id)
class UserRegister(Resource):
    # Only a placeholder body is shown in this excerpt.
    ...
from flask_restful import Resource, reqparse
from flask_jwt import jwt_required
import sqlite3
class Item(Resource):
    """REST resource for a single item stored in the ``items`` table of ``data.db``."""

    TABLE_NAME = 'items'

    # Request parser: the JSON body must carry a float 'price'.
    parser = reqparse.RequestParser()
    parser.add_argument(
        'price',
        type=float,
        required=True,
        help="This field cannot be left blank!",
    )

    @classmethod
    def _execute_write(cls, query, params):
        """Run one modifying statement, commit it, and always close the connection."""
        connection = sqlite3.connect('data.db')
        try:
            connection.cursor().execute(query, params)
            connection.commit()
        finally:
            connection.close()

    @classmethod
    def find_by_name(cls, name):
        """Return ``{'item': {'name': ..., 'price': ...}}`` for ``name``, or ``None``."""
        connection = sqlite3.connect('data.db')
        try:
            cursor = connection.cursor()
            query = "SELECT * FROM {table} WHERE name=?".format(table=cls.TABLE_NAME)
            row = cursor.execute(query, (name,)).fetchone()
        finally:
            connection.close()  # no connection.commit() needed.

        if row:
            return {'item': {'name': row[0], 'price': row[1]}}
        return None

    @classmethod
    def insert(cls, item):
        """Insert ``item`` (a dict with 'name' and 'price') as a new row."""
        query = "INSERT INTO {table} VALUES(?, ?)".format(table=cls.TABLE_NAME)
        cls._execute_write(query, (item['name'], item['price']))

    @classmethod
    def update(cls, item):
        """Set the price of the row whose name equals ``item['name']``."""
        query = "UPDATE {table} SET price=? WHERE name=?".format(table=cls.TABLE_NAME)
        cls._execute_write(query, (item['price'], item['name']))

    @jwt_required()
    def get(self, name):
        """Return the item called ``name``, or a 404 message when it is missing."""
        item = self.find_by_name(name)
        if item:
            return item
        return {'message': 'Item not found'}, 404

    def post(self, name):
        """Create the item called ``name`` with the price from the request body.

        Returns:
            The new item dict on success; a message with status 400 when the
            name already exists; a message with status 500 when the insert
            raises a database error.
        """
        if self.find_by_name(name):
            return {'message': "An item with name '{}' already exists.".format(name)}, 400

        data = Item.parser.parse_args()
        item = {'name': name, 'price': data['price']}

        try:
            Item.insert(item)
        except sqlite3.Error:
            return {"message": "An error occurred inserting the item."}, 500

        return item

    @jwt_required()
    def delete(self, name):
        """Delete the row called ``name`` and return a confirmation message."""
        query = "DELETE FROM {table} WHERE name=?".format(table=self.TABLE_NAME)
        self._execute_write(query, (name,))
        return {'message': 'Item deleted'}

    @jwt_required()
    def put(self, name):
        """Insert the item called ``name`` if it is missing, otherwise update its price.

        Returns:
            The stored item dict on success, or a message with status 500
            when the insert or update raises a database error.
        """
        data = Item.parser.parse_args()
        item = self.find_by_name(name)
        updated_item = {'name': name, 'price': data['price']}

        if item is None:
            try:
                Item.insert(updated_item)
            except sqlite3.Error:
                return {"message": "An error occurred inserting the item."}, 500
        else:
            try:
                Item.update(updated_item)
            except sqlite3.Error:
                # Report the failure like the insert branch does instead of re-raising.
                return {"message": "An error occurred updating the item."}, 500

        return updated_item
class ItemList(Resource):
    """REST resource that returns every row of the ``items`` table."""

    TABLE_NAME = 'items'

    def get(self):
        """Return ``{'items': [...]}`` with one ``{'name', 'price'}`` dict per row."""
        connection = sqlite3.connect('data.db')
        try:
            cursor = connection.cursor()
            query = "SELECT * FROM {table}".format(table=self.TABLE_NAME)
            # Build the full list while the connection is still open.
            items = [{'name': row[0], 'price': row[1]} for row in cursor.execute(query)]
        finally:
            # Close the connection even if the query fails.
            connection.close()

        return {'items': items}