Параллельные вычисления#
Параллельные вычисления — это метод организации вычислений, при котором одна большая задача разбивается на несколько меньших подзадач, выполняющихся одновременно на нескольких процессорах, ядрах или машинах. В отличие от последовательных вычислений, параллельные позволяют задействовать несколько вычислительных ресурсов одновременно, что кратно сокращает общее время обработки данных.
Такие вычисления обычно организуются через пулы потоков, которые управляют параллельным выполнением задач.
Такой подход эффективен при:
обработке больших массивов данных (Big Data);
математическом моделировании и симуляциях;
пакетной обработке транзакций;
расчете сложных аналитических отчетов;
выполнении однотипных операций над множеством независимых элементов.
Общий алгоритм параллельных вычислений в основной сессии#
Создать пул потоков. Основная характеристика пула — максимальное количество одновременно выполняемых задач.
Запросить данные для расчетов.
Разбить данные на пачки и сформировать задания для параллельного выполнения.
Направить задания в пул потоков. Задания выполняются параллельно.
Планирование допустимого количества параллельных задач#
Пусть S — количество доступных активных сессий базы данных.
S можно приблизительно рассчитать какколичество ядер доступных postgresql× 2.Пусть С — количество ядер, доступных серверу приложения.
Рассчитать количество потоков как:
Наименьшее(С*2,S*2).
Внимание
Параллельные потоки работают на одном сервере приложений, где запущен текущий рабочий сеанс. Таким образом не следует запускать алгоритмы, обсчитывающие весь филиал или огромный объем, т.к. сервер приложений может быть перегружен и ему может не хватить оперативной памяти для выполнения задачи.
Примечание
Данный алгоритм дает приблизительную оценку, так как оптимальное количество потоков зависит от соотношения нагрузки на диски и процессоры, а также от административных квот на оборудование.
Инструменты для параллельных вычислений#
Метод получения пула для параллельных вычислений —
ru.bitec.app.gtk.eclipse.parallel.Parallel.withPool.
Доступен в контексте прикладной сессии (api,pkg,avi).Запрос с сохранением результата в файл —
ASQL"""select...""".withTempFileAs.
Позволяет минимизировать потребление памяти и сессий базы данных во время длительных вычислений.
Доступен в контексте прикладной сессии.Отображение сообщения с индикацией расчета —
dialogs.withInfoForm.
Доступен в отображениях.Обновление сообщения с индикацией расчета —
dialogs.showInfoForm.
Доступен в отображениях.
Совет
Дополнительную информацию смотрите в документации методов.
Шаблон параллельных вычислений#
dialogs.withInfoForm("Подготовка данных для расчета") {
//Выполнить параллельные вычисления в 16 потоках
Parallel.withPool(16) { pool =>
//Размер пачки
val batchSize = 500
//Пачка для обработки
val batch = ArrayBuffer.empty[NLong]
//Количество обработанных записей
var executedSize = 0
//Отправка пачки на выполнение
def submitBatch(): Unit= {
val curSize = batch.size
pool.submit(batch.toSet) { implicit session =>
ids =>
//ВНИМАНИЕ:
//В процедуре вычисления не доступны данные из основной сессии
//Поэтому работа может идти только с данными переданными
//В процедуру submit, и полученными в текущие замыкание
//в данном примере с именем ids
//Передать можно только данные которые можно безопасно сериализовать
//Попытка сослаться на другие данные приведет к ошибке в момент выполнения.
//Массовая загрузка rop
for (r <- Btk_QueryPkg().largeInQuery(SomeApi(), "id", ids)) {
//Выполнение вычислений
}
}.onSuccess { r =>
//Обработка результатов выполнения идет в основном потоке
//поэтому здесь доступно использование любых переменных
//Расчет общего количества посчитанных данных
executedSize = executedSize + curSize
//Обновление диалога прогресса
dialogs.showInfoForm(s"Выполнение расчетов ${executedSize}")
}
batch.clear()
}
//В данном примере запрос с большими данными для вычисления сохраняется в файл
//Это позволяет сократить затраты по оперативной памяти и используемым
//сессиям базы данных в момент расчет
//Возможно работа с несколькими полями for(id~name<-...withTempFileAs(nLong("id")~nStr("id")) )
//Для этого необходимо сделать import anorm._
for (id <-
ASQL"""
SELECT t.id FROM SomeTable t
""".withTempFileAs(nLong("id"))
) {
//Формирование пачек на параллельное выполнение
batch += id
if (batch.size >= batchSize) {
submitBatch()
}
}
//Вычисление оставшихся данных
if (batch.nonEmpty) {
submitBatch()
}
}
}
Примечание
Выполнение onSuccess либо на выходе из метода Parallel.withPool либо на вызове pool.submit
Обработка ошибок в Parallel#
Обработка ошибок параллельных потоков происходит в .onFailure(ex => …)
Стандартное поведение бросает ошибку в главном потоке .onFailure(ex => throw ex)
Если в главном или параллельном потоке упадет ошибка
и не будет перехвачена в onFailure то Parallel.withPool
дождется выполнения уже созданных задач, но не будет начинать новые
Примечание
Выполнение onFailure либо на выходе из метода Parallel.withPool либо на вызове pool.submit
Parallel.withPool(3) { pool =>
// Эта задача выполнится
pool.submit(1) { implicit session => arg =>
//Обычная задача (1)
}
pool.submit(1) { implicit session => arg =>
// Задача, которая выкинет ошибку (2)
}
// Эта задача либо начнется, либо выкинет ошибку из задачи 2
// Зависит успеет ли задача 2 выкинуть ошибку к моменту запуска
pool.submit(1) { implicit session => arg =>
// Обычная задача (3)
}
// В конце дождется выполнения задачи 1 и 3 (если та успела запустится)
// И выкинет первую ошибку полученную из параллельного потока (в нашем случае из задачи 2)
}