TiDB と Python を使用して単純な CRUD アプリを構築する
このドキュメントでは、TiDB と Python を使用して単純な CRUD アプリケーションを構築する方法について説明します。
ノート:
Python 3.10 以降のバージョンの Python を使用することをお勧めします。
ステップ 1. TiDB クラスターを起動する
以下にTiDBクラスターの起動方法を紹介します。
TiDB Cloud Serverless Tierクラスターを使用する
詳細な手順については、 Serverless Tierクラスターを作成するを参照してください。
ローカル クラスターを使用する
詳細な手順については、 ローカル テスト クラスターをデプロイまたはTiUPを使用して TiDB クラスターをデプロイを参照してください。
ステップ 2. コードを取得する
git clone https://github.com/pingcap-inc/tidb-example-python.git
- SQLAlchemy (Recommended)
- peewee (Recommended)
- mysqlclient
- PyMySQL
- mysql-connector-python
SQL錬金術は、人気のある Python 用のオープンソース ORM ライブラリです。以下では、例として SQLAlchemy 1.44 を使用します。
import uuid
from typing import List
from sqlalchemy import create_engine, String, Column, Integer, select, func
from sqlalchemy.orm import declarative_base, sessionmaker
engine = create_engine('mysql://root:@127.0.0.1:4000/test')
Base = declarative_base()
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
class Player(Base):
__tablename__ = "player"
id = Column(String(36), primary_key=True)
coins = Column(Integer)
goods = Column(Integer)
def __repr__(self):
return f'Player(id={self.id!r}, coins={self.coins!r}, goods={self.goods!r})'
def random_player(amount: int) -> List[Player]:
players = []
for _ in range(amount):
players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000))
return players
def simple_example() -> None:
with Session() as session:
# create a player, who has a coin and a goods.
session.add(Player(id="test", coins=1, goods=1))
# get this player, and print it.
get_test_stmt = select(Player).where(Player.id == "test")
for player in session.scalars(get_test_stmt):
print(player)
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
session.bulk_save_objects(player_list[idx:idx + 114])
# print the number of players
count = session.query(func.count(Player.id)).scalar()
print(f'number of players: {count}')
# print 3 players.
three_players = session.query(Player).limit(3).all()
for player in three_players:
print(player)
session.commit()
def trade_check(session: Session, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
# sell player goods check
sell_player = session.query(Player.goods).filter(Player.id == sell_id).with_for_update().one()
if sell_player.goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
buy_player = session.query(Player.coins).filter(Player.id == buy_id).with_for_update().one()
if buy_player.coins < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
with Session() as session:
if trade_check(session, sell_id, buy_id, amount, price) is False:
return
# deduct the goods of seller, and raise his/her the coins
session.query(Player).filter(Player.id == sell_id). \
update({'goods': Player.goods - amount, 'coins': Player.coins + price})
# deduct the coins of buyer, and raise his/her the goods
session.query(Player).filter(Player.id == buy_id). \
update({'goods': Player.goods + amount, 'coins': Player.coins - price})
session.commit()
print("trade success")
def trade_example() -> None:
with Session() as session:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
session.add(Player(id="1", coins=100, goods=0))
session.add(Player(id="2", coins=114514, goods=20))
session.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(sell_id="2", buy_id="1", amount=2, price=100)
with Session() as session:
traders = session.query(Player).filter(Player.id.in_(("1", "2"))).all()
for player in traders:
print(player)
session.commit()
simple_example()
trade_example()
ドライバーを直接使用する場合と比較して、SQLAlchemy は、データベース接続を作成するときに、さまざまなデータベースの特定の詳細を抽象化します。さらに、SQLAlchemy は、セッション管理や基本オブジェクトの CRUD などの一部の操作をカプセル化するため、コードが大幅に簡素化されます。
Player
のクラスは、アプリケーション内の属性へのテーブルのマッピングです。 Player
の各属性は、 player
テーブルのフィールドに対応します。 SQLAlchemy に詳細情報を提供するために、属性はid = Column(String(36), primary_key=True)
として定義され、フィールド タイプとその追加属性を示します。たとえば、 id = Column(String(36), primary_key=True)
id
属性がString
型であること、データベース内の対応するフィールドがVARCHAR
型であること、長さが36
であること、および主キーであることを示します。
SQLAlchemy の使用方法の詳細については、 SQLAlchemy ドキュメントを参照してください。
ピーウィーは、人気のある Python 用のオープンソース ORM ライブラリです。以下では例として peewee 3.15.4 を使用しています。
import os
import uuid
from typing import List
from peewee import *
from playhouse.db_url import connect
db = connect('mysql://root:@127.0.0.1:4000/test')
class Player(Model):
id = CharField(max_length=36, primary_key=True)
coins = IntegerField()
goods = IntegerField()
class Meta:
database = db
table_name = "player"
def random_player(amount: int) -> List[Player]:
players = []
for _ in range(amount):
players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000))
return players
def simple_example() -> None:
# create a player, who has a coin and a goods.
Player.create(id="test", coins=1, goods=1)
# get this player, and print it.
test_player = Player.select().where(Player.id == "test").get()
print(f'id:{test_player.id}, coins:{test_player.coins}, goods:{test_player.goods}')
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
Player.bulk_create(player_list, 114)
# print the number of players
count = Player.select().count()
print(f'number of players: {count}')
# print 3 players.
three_players = Player.select().limit(3)
for player in three_players:
print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')
def trade_check(sell_id: str, buy_id: str, amount: int, price: int) -> bool:
sell_goods = Player.select(Player.goods).where(Player.id == sell_id).get().goods
if sell_goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
buy_coins = Player.select(Player.coins).where(Player.id == buy_id).get().coins
if buy_coins < price:
print(f'buy player {buy_id} coins not enough')
return False
return True
def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
with db.atomic() as txn:
try:
if trade_check(sell_id, buy_id, amount, price) is False:
txn.rollback()
return
# deduct the goods of seller, and raise his/her the coins
Player.update(goods=Player.goods - amount, coins=Player.coins + price).where(Player.id == sell_id).execute()
# deduct the coins of buyer, and raise his/her the goods
Player.update(goods=Player.goods + amount, coins=Player.coins - price).where(Player.id == buy_id).execute()
except Exception as err:
txn.rollback()
print(f'something went wrong: {err}')
else:
txn.commit()
print("trade success")
def trade_example() -> None:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
Player.create(id="1", coins=100, goods=0)
Player.create(id="2", coins=114514, goods=20)
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
after_trade_players = Player.select().where(Player.id.in_(["1", "2"]))
for player in after_trade_players:
print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')
db.connect()
# recreate the player table
db.drop_tables([Player])
db.create_tables([Player])
simple_example()
trade_example()
ドライバーを直接使用する場合と比較して、peewee は、データベース接続を作成するときに、さまざまなデータベースの特定の詳細を抽象化します。さらに、peewee は、セッション管理や基本オブジェクトの CRUD などの一部の操作をカプセル化するため、コードが大幅に簡素化されます。
Player
のクラスは、アプリケーション内の属性へのテーブルのマッピングです。 Player
の各属性は、 player
テーブルのフィールドに対応します。 SQLAlchemy に詳細情報を提供するために、属性はid = Column(String(36), primary_key=True)
として定義され、フィールド タイプとその追加属性を示します。たとえば、 id = Column(String(36), primary_key=True)
id
属性がString
型であること、データベース内の対応するフィールドがVARCHAR
型であること、長さが36
であること、および主キーであることを示します。
peewee の使用方法の詳細については、 ピーウィーのドキュメントを参照してください。
mysql クライアントは、人気のある Python 用のオープンソース ドライバーです。以下では、例として mysqlclient 2.1.1 を使用します。 Python 用のドライバーは、他の言語よりも使いやすいですが、基盤となる実装を保護せず、トランザクションを手動で管理する必要があります。 SQL が必要なシナリオがあまりない場合は、プログラムの結合を減らすのに役立つ ORM を使用することをお勧めします。
import uuid
from typing import List
import MySQLdb
from MySQLdb import Connection
from MySQLdb.cursors import Cursor
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
return MySQLdb.connect(
host="127.0.0.1",
port=4000,
user="root",
password="",
database="test",
autocommit=autocommit
)
def create_player(cursor: Cursor, player: tuple) -> None:
cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)
def get_player(cursor: Cursor, player_id: str) -> tuple:
cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
return cursor.fetchone()
def get_players_with_limit(cursor: Cursor, limit: int) -> List[tuple]:
cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
players = []
for _ in range(amount):
players.append((uuid.uuid4(), 10000, 10000))
return players
def bulk_create_player(cursor: Cursor, players: List[tuple]) -> None:
cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)
def get_count(cursor: Cursor) -> None:
cursor.execute("SELECT count(*) FROM player")
return cursor.fetchone()[0]
def trade_check(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"
# sell player goods check
cursor.execute(get_player_with_lock_sql, (sell_id,))
_, sell_goods = cursor.fetchone()
if sell_goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
cursor.execute(get_player_with_lock_sql, (buy_id,))
buy_coins, _ = cursor.fetchone()
if buy_coins < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade_update(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
# deduct the goods of seller, and raise his/her the coins
cursor.execute(update_player_sql, (-amount, price, sell_id))
# deduct the coins of buyer, and raise his/her the goods
cursor.execute(update_player_sql, (amount, -price, buy_id))
def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
with connection.cursor() as cursor:
if trade_check(cursor, sell_id, buy_id, amount, price) is False:
connection.rollback()
return
try:
trade_update(cursor, sell_id, buy_id, amount, price)
except Exception as err:
connection.rollback()
print(f'something went wrong: {err}')
else:
connection.commit()
print("trade success")
def simple_example() -> None:
with get_connection(autocommit=True) as conn:
with conn.cursor() as cur:
# create a player, who has a coin and a goods.
create_player(cur, ("test", 1, 1))
# get this player, and print it.
test_player = get_player(cur, "test")
print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
bulk_create_player(cur, player_list[idx:idx + 114])
# print the number of players
count = get_count(cur)
print(f'number of players: {count}')
# print 3 players.
three_players = get_players_with_limit(cur, 3)
for player in three_players:
print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')
def trade_example() -> None:
with get_connection(autocommit=False) as conn:
with conn.cursor() as cur:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
create_player(cur, ("1", 100, 0))
create_player(cur, ("2", 114514, 20))
conn.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(conn, sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(conn, sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
with conn.cursor() as cur:
_, player1_coin, player1_goods = get_player(cur, "1")
print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
_, player2_coin, player2_goods = get_player(cur, "2")
print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')
simple_example()
trade_example()
ドライバーは ORM よりもカプセル化のレベルが低いため、プログラムには多くの SQL ステートメントがあります。 ORM とは異なり、ドライバーにはデータ オブジェクトがないため、ドライバーによってクエリされたPlayer
タプルとして表されます。
mysqlclient の使用方法の詳細については、 mysqlclient ドキュメントを参照してください。
PyMySQLは、人気のある Python 用のオープンソース ドライバーです。以下では、例として PyMySQL 1.0.2 を使用します。 Python 用のドライバーは、他の言語よりも使いやすいですが、基盤となる実装を保護せず、トランザクションを手動で管理する必要があります。 SQL が必要なシナリオがあまりない場合は、プログラムの結合を減らすのに役立つ ORM を使用することをお勧めします。
import uuid
from typing import List
import pymysql.cursors
from pymysql import Connection
from pymysql.cursors import DictCursor
def get_connection(autocommit: bool = False) -> Connection:
return pymysql.connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test',
cursorclass=DictCursor,
autocommit=autocommit)
def create_player(cursor: DictCursor, player: tuple) -> None:
cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)
def get_player(cursor: DictCursor, player_id: str) -> dict:
cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
return cursor.fetchone()
def get_players_with_limit(cursor: DictCursor, limit: int) -> tuple:
cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
players = []
for _ in range(amount):
players.append((uuid.uuid4(), 10000, 10000))
return players
def bulk_create_player(cursor: DictCursor, players: List[tuple]) -> None:
cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)
def get_count(cursor: DictCursor) -> int:
cursor.execute("SELECT count(*) as count FROM player")
return cursor.fetchone()['count']
def trade_check(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"
# sell player goods check
cursor.execute(get_player_with_lock_sql, (sell_id,))
seller = cursor.fetchone()
if seller['goods'] < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
cursor.execute(get_player_with_lock_sql, (buy_id,))
buyer = cursor.fetchone()
if buyer['coins'] < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade_update(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
# deduct the goods of seller, and raise his/her the coins
cursor.execute(update_player_sql, (-amount, price, sell_id))
# deduct the coins of buyer, and raise his/her the goods
cursor.execute(update_player_sql, (amount, -price, buy_id))
def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
with connection.cursor() as cursor:
if trade_check(cursor, sell_id, buy_id, amount, price) is False:
connection.rollback()
return
try:
trade_update(cursor, sell_id, buy_id, amount, price)
except Exception as err:
connection.rollback()
print(f'something went wrong: {err}')
else:
connection.commit()
print("trade success")
def simple_example() -> None:
with get_connection(autocommit=True) as connection:
with connection.cursor() as cur:
# create a player, who has a coin and a goods.
create_player(cur, ("test", 1, 1))
# get this player, and print it.
test_player = get_player(cur, "test")
print(test_player)
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
bulk_create_player(cur, player_list[idx:idx + 114])
# print the number of players
count = get_count(cur)
print(f'number of players: {count}')
# print 3 players.
three_players = get_players_with_limit(cur, 3)
for player in three_players:
print(player)
def trade_example() -> None:
with get_connection(autocommit=False) as connection:
with connection.cursor() as cur:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
create_player(cur, ("1", 100, 0))
create_player(cur, ("2", 114514, 20))
connection.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(connection, sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(connection, sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
with connection.cursor() as cur:
print(get_player(cur, "1"))
print(get_player(cur, "2"))
simple_example()
trade_example()
ドライバーは ORM よりもカプセル化のレベルが低いため、プログラムには多くの SQL ステートメントがあります。 ORM とは異なり、ドライバーにはデータ オブジェクトがないため、ドライバーによってクエリされるPlayer
ディクショナリとして表されます。
PyMySQL の使用方法の詳細については、 PyMySQL ドキュメントを参照してください。
mysql-コネクタ-pythonは、人気のある Python 用のオープンソース ドライバーです。以下では、例として mysql-connector-python 8.0.31 を使用しています。 Python 用のドライバーは、他の言語よりも使いやすいですが、基盤となる実装を保護せず、トランザクションを手動で管理する必要があります。 SQL が必要なシナリオがあまりない場合は、プログラムの結合を減らすのに役立つ ORM を使用することをお勧めします。
import uuid
from typing import List
from mysql.connector import connect, MySQLConnection
from mysql.connector.cursor import MySQLCursor
def get_connection(autocommit: bool = True) -> MySQLConnection:
connection = connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test')
connection.autocommit = autocommit
return connection
def create_player(cursor: MySQLCursor, player: tuple) -> None:
cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)
def get_player(cursor: MySQLCursor, player_id: str) -> tuple:
cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
return cursor.fetchone()
def get_players_with_limit(cursor: MySQLCursor, limit: int) -> List[tuple]:
cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
players = []
for _ in range(amount):
players.append((str(uuid.uuid4()), 10000, 10000))
return players
def bulk_create_player(cursor: MySQLCursor, players: List[tuple]) -> None:
cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)
def get_count(cursor: MySQLCursor) -> int:
cursor.execute("SELECT count(*) FROM player")
return cursor.fetchone()[0]
def trade_check(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"
# sell player goods check
cursor.execute(get_player_with_lock_sql, (sell_id,))
_, sell_goods = cursor.fetchone()
if sell_goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
cursor.execute(get_player_with_lock_sql, (buy_id,))
buy_coins, _ = cursor.fetchone()
if buy_coins < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade_update(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
# deduct the goods of seller, and raise his/her the coins
cursor.execute(update_player_sql, (-amount, price, sell_id))
# deduct the coins of buyer, and raise his/her the goods
cursor.execute(update_player_sql, (amount, -price, buy_id))
def trade(connection: MySQLConnection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
with connection.cursor() as cursor:
if trade_check(cursor, sell_id, buy_id, amount, price) is False:
connection.rollback()
return
try:
trade_update(cursor, sell_id, buy_id, amount, price)
except Exception as err:
connection.rollback()
print(f'something went wrong: {err}')
else:
connection.commit()
print("trade success")
def simple_example() -> None:
with get_connection(autocommit=True) as connection:
with connection.cursor() as cur:
# create a player, who has a coin and a goods.
create_player(cur, ("test", 1, 1))
# get this player, and print it.
test_player = get_player(cur, "test")
print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
bulk_create_player(cur, player_list[idx:idx + 114])
# print the number of players
count = get_count(cur)
print(f'number of players: {count}')
# print 3 players.
three_players = get_players_with_limit(cur, 3)
for player in three_players:
print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')
def trade_example() -> None:
with get_connection(autocommit=False) as conn:
with conn.cursor() as cur:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
create_player(cur, ("1", 100, 0))
create_player(cur, ("2", 114514, 20))
conn.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(conn, sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(conn, sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
with conn.cursor() as cur:
_, player1_coin, player1_goods = get_player(cur, "1")
print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
_, player2_coin, player2_goods = get_player(cur, "2")
print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')
simple_example()
trade_example()
ドライバーは ORM よりもカプセル化のレベルが低いため、プログラムには多くの SQL ステートメントがあります。 ORM とは異なり、ドライバーにはデータ オブジェクトがないため、ドライバーによってクエリされたPlayer
タプルとして表されます。
mysql-connector-python の使用方法の詳細については、 mysql-connector-python ドキュメントを参照してください。
ステップ 3. コードを実行する
次のコンテンツでは、コードを実行する方法を順を追って紹介します。
ステップ 3.1 テーブルの初期化
コードを実行する前に、テーブルを手動で初期化する必要があります。ローカルの TiDB クラスターを使用している場合は、次のコマンドを実行できます。
- MySQL CLI
- MyCLI
mysql --host 127.0.0.1 --port 4000 -u root < player_init.sql
mycli --host 127.0.0.1 --port 4000 -u root --no-warn < player_init.sql
ローカル クラスターを使用していない場合、または MySQL クライアントをインストールしていない場合は、任意の方法 (Navicat、DBeaver、またはその他の GUI ツールなど) を使用してクラスターに接続し、 player_init.sql
のファイルで SQL ステートメントを実行します。
ステップ 3.2 TiDB Cloudのパラメーターを変更する
TiDB Cloud Serverless Tierクラスターを使用している場合は、CA ルート パスを指定し、次の例の<ca_path>
を CA パスに置き換える必要があります。システムの CA ルート パスを取得するには、 システムの CA ルート パスはどこにありますか?を参照してください。
- SQLAlchemy (Recommended)
- peewee (Recommended)
- mysqlclient
- PyMySQL
- mysql-connector-python
TiDB Cloud Serverless Tierクラスターを使用している場合は、 sqlalchemy_example.py
のcreate_engine
関数のパラメーターを変更します。
engine = create_engine('mysql://root:@127.0.0.1:4000/test')
設定したパスワードが123456
で、クラスターの詳細ページから取得した接続パラメーターが次のとおりであるとします。
- エンドポイント:
xxx.tidbcloud.com
- ポート:
4000
- ユーザー:
2aEp24QWEDLqRFs.root
この場合、次のようにcreate_engine
を変更できます。
engine = create_engine('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', connect_args={
"ssl_mode": "VERIFY_IDENTITY",
"ssl": {
"ca": "<ca_path>"
}
})
TiDB Cloud Serverless Tierクラスターを使用している場合は、 sqlalchemy_example.py
のcreate_engine
関数のパラメーターを変更します。
db = connect('mysql://root:@127.0.0.1:4000/test')
設定したパスワードが123456
で、クラスターの詳細ページから取得した接続パラメーターが次のとおりであるとします。
- エンドポイント:
xxx.tidbcloud.com
- ポート:
4000
- ユーザー:
2aEp24QWEDLqRFs.root
この場合、次のようにconnect
を変更できます。
peewee が PyMySQL をドライバーとして使用する場合:
db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', ssl_verify_cert=True, ssl_ca="<ca_path>")peewee がドライバーとして mysqlclient を使用する場合:
db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', ssl_mode="VERIFY_IDENTITY", ssl={"ca": "<ca_path>"})
peewee はドライバーにパラメーターを渡すため、peewee を使用する場合はドライバーの使用タイプに注意する必要があります。
TiDB Cloud Serverless Tierクラスターを使用している場合は、 get_connection
関数をmysqlclient_example.py
に変更します。
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
return MySQLdb.connect(
host="127.0.0.1",
port=4000,
user="root",
password="",
database="test",
autocommit=autocommit
)
設定したパスワードが123456
で、クラスターの詳細ページから取得した接続パラメーターが次のとおりであるとします。
- エンドポイント:
xxx.tidbcloud.com
- ポート:
4000
- ユーザー:
2aEp24QWEDLqRFs.root
この場合、次のようにget_connection
を変更できます。
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
return MySQLdb.connect(
host="xxx.tidbcloud.com",
port=4000,
user="2aEp24QWEDLqRFs.root",
password="123456",
database="test",
autocommit=autocommit,
ssl_mode="VERIFY_IDENTITY",
ssl={
"ca": "<ca_path>"
}
)
TiDB Cloud Serverless Tierクラスターを使用している場合は、 get_connection
関数をpymysql_example.py
に変更します。
def get_connection(autocommit: bool = False) -> Connection:
return pymysql.connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test',
cursorclass=DictCursor,
autocommit=autocommit)
設定したパスワードが123456
で、クラスターの詳細ページから取得した接続パラメーターが次のとおりであるとします。
- エンドポイント:
xxx.tidbcloud.com
- ポート:
4000
- ユーザー:
2aEp24QWEDLqRFs.root
この場合、次のようにget_connection
を変更できます。
def get_connection(autocommit: bool = False) -> Connection:
return pymysql.connect(host='xxx.tidbcloud.com',
port=4000,
user='2aEp24QWEDLqRFs.root',
password='123546',
database='test',
cursorclass=DictCursor,
autocommit=autocommit,
ssl_ca='<ca_path>',
ssl_verify_cert=True,
ssl_verify_identity=True)
TiDB Cloud Serverless Tierクラスターを使用している場合は、 get_connection
関数をmysql_connector_python_example.py
に変更します。
def get_connection(autocommit: bool = True) -> MySQLConnection:
connection = connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test')
connection.autocommit = autocommit
return connection
設定したパスワードが123456
で、クラスターの詳細ページから取得した接続パラメーターが次のとおりであるとします。
- エンドポイント:
xxx.tidbcloud.com
- ポート:
4000
- ユーザー:
2aEp24QWEDLqRFs.root
この場合、次のようにget_connection
を変更できます。
def get_connection(autocommit: bool = True) -> MySQLConnection:
connection = connect(
host="xxx.tidbcloud.com",
port=4000,
user="2aEp24QWEDLqRFs.root",
password="123456",
database="test",
autocommit=autocommit,
ssl_ca='<ca_path>',
ssl_verify_identity=True
)
connection.autocommit = autocommit
return connection
ステップ 3.3 コードを実行する
コードを実行する前に、次のコマンドを使用して依存関係をインストールします。
pip3 install -r requirement.txt
スクリプトを複数回実行する必要がある場合は、 テーブルの初期化セクションに従って、各実行前にテーブルを再度初期化します。
- SQLAlchemy (Recommended)
- peewee (Recommended)
- mysqlclient
- PyMySQL
- mysql-connector-python
python3 sqlalchemy_example.py
python3 peewee_example.py
python3 mysqlclient_example.py
python3 pymysql_example.py
python3 mysql_connector_python_example.py
ステップ 4. 期待される出力
- SQLAlchemy (Recommended)
- peewee (Recommended)
- mysqlclient
- PyMySQL
- mysql-connector-python