Inteligentna szachownica na Raspberry Pi 5 z wykorzystaniem kontaktronów oraz magnesów przymocowanych do bierek, za pomocą których, wykrywane jest położenie figur na planszy. Program następnie analizuje dostępne warianty możliwych do wykonania ruchów oraz pokazuje zalecany ruch w danym momencie na wyświetlaczu LCD oraz na lichess.org. Projekt zaliczeniowy na Systemy Wbudowane.
Celem projektu było stworzenie inteligentnej szachownicy, która jest w stanie, analizować obecną pozycję figur na planszy. Szachownica wspiera gracza przy wyborze optymalnych ruchów i wyborze odpowiedniej strategii w celu pokonania drugiego gracza. Dodatkowym atutem jest możliwość podłączenia urządzenia do monitora i wyświetlenia aktualnego stanu szachownicy za pomocą internetowej platformy lichess.org. Wykrywanie pozycji bierek jest możliwe dzięki wykorzystaniu 64 kontaktronów, które znajdują się pod każdym polem na planszy. Do każdej figury przymocowany jest magnes neodymowy, dzięki czemu po zbliżeniu go na odpowiednią odległość, możemy wykryć, że dane pole jest okupowane przez daną bierkę. Kontaktrony zostały połączone w macierz za pomocą pięciu multiplekserów wejścia, które są podłączone do multipleksera I2C. Komponent ten łączy odbiera sygnał ze wszystkich pól szachownicy, a następnie przekazuje je do Raspberry Pi 5, która z wykorzystaniem języka python analizuje otrzymane dane. Wykorzystane biblioteki: board, adafruit_tca9548a, time, busio, digitalio, adafruit_mcp230xx, socket, adafruit_ssd1306, PIL, chess, selenium, stockfish. Program podzielony jest na dwie części. Pierwsza, przetwarza obecną sytuację na planszy, iterując po wszystkich polach na planszy i wysyła wykryte zmiany za pomocą protokołu TCP do drugiego procesu, który z wykorzystaniem darmowego silnika szachowego Stockfish analizuje dostępne warianty możliwych do wykonania ruchów i zwraca zalecany ruch w danym momencie. Informacja ta wysyłana jest ponownie (z wykorzystaniem TCP) do pierwszego procesu, który otrzymane dane pokazuje na wyświetlaczu lcd, który również połączony jest do multipleksera I2C. Całość została zamknięta w wykonanej z drewna skrzyni, z której wychodzą 4 przewody: SCK, SDA oraz wspólna masa, które następnie można podłączyć do Raspberry Pi 5 oraz przewód zasilający USB.
Wykonany projekt spełnia wszystkie główne założenia projektowe. Stworzona szachownica pozwala na wykrycie stanu bierek na planszy i pomaga w wyborze optymalnego ruchu w danym momencie. Dzięki wykorzystaniu multipleksera I2C udało się wzbogacić projekt o dodatkowy wyświetlacz, dzięki czemu obu graczy ma możliwość w prosty sposób sprawdzić podpowiedzi ze strony Stockfisha. Różnicą względem pierwotnych założeń jest wykorzystanie pięciu (zamiast początkowych czterech) multiplekserów wejścia, co było spowodowane otrzymaniem uszkodzonych komponentów.
Zdjęcia
Schemat ideowy
Schemat połączeniowy
Kod
https://github.com/KamieniarzJakub/Smart-Chessboard
Kod, który został uruchomiony na Raspberry Pi został napisany przez moich kolegów. Istotne pliki to szachy.py
oraz chessboard.py
na branchu rpi1
.
Część 1 - odczyt oraz wyświetlanie informacji o zalecanym ruchu
import board
import adafruit_tca9548a
import time
import board
import busio
from digitalio import Direction, Pull
from adafruit_mcp230xx.mcp23017 import MCP23017
import socket
from board import SCL, SDA, MISO, MOSI
import busio
import adafruit_ssd1306
from PIL import Image, ImageDraw, ImageFont
# Załadowanie czcionki
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
display_pins = [4, 5] # Definicja pinów dla wyświetlaczy LED
# Słownik przechowujący konfigurację kanałów i odpowiadających im pinów
pola = {
0: {
# Wykluczone: 0, 4, 11, 13, 15
1: ["C6"],
2: ["C7"],
3: ["C8"],
5: ["D6"],
6: ["D7"],
7: ["D8"],
8: ["B8"],
9: ["B7"],
10: ["B6"],
12: ["A8"],
14: ["A6"]
},
1: {
# Wykluczone: 2, 8
0: ["G5"],
1: ["G6"],
3: ["G8"],
4: ["H5"],
5: ["H6"],
6: ["H7"],
7: ["H8"],
9: ["F7"],
10: ["F6"],
11: ["F5"],
12: ["E8"],
13: ["E7"],
14: ["E6"],
15: ["E5"]
},
2: {
# Wszystkie piny
0: ["G4"],
1: ["G3"],
2: ["G2"],
3: ["G1"],
4: ["H4"],
5: ["H3"],
6: ["H2"],
7: ["H1"],
8: ["F1"],
9: ["F2"],
10: ["F3"],
11: ["F4"],
12: ["E1"],
13: ["E2"],
14: ["E3"],
15: ["E4"]
},
6: {
# Wykluczone: 3, 4
0: ["C4"],
1: ["C3"],
2: ["C2"],
5: ["D3"],
6: ["D2"],
7: ["D1"],
8: ["B1"],
9: ["B2"],
10: ["B3"],
11: ["B4"],
12: ["A4"],
13: ["A3"],
14: ["A2"],
15: ["A1"]
},
7: {
# Wykluczone: 1 oraz 6-11
0: ["G7"],
2: ["B5"],
3: ["A5"],
4: ["A7"],
5: ["D5"],
12: ["C5"],
13: ["F8"],
14: ["C1"],
15: ["D4"]
}
}
# Funkcja inicjalizująca układ MCP23017 dla danego kanału
def setup_mcp(channel):
# Inicjalizacja magistrali I2C
i2c = busio.I2C(board.SCL, board.SDA)
# Inicjalizacja układu MCP23017
mcp = MCP23017(i2c)
# Konfiguracja pinów jako wejścia z podciągnięciem do VCC
for pin in pola[channel].keys():
pola[channel][pin].append(mcp.get_pin(pin))
pola[channel][pin][-1].direction = Direction.INPUT
pola[channel][pin][-1].pull = Pull.UP
# Funkcja konfigurująca wyświetlacz OLED
def setup_led():
i2c = busio.I2C(board.SCL, board.SDA)
display = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c)
return display
# Funkcja do wyświetlania tekstu na wyświetlaczu OLED
def display_text(display, msg):
image = Image.new("1", (128, 32)) # Utworzenie czarno-białego obrazu
draw = ImageDraw.Draw(image)
draw.text((0, 0), msg, font=font, fill=255) # Rysowanie tekstu
display.fill(0) # Czyszczenie wyświetlacza
display.image(image) # Przesłanie obrazu na wyświetlacz
display.show()
# Tablica przechowująca wyświetlacze LED
led_displays = [None, None]
# Funkcja odczytująca stany przycisków dla danego kanału
def read_mcp(channel):
fields = []
for pin in pola[channel].keys():
field, button = pola[channel][pin]
if button.value: # Sprawdzenie, czy przycisk jest wciśnięty
fields.append(field)
return fields
# Konfiguracja serwera sieciowego
host = "127.0.0.1"
port = 8080
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((host, port))
server.listen(1)
clientSocket, clientAddress = server.accept() # Oczekiwanie na połączenie klienta
i2c = board.I2C() # Inicjalizacja magistrali I2C
tca = adafruit_tca9548a.TCA9548A(i2c) # Multiplexer TCA9548A
# Inicjalizacja kanałów MCP23017
for channel in pola.keys():
if tca[channel].try_lock():
for address in tca[channel].scan():
if address != 0x70:
setup_mcp(channel)
tca[channel].unlock()
# Inicjalizacja wyświetlaczy LED
for i in display_pins:
while not tca[i].try_lock():
pass
for address in tca[i].scan():
if address != 0x70:
led_displays[i % 2] = setup_led()
display_text(led_displays[i % 2], "Loading...")
tca[i].unlock()
# Wstępna ustalenie wartości początkowych
_output = "111"
count = 0
start = time.time()
while True:
output = []
for channel in pola.keys():
if tca[channel].try_lock():
for address in tca[channel].scan():
if address != 0x70: # Filtracja adresów
for x in read_mcp(channel):
output.append(x)
tca[channel].unlock()
if len(output) > 0:
print(sorted(output), time.time())
output = ",".join(output)
if output != _output:
if count == 1:
clientSocket.sendall((output).encode("utf-8")) # Wysyłanie danych do klienta
count = 0
_output = output
else:
count += 1
else:
count = 0
clientSocket.settimeout(0.1) # Ustawienie timeoutu dla socketu
try:
data = clientSocket.recv(1024).decode("utf-8")
print(data)
for i in display_pins:
if tca[i].try_lock():
for address in tca[i].scan():
if address != 0x70:
display_text(led_displays[i % 2], data) # Wyświetlanie danych
tca[i].unlock()
except socket.timeout:
pass
time.sleep(0.1)
Część 2 - analiza szachownicy oraz wyświetlenie obecnego stanu na Lichess.org
import chess
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from stockfish import Stockfish
import time
# Inicjalizacja Selenium i przeglądarki
service = Service("/usr/bin/chromedriver") # Ścieżka do chromedriver.exe
driver = webdriver.Chrome(service=service)
driver.get("https://lichess.org/editor/")
# Inicjalizacja szachownicy i Stockfisha
board = chess.Board()
stockfish = Stockfish("/usr/games/stockfish")
stockfish.set_depth(10)
stockfish.set_skill_level(20)
# Zmienne globalne do obsługi roszady
global roszadaWhite, roszadaBlack
roszadaWhite = True
roszadaBlack = True
# Funkcja do aktualizacji szachownicy na Lichess
def update_board_in_browser(board):
fen = board.fen().replace(" ", "_")
fen_url = f"https://lichess.org/editor/{fen}"
driver.get(fen_url)
# Funkcja obliczająca ruch użytkownika na podstawie zmiany pozycji
def calculateMove(previousPosition):
global roszadaWhite, roszadaBlack
actualPosition = previousPosition
# Oczekiwanie na zmianę pozycji szachownicy
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in previousPosition:
if element not in actualPosition:
difference = element
time.sleep(0.1)
pionekRuszajacy = difference
print(pionekRuszajacy)
previousPosition = actualPosition
lenPrevious = len(previousPosition)
bicie = False
# Sprawdzanie, czy nastąpiło zbicie
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
if lenPrevious - len(actualPosition) == 1: # Jeśli zmniejszyła się liczba pionków
bicie = True
for element in previousPosition:
if element not in actualPosition:
difference = element
break
elif lenPrevious - len(actualPosition) == -1: # Jeśli zwiększyła się liczba pionków
for element in actualPosition:
if element not in previousPosition:
difference = element
break
else:
time.sleep(0.1)
previousPosition = actualPosition
# Obsługa ruchów związanych ze zbiciem
if bicie == True:
pionekZbity = difference
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in actualPosition:
if element not in previousPosition:
difference = element
time.sleep(0.1)
string = pionekRuszajacy + pionekZbity
return string.lower(), actualPosition
previousPosition = actualPosition
# Obsługa specjalnych przypadków, takich jak roszady
if roszadaWhite == True:
if pionekRuszajacy == 'E1':
if difference == 'G1':
_ruch = difference
# Roszada krótka biała
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in previousPosition:
if element not in actualPosition:
print(f"coś tam{element}")
_difference = element
time.sleep(0.1)
previousPosition = actualPosition
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in actualPosition:
if element not in previousPosition:
print(f"coś tam2{element}")
__difference = element
time.sleep(0.1)
string = pionekRuszajacy + _ruch
print(string.lower())
return string.lower(), actualPosition
elif difference == 'C1':
_ruch = difference
# Roszada długa biała
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in previousPosition:
if element not in actualPosition:
_difference = element
time.sleep(0.1)
previousPosition = actualPosition
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in actualPosition:
if element not in previousPosition:
_difference = element
string = pionekRuszajacy + _ruch
print(string.lower())
return string.lower(), actualPosition
else:
roszadaWhite = False # Wyłączenie możliwości roszady białych
if roszadaBlack == True:
if pionekRuszajacy == 'E8':
if difference == 'G8':
# Roszada krótka czarna
_ruch = difference
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in previousPosition:
if element not in actualPosition:
difference = element
time.sleep(0.1)
previousPosition = actualPosition
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in actualPosition:
if element not in previousPosition:
difference = element
string = pionekRuszajacy + _ruch
print(string.lower())
return string.lower(), actualPosition
elif difference == 'C8':
_ruch = difference
# Roszada długa biaczarnała
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in previousPosition:
if element not in actualPosition:
difference = element
time.sleep(0.1)
previousPosition = actualPosition
while(previousPosition == actualPosition):
message = clientSocket.recv(1024).decode("utf-8")
actualPosition = message.strip().split(',')
for element in actualPosition:
if element not in previousPosition:
difference = element
string = pionekRuszajacy + _ruch
print(string.lower())
return string.lower(), actualPosition
else:
roszadaBlack = False # Wyłączenie możliwości roszady czarnych
# Obsługa promocji danego pionka -> dama
if difference in ('A8', 'B8', 'C8', 'D8', 'E8', 'F8', 'G8', 'H8'):
if stockfish.get_what_is_on_square(pionekRuszajacy.lower()) == 'Piece.WHITE_PAWN':
string = pionekRuszajacy + difference + 'q'
return string.lower, actualPosition
elif difference in ('A1', 'B1', 'C1', 'D1', 'E1', 'F1', 'G1', 'H1'):
if stockfish.get_what_is_on_square(pionekRuszajacy.lower()) == 'Piece.BLACK_PAWN':
string = pionekRuszajacy + difference + 'q'
return string.lower, actualPosition
# Rekurencja w przypadku błędnych ruchów (np. powtarzających się pozycji)
if string[:2] == string[2:]:
return calculateMove(actualPosition)
string = pionekRuszajacy + difference
return string.lower(), actualPosition
# Konfiguracja połączenia sieciowego
import socket
host = "127.0.0.1"
port = 8080
clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientSocket.connect((host, port))
# Aktualizujemy szachownicę przed rozpoczęciem gry
update_board_in_browser(board)
print(board)
print("--------------------")
# Pozycje początkowe figur
basicPosition = [
"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", # Białe figury
"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", # Białe pionki
"A7", "B7", "C7", "D7", "E7", "F7", "G7", "H7", # Czarne pionki
"A8", "B8", "C8", "D8", "E8", "F8", "G8", "H8" # Czarne figury
]
message = clientSocket.recv(1024).decode("utf-8")
message = message.strip().split(',')
previousPosition = message
newPosition = message
print(previousPosition)
# Usuwanie brakujących figur
brakujace = set(basicPosition) - set(newPosition)
print(list(brakujace))
brakujace = list(brakujace)
for brak in brakujace:
square = chess.parse_square(brak.lower())
board.remove_piece_at(square)
# Główna pętla gry
while not board.is_game_over():
try:
# Ustawienie pozycji Stockfisha
stockfish.set_fen_position(board.fen())
update_board_in_browser(board)
# Ocena pozycji i propozycja ruchu Stockfisha
stockfish_move = stockfish.get_top_moves(1)
if not stockfish_move:
print("Stockfish nie zwrócił żadnego ruchu. Gra zakończona.")
break
best_move = stockfish_move[0]["Move"]
print(f"Ruch Stockfisha: {best_move}")
# Wysyłanie najlepszego ruchu do wyświetlenia go na wyświetlaczu
clientSocket.sendall(best_move.encode("utf-8"))
# Wprowadzenie ruchu użytkownika
move, newPosition = calculateMove(previousPosition)
print(move)
print(newPosition)
previousPosition = newPosition
# time.sleep(5)
print(board.legal_moves)
if chess.Move.from_uci(move) in board.legal_moves:
board.push(chess.Move.from_uci(move))
# Aktualizacja szachownicy w przeglądarce po ruchu gracza
update_board_in_browser(board)
else:
# print("Nielegalny ruch. Spróbuj ponownie.")
continue
# Wykonanie ruchu przez Stockfisha
if not board.is_game_over() and best_move in [m.uci() for m in board.legal_moves]:
board.push(chess.Move.from_uci(best_move))
print(f"Stockfish wykonuje ruch: {best_move}")
# Aktualizacja szachownicy w przeglądarce po ruchu Stockfisha
update_board_in_browser(board)
else:
print("Stockfish nie może wykonać ruchu.")
print(board)
print("--------------------")
except Exception as e:
print(f"Błąd: {e}")
break
# Koniec gry
print("Koniec gry.")
print("Wynik:", board.result())
# Zamknięcie przeglądarki
driver.quit()