Skip to content

Система обмена сообщениями

b4w edited this page Aug 7, 2016 · 1 revision

Ахитектура сервера

Самый простой способ организации работы сервера, сделать все "в один поток". FrontEnd - та часть, которая работает с клиентом (сокеты, сервлеты и т.д.) Game Mechanics - вся логика работы нашего приложения, т.е. все взаимосвязи между объектами и их влияние друг на друга. Resource System - часть, работающая с файлами Database - все, что связанно с persistence

однопоточный сервер

Аналогия

аналогия

Улучшение #1

Самый очевидный вариант - на каждое соединение создаем по потоку. Минусы:

  • "Дорого" создавать новый поток.
  • Потоков на всех не хватит

вариант 1

Аналогия #1 аналогия 1

Улучшение #2

Использование пулла потоков. Таким образом работает сервер Jetty. Минусы:

  • Потоков на всех не хванит
  • При полностью занятом пулле - клиент отбрасывается

вариант 2

Аналогия #2 аналогия 2

Улучшение #3

Использовать очередь обработки запросов. Каждый сервис (FronEnd, GameMechanics, DataSase и т.д.) - являются потоком. Для каждого сервиса есть очередь запросов к нему.

Message system

Основная идея (Должно быть по одной коллекции на каждый поток)

  • Один поток кладет сообщение в коллекцию (содать объект типа message и добавить его в коллекцию)
  • Второй поток достает сообщение и исполняет его

Thread-safe коллекции (Необходимо что бы разные потоки обращаясь к коллекции, не поломали ее)

  • Безопасная работа с элементами коллекции
  • Оптимальная работа

Thread-local объекты

  • Объекты, на которые есть ссылки только из одного потока.

Аналогия с системой сообщений

Address и Abonent

Address - класс, объект которого мы будем создавать для каждого нашего сервиса.

public class Address {
	static private AtomicInteger abonentIdCreator = new AtomicInteger();
	final private int abonentId;

	public Address() {
		this.abonentId = abonentCreator.incrementAndGet();
	}

	@Override
	public int hashCode() {
		return abonentId;
	}

	@Override
	public boolean equals(int abonent) {
		return this.abonent == abonent;
	}
}	

public interface Abonent {
	Address getAddress;
}	

Message

public abstract class Msg {
	private final Address from;
	private final Address to;

	public Msg(Address from, Address to) {
		this.from = from;
		this.to = to;
	} 

	protected Address getFrom() {
		return from;
	}

	protected Address getTo(){
		return to;
	}
	
	// exec - список инструкций, которые необходимо выполнить + храним ссылку на того, кому сообщение будет отправленно 
	public abstract void exec(Abonent abonent);
}

Message to DBService

Абстрактный класс сужающий поле действия Message.

public abstract class MsgToDB extends Msg {
	public MsgToDB(Address from, Address to) {
		super(from, to);
	}

	void exec(Abonent abonent) {
		if(abonent instanceOf DBService) {
			exec((DBService) abonent);
		}
	}

	abstract void exec(DBService accountService);
}

Authenticate message

public class MsgAuthenticate extends MsgDB {
	private String name;
	private String password; 
	private String sessionId;

	public MsgAuthenticate(Address from, Address to, String name, String password, String sessionId) {
		super(from, to);
		this.name = name;
		this.password = password;
		this.sessionId = sessionId;
	}

	void exec(DBService dbService) {
		Account result = dbService.auth(name, password);
		Msg back = new MsgIsAuthenticated(getTo(), getFrom(), sessionId, result);
		dbService.getMessageSystem().sendMessage(back);
	}
} 

Иерархия сообщений

MessageSystem

Класс, объект которого будет шарить между собой различные потоки. В этом классе мы должны завести контейнер, в который мы будем складывать сообщения.

// Address - адрес того, кто будет получать
// ConcurrentLinkedQueue - очередь (от абстрактных сообщений Msg) того, кто будет получать 
private final Map<Address, ConcurrentLinkedQueue<Msg>> messages = new HashMap<Address, ConcurrentLinkedQueue<Msg>>();

/*
* Добавление сообщения в карту.
*/
public void sendMessage(Msg message) {
	// находим нужную очередь 
	Queue<Msg> messageQueue = messages.get(message.getTo());
	// добавляем сообщение
	messageQueue.add(message);
}

public void execForAbonent(Abonent abonent) {
	Queue<Msg> messageQueue = messages.get(abonent.getAddress());
	while(!messageQueue.isEmpty()) {
		Msg message = messageQueue.poll();
		message.exec(abonent);
	}
}

Использование MessageSystem

public class Frontend implements Abonent, Runnable {
	private MessageSystem ms;
	// ...

	public Frontend(MessageSystem ms) {
		this.ms = ms;
	}
	// ...

	@Override 
	public void run() {	
		while(true) {
			ms.execForAbonent(this);
			try {
				Thread.sleep(100);
			} catch (...) { ... }
		}
	}
}		

Взаимодействие FrontEnd с AccountService

Разведем работу с пользователями по разным потокам. FrontEnd - поток, который работает с пользователями. DBService - поток, который работает с авторизацией.

  1. FronEnd создает пользовательскую сессию.
  2. FrontEnd возвращает созданную на основе сессии страницу в браузер.
  3. fronEnd запрашивает у DBService данные по авторизации.
  4. Когда данные приходят, frontEnd меняет состояние сессии.

Clone this wiki locally