Параллельные вычисления#

Параллельные вычисления — это метод организации вычислений, при котором одна большая задача разбивается на несколько меньших подзадач, выполняющихся одновременно на нескольких процессорах, ядрах или машинах. В отличие от последовательных вычислений, параллельные позволяют задействовать несколько вычислительных ресурсов одновременно, что кратно сокращает общее время обработки данных.

Такие вычисления обычно организуются через пулы потоков, которые управляют параллельным выполнением задач.

Такой подход эффективен при:

  • обработке больших массивов данных (Big Data);

  • математическом моделировании и симуляциях;

  • пакетной обработке транзакций;

  • расчете сложных аналитических отчетов;

  • выполнении однотипных операций над множеством независимых элементов.

Общий алгоритм параллельных вычислений в основной сессии#

  1. Создать пул потоков. Основная характеристика пула — максимальное количество одновременно выполняемых задач.

  2. Запросить данные для расчетов.

  3. Разбить данные на пачки и сформировать задания для параллельного выполнения.

  4. Направить задания в пул потоков. Задания выполняются параллельно.

Планирование допустимого количества параллельных задач#

  1. Пусть S — количество доступных активных сессий базы данных.
    S можно приблизительно рассчитать как количество ядер доступных postgresql × 2.

  2. Пусть С — количество ядер, доступных серверу приложения.

  3. Рассчитать количество потоков как:

     Наименьшее(С*2,S*2).
    

Примечание

Данный алгоритм дает приблизительную оценку, так как оптимальное количество потоков зависит от соотношения нагрузки на диски и процессоры, а также от административных квот на оборудование.

Инструменты для параллельных вычислений#

  • Метод получения пула для параллельных вычислений — ru.bitec.app.gtk.eclipse.parallel.Parallel.withPool.
    Доступен в контексте прикладной сессии (api, pkg, avi).

  • Запрос с сохранением результата в файл — ASQL"""select...""".withTempFileAs.
    Позволяет минимизировать потребление памяти и сессий базы данных во время длительных вычислений.
    Доступен в контексте прикладной сессии.

  • Отображение сообщения с индикацией расчета — dialogs.withInfoForm.
    Доступен в отображениях.

  • Обновление сообщения с индикацией расчета — dialogs.showInfoForm.
    Доступен в отображениях.

Совет

Дополнительную информацию смотрите в документации методов.

Шаблон параллельных вычислений#

dialogs.withInfoForm("Подготовка данных для расчета") {
    //Выполнить параллельные вычисления в 16 потоках
    Parallel.withPool(16) { pool =>
        //Размер пачки
        val batchSize = 500
        //Пачка для обработки
        val batch = ArrayBuffer.empty[NLong]
        //Количество обработанных записей
        var executedSize = 0

        //Отправка пачки на выполнение
        def submitBatch(): Unit= {
            val curSize = batch.size
            pool.submit(batch.toSet) { implicit session =>
                ids =>
                //ВНИМАНИЕ:
                //В процедуре вычисления не доступны данные из основной сессии
                //Поэтому работа может идти только с данными переданными
                //В процедуру submit, и полученными в текущие замыкание
                //в данном примере с именем ids
                //Передать можно только данные которые можно безопасно сериализовать
                //Попытка сослаться на другие данные приведет к ошибке в момент выполнения.

                //Массовая загрузка rop
                for (r <- Btk_QueryPkg().largeInQuery(SomeApi(), "id", ids)) {
                    //Выполнение вычислений
                }
            }.onSuccess { r =>
                //Обработка результатов выполнения идет в основном потоке
                //поэтому здесь доступно использование любых переменных
                
                //Расчет общего количества посчитанных данных
                executedSize = executedSize + curSize
                //Обновление диалога прогресса
                dialogs.showInfoForm(s"Выполнение расчетов ${executedSize}")
            }
            batch.clear()
        }
        
        //В данном примере запрос с большими данными для вычисления сохраняется в файл
        //Это позволяет сократить затраты по оперативной памяти и используемым
        //сессиям базы данных в момент расчет
        //Возможно работа с несколькими полями for(id~name<-...withTempFileAs(nLong("id")~nStr("id")) )
        //Для этого необходимо сделать import anorm._
        for (id <-
                ASQL"""
                SELECT t.id  FROM SomeTable t
                """.withTempFileAs(nLong("id"))
        ) {
            //Формирование пачек на параллельное выполнение
            batch += id
            if (batch.size >= batchSize) {
                submitBatch()
            }
        }
        //Вычисление оставшихся данных
        if (batch.nonEmpty) {
            submitBatch()
        }
    }
}