Обработка таблицы множеством процессов

Есть таблица с парой миллионов записей.

Несколько процессов должны обработать все записи этой таблицы.
Один процесс должен обрабатывать 10000 записей за сеанс.
Обработав одну партию записей каждый процесс должен обрабатывать следубщие (не обработанные) 10000 записей.

Как реализовать такую обработку с помощью функций?

Спасибо

Опции просмотра комментариев

Выберите предпочитаемый вами способ показа комментариев и нажмите "Сохранить настройки" для активации изменений.

Объясните

Объясните пожалуйста смысл такой обработки? Вы хотите сделать это с максимальным быстродействием? Тогда один процесс на всю таблицу сделает это быстрее, чем множество процессов по 10 000 записей. Цель какая-то другая?

Цель обработки

Процессы, которые будут обрабатывать таблицу, вызываются несколькими AppServer'ми(Читают данные и обнавляют).
AppServer'ы различной мощности, поэтому и быстродействие обработки данных разная. По этой причине AppServer'ы должны получать данные порционно. "Сильный" сервер обработает больше порций, слабый - меньше итд.

Понятно. В

Понятно. В общем способов решения может быть много.
Например:
Заводим в таблице поле, которое указывает на то, что оно уже обработано
1. Лочим таблицу
2. Забираем оттуда 10 000 записей
3. Обрабатываем записи
4. Выставляем флаг обработанности
5. Разлочиваем таблицу

Если обработка длится довольно долго, то такой способ не очень-то хорош.
Тогда лучше создать временную таблицу, куда при залоченной 1-й таблице слить 10 000 записей, удалить их в основной таблице и разлочить её. Далее спокойно обработать записи во временной таблице и залить обработанные записи обратно. Способ тоже не без недостатков, но как ещё обеспечить параллелизацию я не вижу. Транзакции здесь не помогут, потому что пока транзакция не закончится, её результаты другие не увидят и возможно засасывание записей по новой.

Можно конечно использовать транзакции и обрабатывать записи по одной, но боюсь это будет слишком медлено, хотя вам виднее - я ничего не знаю о ваших данных и обработке.

А может Select.. for update

А если попробовать с Select.. for update? Как тогда бы это выглядело?

В доке

В доке написано:

FOR UPDATE causes the rows retrieved by the SELECT statement to be locked as though for update. This prevents them from being modified or deleted by other transactions until the current transaction ends. That is, other transactions that attempt UPDATE, DELETE, or SELECT FOR UPDATE of these rows will be blocked until the current transaction ends. Also, if an UPDATE, DELETE, or SELECT FOR UPDATE from another transaction has already locked a selected row or rows, SELECT FOR UPDATE will wait for the other transaction to complete, and will then lock and return the updated row (or no row, if the row was deleted

Таким образом, транзакции, которые попытаются обратится к заблокрованным записям для обновления или SELECT FOR UPDATE, будут ТОЖЕ заблокированы до окончания первой транзакции. Что же в этом хорошего будет?

Ну а если

Ну а если сделать таблицу условной блокировки

 virtual_lock(FromID int, ToID int,ServerName text) 

и функцию выдачи свободного диапазона

 GetFreeBlock(in_BlockSize int, out_FromID int, out_ToID) returns boolean 

Когда сервер хочет начать обработку записей, он запрашивает у функции диапазон свободных записей. Функция смотрит в целевую таблицу, исключая из нее заблокированные диапазоны из virtual_lock, добавляет запись в таблицу virtual_lock и выдает свободный диапазон серверу.

Функция даже может возвращать сам этот диапазон (на ваш вкус)

 GetFreeBlock(in_BlockSize int) returns setof public.foo

Ну а если два

Ну а если два сервера "одновременно" запросят незаблокированные записи из GetFreeBlock?

например так

1 Сервер 2 Сервер
Найти следующие незаблокированные 10000 записей  
  Найти следующие незаблокированные 10000 записей
Блокировать найденные записи и обработать.  
  Блокировать найденные записи и обработать.

Да, управление

Да, управление асинхронными процессами - это вещь сложно предсказуемая, но бороться с этим можно и нужно.
Попробую изобразить свои соображения на эту тему.
имеем таблицу busy_blocks(FromID, ToID, ServerName,InProcess boolean default true)

функция (синтаксис условный)

GetFreeBlock(in_ServerName, in_BlockSize, out_FromID, out_ToID) returns integer 
AS $$
declare processing BOOLEAN;
begin
INSERT INTO busy_blocks(ServerName) VALUES(in_ServerName); --говорим всем, что мы еще в процессе
--удаляем старые блокировки 
-- (или не надо удалять, в зависимости от алгоритма проверки на наличие свободного блока)
DELETE FROM busy_blocks WHERE "ServerName"=in_ServerName AND InProcess=false; 
SELECT EXISTS(SELECT NULL FROM busy_blocks WHERE "ServerName"!=in_ServerName AND InProcess=true)
        INTO processing;
IF processing then 
   RETURN 1;--значит нужно подождать
   DELETE FROM busy_blocks WHERE "ServerName"=in_ServerName AND InProcess=true;
end IF;
 
-- дальше проверяем наличие записей, 
IF  [если все записи обработаны]  then
   RETURN 1;
   DELETE FROM busy_blocks WHERE "ServerName"=in_ServerName AND InProcess=true;
else -- если есть свободный блок
   out_FromID:=[начало блока];
   out_ToID:=[конец блока];
   UPDATE busy_blocks SET FromID=out_FromID,  ToID=out_ToID, ,InProcess=false 
             WHERE "ServerName"=in_ServerName AND InProcess=true;
   RETURN 0;
end IF;
 
end;
$$

ну а вызов будет такой (тоже условный синтаксис)

.......
while true {
res=GetFreeBlock('Serv1',10000, intFromID,intToID);
case res 
   {
   2: continue; //продолжаем крутить цикл
   1: break; //обработали всю таблицу, выходим
   0: ProcessBlock(intFromID,intToID) --запускаем процедуру обработки блока
   }
}
......

Я только не пойму, почему нельзя сделать эту обработку прямо на сервере постгреса, всяко будет быстрее, чем слать по сети кучу записей...

Опции просмотра комментариев

Выберите предпочитаемый вами способ показа комментариев и нажмите "Сохранить настройки" для активации изменений.

Back to top

(С) Виктор Вислобоков, 2008-2023