Страницы

суббота, 29 декабря 2012 г.

Многопоточные операции и компонент Service Broker

Сегодня я буду рассказывать об одной задаче, которую мне приходилось решать. Задача была связана с перекачкой больших объемов данных между двумя серверами.

Давайте представим себе такую жизненную ситуацию. Есть большая таблица (сотни миллионов или миллиарды записей) на сервере источнике. Ее данные надо как-то перекачать в таблицу другого сервера-приемника. Причем структуры двух таблиц различны. После загрузки данных с сервера источника надо провести определенную обработку по преобразованию данных и только после этого следует выполнить вставку в таблицу приемник.

Понятно, что функция openquery, insert через связанный сервер не подойдут для решения задачи по работе с большими объемами данных. В таких случаях следует стремиться к устранению каких-либо операций протоколирования которые могут сопровождать операции переноса данных. Репликация также не поможет, поскольку таблицы имеют совершенно различные структуры.

Можно поступить так. Сперва извлечь данные в текстовый файл, с помощью консольного приложения, которое будет идти в цикле по таблице записывая строки в файл, разделяя столбцы определенным разделителем. Затем с помощью оператора bulk insert можно операцией с минимальным протоколированием скопировать данные из текстового файла в таблицу, структура которой совпадает со структурой таблицы источника. А уже после этого на стороне сервера-приемника разбираться с задачей обработки данных перед финальной операцией вставки в таблицу.

Какие могут быть подводные камни в этом плане. Ну во-первых программа, которая идет в цикле по строкам таблицы источника может работать долго из-за большого объема записей. Дополнительно представим себе, что для базы данных, в которой находится таблица источник используется оптимистичная модель блокирования, то есть снэпшотный уровень изоляции транзакции. Это означает, что возможности выполнять "грязное" чтение может не быть. И читая таблицу в момент, когда ее модифицирует кто-то другой на сервере источнике, мы будет читать снэпшот для модифицируемых записей. Чтение снэпшота медленнее чем чтение исходных записей, поскольку при исполнении запроса серверу приходится прыгать от физической строки через указатель к тому месту, где хранится копия предыдущего значения строки. Из-за этого скорость извлечения заметно замедлится. Более того, если объем читаемых и модифицируемых записей велик, то у сервера источника (по крайней мере в случае Oracle-а) может не хватить ресурсов для хранения всего снэпшота. Тогда запрос может отвалиться с ошибкой и всю работу придется начинать заново, а драгоценное время будет потеряно.

Что же делать? Напрашивается такой выход из сложившейся ситуации. Необходимо извлекать с помощью одного запроса к таблице источнику относительно небольшой объем записей, и записывать их в отдельный текстовый файл. Например, если в таблице есть столбец, по которому таблица секционирована или который является ведущим столбцом в кластерном индексе, то можно организовать цикл по всем значениям, которые может принимать данное поле. А на каждой итерации запускать консольное приложение, которое будет идти в цикле по набору записей, который получается с помощью запроса к таблице на сервере источнике с условием на равенство столбца определенному значению. Это позволит решить проблему, связанную с параллельным доступом к таблице-источнику. Развивая эту идею можно еще и ускорить процесс формирования файлов, за счет распараллелирования задачи. То есть организовать загрузку таким образом, чтобы одновременно работало несколько экземпляров консольного приложения, каждое из которых загружает данные из таблицы для своего значения столбца, который мы используем для дробления таблицы.

Такая организация функционирования загрузки на основе многопоточности позволяет использовать ресурсы процессора полностью и тем самым ускорить загрузку на порядок!

Есть один принципиальный момент. Предположим, что в таблице-источнике T есть столбец K, который мы решили использовать в условии запроса. Пусть, столбец K может принимать много значений, например, 100. Это значит, что должно отработать в общей сложности 100 потоков, а каждый поток будет работать со своим значением столбца K. Мощности процессора может не хватит на одновременную работу всех 100 потоков. Поэтому процедура, которая будет выполнять загрузку, должна запускать некоторое ограниченное число потоков, равное, например, 10. При этом процедура должна непрерывно следить за состоянием потоков. Как только один или несколько из потоков завершат свою работу, на их месте процедурой должны запускаться такое же количество новых потоков, каждый из которых будет выполнять загрузку данных для своего значения столбца K. Благодаря такой организации в каждый момент времени будет работать одинаковое число потоков, процессор будет использовать все свои ресурсы для обслуживания потоков и загрузка будет скоростной и безопасной!

Для реализации описанного выше плана нам необходим механизм, позволяющий кодом на языке Transact-SQL выполнять асинхронный запуск процедуры. Нет сомнения в том, что с этой задачей можно справиться с помощью хранимых процедур на основе clr, запуск асинхронных процессов с помощью sp_OACreate или xp_cmdshell, а также создание и запуск кодом специально подготовленных джобов (sp_start_job). Но на мой взгляд самый удобный и безопасный способ это служба Service Broker, которая появилась в СУБД MS SQL с 2005 года. Эта служба предоставляет богатый и удобный синтаксис, позволяющий решать задачи асинхронного запуска процессов на сервере, без использования какого-либо внешнего кода.

Приступим к разработке. Сформулируем примерное условие. Предположим, что у нас есть таблица на сервере источнике, в которой хранится история изменения атрибута некоторой сущности. Тем самым в таблице есть поле с датой присвоения сущности определенного значения атрибута. Пусть также это поле является первым столбцом в кластерном индексе таблицы на сервере, с которого грузятся данные. На сервере-приемнике есть программный файл, который через командную строку получает дату и выполняет загрузку данных в текстовый файл. Начнем писать код. У нас будут 2 настроечные таблицы. В первой будет храниться общая информация о характере загрузки: имя программного файла, число потоков и пр. Также будет таблица, описывающая состояние потоков, в ней будет по одной строке на каждое значение столбца с датой.
create table dbo.LoaderParameters
(
      WorkerCount int              not null,
      ProgramFile nvarchar ( 255 ) not null,
      DateStart   datetime         not null,
      DateEnd     datetime         not null
) on [PRIMARY]


В поле WorkerCount будет храниться число потоков, которое должно работать одновременно. ProgramFile это полное имя программного файла, который выполняет загрузку за определенный день. В последних двух полях фиксируются даты начала и окончания, то есть эти поля дают диапазон значений, для которых надо загружать данные. Теперь построим таблицу, по которой мы будем отслеживать состояние потоков.

create table dbo.WorkerState
(
      dtDate            datetime          not null,
      FilePath          nvarchar ( 255 )  not null,
      Stat              int               not null,
      RetryAttempts     int               not null,
      CurAttempt        int               not null,
      WorkerLog         nvarchar ( max )  not null,
      constraint PK_WorkerState_dtDate primary key clustered ( dtDate asc )
) on [PRIMARY]


В таблице WorkerState каждая строка ассоциируется с некоторым экземпляром консольного приложения, которое осуществляет закачку данных за определенную дату (поле dtDate), данные складываются в определенный файл (поле FilePath). Причем загрузка может находиться в одном из состояний (значения поля Stat): 0 (загрузка еще не началась, экземпляр программы еще не запущен, но готов к этому), 1 (программа извлекает данные за день dtDate), 2 (загрузка завершилась с ошибкой), 3 (загрузка успешно завершена). Кроме перечисленных полей, есть еще 2 поля, в которых хранится информация о числе повторных попыток запуска программного файла, если загрузку не получилось провести с первого раза из-за возможных ошибок на стороне сервера источника.

Напишем хранимую процедуру, которая должна запускать экземпляр консольного приложения, передавая ему информацию о том, за какую дату производить загрузку данных. Предполагается, что в коде консольного приложения есть код, выполняющий запрос к таблице dbo.WorkerState с условием на переданную дату, для получения информации о том, в какой файл складывать данные.

use Import
go

if sessionproperty ( N'quoted_identifier' ) = 0
      set quoted_identifier on
go

if sessionproperty ( N'ansi_nulls' ) = 0
      set ansi_nulls on
go

if object_id ( N'Import.dbo.LoadOneFile', N'P' ) is null
      exec ( N'create proc dbo.LoadOneFile as return 1' )
go

alter proc dbo.LoadOneFile
(
      @dtDate datetime
)
as
begin
      set nocount, xact_abort on
     
      declare @cmd varchar ( 8000 ), @res int, @WorkerLog nvarchar ( max ), @CurAttempt int, @RetryAttempts int,
            @CurWorkerLog nvarchar ( max )
     
      if object_id ( N'tempdb..#LoaderResult', N'U' ) is not null
            drop table #LoaderResult
      create table #LoaderResult
      (
            Res nvarchar ( max )
      )
     
      begin try
            select @cmd = '"' + cast ( ProgramFile as varchar ( 8000 ) ) + '" ' + convert ( varchar ( 100 ), @dtDate, 112 )
            from dbo.LoaderParameters
           
            select @CurAttempt = CurAttempt, @RetryAttempts = RetryAttempts
            from WorkerState
           
            set @res = 2
            while @CurAttempt <= @RetryAttempts
            begin
                  insert into #LoaderResult ( Res )
                        exec master.dbo.xp_cmdshell @cmd
                 
                  if exists
                  (
                        select top 1 1
                        from #LoaderResult
                        where lower ( Res ) like lower ( N'%work done%' )
                  )
                  begin
                        set @res = 3
                        break
                  end
                  else
                  begin
                        declare curLoadFileLog cursor local static forward_only for
                             select isnull ( Res, N'' ) Res
                             from #LoaderResult
                        open curLoadFileLog
                        fetch next from curLoadFileLog into @CurWorkerLog
                        while @@fetch_status = 0
                        begin
                             set @WorkerLog = isnull ( @WorkerLog, N'' ) + N' ' + isnull ( @CurWorkerLog, N'' )
                            
                             fetch next from curLoadFileLog into @CurWorkerLog
                        end
                        close curLoadFileLog
                        deallocate curLoadFileLog
                  end
                 
                  set @CurAttempt += 1
            end
           
            update dbo.WorkerState
                  set Stat = @res, WorkerLog = isnull ( @WorkerLog, N'' )
                  where dtDate = @dtDate
      end try
      begin catch
            update dbo.WorkerState
                  set Stat = 2, WorkerLog = error_message ()
                  where dtDate = @dtDate
      end catch
end
go

Хранимая процедура dbo.LoadOneFile будет запускаться в асинхронном режиме. Для того, чтобы это было возможно, необходимо использовать службу Service Broker. Для начала разрешим ее использование для базы данных Import:

alter database Import set enable_broker

Код выше накладывает блокировку изменения схемы уровня базы данных, поэтому этот код будет заблокирован, если в базе будут другие активные пользовательские соединения. Когда код отработает, у базы появляется идентификатор брокера:
select service_broker_guid
from sys.databases
where name = N'Import'


Ниже мы напишем процедуру которая будет асинхронно запускать dbo.LoadOneFile и получать xml-документ, содержащий дату, за которую необходимо выполнить загрузку данных. Будем привязывать xml-документ к xml-схеме, которая создается ниже.

use Import
go

if sessionproperty ( N'quoted_identifier' ) = 0
      set quoted_identifier on
go

if sessionproperty ( N'ansi_nulls' ) = 0
      set ansi_nulls on
go

if exists
(
      select top 1 1
      from Import.sys.xml_schema_collections
      where
            name = N'LoadParam' and
            [schema_id] = schema_id ( N'dbo' )
)
begin
      raiserror ( N'Коллекция схем xml с именем "%s" уже существует в схеме "%s" базы данных "%s".', 16, 1,
            N'LoadParam', N'dbo', N'Import' )
      return
end

create xml schema collection dbo.LoadParam
as
      N'
            <xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
                  <xsd:annotation>
                        <xsd:documentation xml:lang="ru">
                             xml-схема предназначена для проверки на валидность xml-документов, используемых для передачи параметров при асинхронных вызовах процедуры загрузки.
                             Автор: Владимир Едынак
                        </xsd:documentation>
                  </xsd:annotation>
                 
                  <xsd:element name="LoadPrm" type="LoadPrmType"/>
                 
                  <xsd:complexType name="LoadPrmType">
                        <xsd:sequence>
                             <xsd:element name="dtDate" type="xsd:date" minOccurs="1" maxOccurs="1"/>
                        </xsd:sequence>
                  </xsd:complexType>
            </xsd:schema>
      '
go


Теперь необходимо создать объекты службы Service Broker. Асинхронный запуск хранимой процедуры будет осуществляться с помощью активации хранимой процедуры, которая происходит в момент появления сообщения в специальной очереди. Очереди будут созданы немного позже, а сейчас надо создать типы сообщений, которые дадут возможность создавать сообщения, которые можно будет класть в очереди. Код ниже создает типы сообщений для очередей, принимающих запросы на асинхронный запуск процедуры загрузки и для очереди, которая будет содержать сообщения о том, что определенный запуск процедуры уже завершился:
if exists
(
      select top 1 1
      from Import.sys.service_message_types
      where name = N'srcLoadData'
)
begin
      raiserror ( N'Тип сообщений с именем "%s" уже существует в базе данных "%s".', 16, 1, N'srcLoadData', N'Import' )
      return
end

create message type srcLoadData
      authorization dbo
      validation = valid_xml with schema collection dbo.LoadParam

go



if exists
(
      select top 1 1
      from Import.sys.service_message_types
      where name = N'trgLoadData'
)
begin
      raiserror ( N'Тип сообщений с именем "%s" уже существует в базе данных "%s".', 16, 1, N'trgLoadData', N'Import' )
      return
end

create message type trgLoadData
      authorization dbo
      validation = well_formed_xml
 go

Теперь необходимо создать так называемый контракт, то есть объект, который позволяет выполнять передачу сообщений тех или иных типов между участниками диалога (в нашем примере между соединением, которое инициирует асинхронный запуск процедуры dbo.LoadOneFile и соединением, которое выполняет запуск данной процедуры и сигнализирует о его окончании).

if exists
(
      select top 1 1
      from Import.sys.service_contracts
      where name = N'LoadData'
)
begin
      raiserror ( N'Контракт с именем "%s" уже существует в базе данных "%s".', 16, 1,
            N'LoadData', N'Import' )
      return
end

create contract LoadData
      authorization dbo
      (
            srcLoadData sent by initiator,
            trgLoadData sent by target
      )
go

Создадим очередь, которая будет принимать запросы на асинхронный запуск.
 if object_id ( N'Import.dbo.srcLoadData', N'SQ' ) is not null
begin
      raiserror ( N'Очередь с именем "%s" уже существует в схеме "%s" базы данных "%s".', 16, 1,
            N'srcLoadData', N'dbo', N'Import' )
      return
end

create queue dbo.srcLoadData
      with
            status = on,
            retention = off
      on [DEFAULT]
go

Перед созданием очереди, в которую будут класться сообщения с отчетами о работе процедуры dbo.LoadOneFile создадим процедуру, которая будет ее читать.


use Import
go

if sessionproperty ( N'ansi_nulls' ) = 0
      set ansi_nulls on
go


if sessionproperty ( N'quoted_identifier' ) = 0
      set quoted_identifier on
go

if object_id ( N'Import.dbo.DistributeLoadMessages', N'P' ) is null
      exec ( N'create proc dbo.DistributeLoadMessages as return 1' )
go


alter proc dbo.DistributeLoadMessages
as
begin
      set nocount, xact_abort on
     
      declare @ConversHandle uniqueidentifier, @ServiceName nvarchar ( 128 ),
            @MessTypeName nvarchar ( 128 ), @MesBody xml ( dbo.LoadParam ), @prm xml, @XmlHandle int,
            @dtDate date, @Respond xml
     
      ;
      receive top ( 1 ) @ConversHandle = [conversation_handle], @ServiceName = [service_name],
            @MessTypeName = message_type_name, @MesBody = message_body
      from Import.dbo.trgLoadData
     
      if @@rowcount = 0
            return
     
      if @MessTypeName not in ( N'srcLoadData' )
            return
     
      if @ServiceName not in ( N'trgLoadData' )
            return
     
      set @prm = @MesBody
     
      exec sp_xml_preparedocument @XmlHandle out, @prm
     
      select @dtDate = dtDate
      from openxml ( @XmlHandle, N'/LoadPrm', 1 )
      with
      (
            dtDate date N'dtDate'
      )
     
      exec sp_xml_removedocument @XmlHandle
     
      set @Respond = N'<Respond>Work is done for date "' + convert ( nvarchar ( 100 ), @dtDate, 105 ) + '"</Respond>'
     
      if @ServiceName in ( N'trgLoadData' )
      begin
            exec Import.dbo.LoadOneFile @dtDate
      end
     
      ;
      send on conversation @ConversHandle
      message type trgLoadData ( @Respond )
end
go

В коде выше используется переменная @ServiceName, обозначающая имя службы. Назначение этого параметра будет объяснено позднее. А теперь создадим очередь, в которую будут складываться сообщения об окончании работы процедуры dbo.LoadOneFile.
if object_id ( N'Import.dbo.trgLoadData', N'SQ' ) is not null
begin
      raiserror ( N'Очередь с именем "%s" уже существует в схеме "%s" базы данных "%s".', 16, 1,
            N'trgLoadData', N'dbo', N'Import' )
      return
end

create queue dbo.trgLoadData
      with
            status = on,
            retention = off,
            activation
            (
                  status = on,
                  procedure_name = Import.dbo.DistributeLoadMessages,
                  max_queue_readers = 30000,
                  exec as self
            )
      on [DEFAULT]
go

Заметим, что в отличие от предыдущей очереди, в данной прописаны параметры активации, то есть имя хранимой процедуры, которая активируется при попадании в очередь сообщения, и которая запускает процедуру загрузки dbo.LoadOneFile.

Теперь можно создать службы.

if exists
(
      select top 1 1
      from Import.sys.services
      where name = N'srcLoadData'
)
begin
      raiserror ( N'Служба с именем "%s" уже существует в базе данных "%s".', 16, 1,
            N'srcLoadData', N'Import' )
      return
end

create service srcLoadData
      authorization dbo
      on queue dbo.srcLoadData ( LoadData )
go


if exists
(
      select top 1 1
      from Import.sys.services
      where name = N'trgLoadData'
)
begin
      raiserror ( N'Служба с именем "%s" уже существует в базе данных "%s".', 16, 1,
            N'trgLoadData', N'Import' )
      return
end

create service trgLoadData
      authorization dbo
      on queue dbo.trgLoadData ( LoadData )
go

Настало время создать хранимую процедуру, которая получает в качестве параметра дату и выполняет асинхронный запуск процедуры, указанной в параметрах активации очереди trgLoadData.

use Import
go

if sessionproperty ( N'ansi_nulls' ) = 0
      set ansi_nulls on
go

if sessionproperty ( N'quoted_identifier' ) = 0
      set quoted_identifier on
go

if object_id ( N'Import.dbo.StartFileLoading', N'P' ) is null
      exec ( N'create proc dbo.StartFileLoading as return 1' )
go

alter proc dbo.StartFileLoading
(
      @dtDate datetime
)
as
begin
      set nocount, xact_abort on
     
      declare @ConversHandle uniqueidentifier, @Request xml ( dbo.LoadParam )
     
      begin try
            set @Request = N'
                  <LoadPrm>
                        <dtDate>' + convert ( nvarchar ( 100 ), cast ( @dtDate as date ), 121 ) + '</dtDate>
                  </LoadPrm>
            '
           
            begin dialog conversation @ConversHandle
                  from service srcLoadData to service N'trgLoadData'
                  on contract LoadData
                  with encryption = off
            ;
            send on conversation @ConversHandle
            message type srcLoadData ( @Request )
      end try
      begin catch
            update Import.dbo.WorkerState
                  set Stat = 2, WorkerLog = N'Произошла ошибка при попытке отправить сообщение брокера (служба srcLoadData): ' +
                        error_message ()
                  where dtDate = @dtDate
      end catch
end
go

Процедура dbo.StartFileLoading после запуска сразу завершает свою работу, а на сервере активируется новое соединение, в котором происходит запуск процедуры dbo.DistributeLoadMessages, которая читает очередь, находит сообщение, извлекает параметры тела сообщения, запускает процедуру dbo.LoadOneFile.

Все, что осталось сделать, это написать хранимую процедуру, которая будет следить за состоянием потоков и очереди запросов на выполнение работы. Данная процедура будет играть роль планировщика.

use Import
go

if sessionproperty ( N'quoted_identifier' ) = 0
      set quoted_identifier on
go

if sessionproperty ( N'ansi_nulls' ) = 0
      set ansi_nulls on
go

if object_id ( N'Import.dbo.LoadDataScheduler', N'P' ) is null
      exec ( N'create proc dbo.LoadDataScheduler as return 1' )
go

alter proc dbo.LoadDataScheduler
as
begin
      set nocount, xact_abort on
     
      declare @DateStart datetime, @DateEnd datetime, @WorkerCount int, @CurWorkerCount int, @dtDate datetime,
            @ConversHandle uniqueidentifier
     
      if object_id ( N'tempdb..#NewWorkers', N'U' ) is not null
            drop table #NewWorkers
      create table #NewWorkers
      (
            dtDate datetime not null,
            primary key clustered ( dtDate asc )
      ) on [PRIMARY]
     
      select @DateStart = DateStart, @DateEnd = DateEnd, @WorkerCount = WorkerCount
      from Import.dbo.LoaderParameters
     
      delete Import.dbo.WorkerState
     
      ;
      with Dates
      as
      (
            select @DateStart dtDate
                  union all
            select dateadd ( [dd], 1, dtDate ) dtDate
            from Dates
            where dtDate < @DateEnd
      )
      insert into Import.dbo.WorkerState
      (
            dtDate,
            CurAttempt,
            FilePath,
            RetryAttempts,
            Stat,
            WorkerLog
      )
            select
                  dtDate, 1, N'C:\Users\Владимир\Desktop\' + convert ( nvarchar ( 100 ), dtDate, 121 ) + '.txt', 20, 0, N''
            from Dates
     
      set @CurWorkerCount = 1
      while @CurWorkerCount <= @WorkerCount
      begin
            select top 1 @dtDate = dtDate
            from Import.dbo.WorkerState with ( nolock )
            where Stat = 0
           
            if @@rowcount <> 0
            begin
                  update Import.dbo.WorkerState
                        set Stat = 1
                        where dtDate = @dtDate
                  exec Import.dbo.StartFileLoading @dtDate
            end
           
            set @CurWorkerCount += 1
      end
     
      while exists
      (
            select top 1 1
            from Import.dbo.WorkerState with ( nolock )
            where Stat = 0
      )
      begin
            select @WorkerCount = WorkerCount
            from Import.dbo.LoaderParameters
           
            select @CurWorkerCount = count (*)
            from Import.dbo.WorkerState with ( nolock )
            where Stat = 1
           
            if @WorkerCount - @CurWorkerCount <= 0
            begin
                  waitfor delay N'00:00:01'
            end
            else
            begin
                  delete #NewWorkers
                 
                  insert into #NewWorkers ( dtDate )
                        select top ( @WorkerCount - @CurWorkerCount ) dtDate
                        from Import.dbo.WorkerState with ( nolock )
                        where Stat = 0
                 
                  update Stat
                        set Stat.Stat = 1
                        from
                             Import.dbo.WorkerState Stat
                                   inner join
                             #NewWorkers tmp on Stat.dtDate = tmp.dtDate
                 
                  declare curNewWrkStart cursor local static forward_only for
                        select dtDate
                        from #NewWorkers
                  open curNewWrkStart
                  fetch next from curNewWrkStart into @dtDate
                  while @@fetch_status = 0
                  begin
                        exec Import.dbo.StartFileLoading @dtDate
                       
                        fetch next from curNewWrkStart into @dtDate
                  end
                  close curNewWrkStart
                  deallocate curNewWrkStart
            end
      end
     
      while exists
      (
            select top 1 1
            from Import.dbo.WorkerState with ( nolock )
            where Stat = 1
      )
      begin
            waitfor delay N'00:00:01'
      end
     
      declare curLoadDialogClear cursor local static forward_only for
            select distinct [conversation_handle]
            from Import.dbo.srcLoadData
      open curLoadDialogClear
      fetch next from curLoadDialogClear into @ConversHandle
      while @@fetch_status = 0
      begin
            end conversation @ConversHandle
            
            fetch next from curLoadDialogClear into @ConversHandle
      end
      close curLoadDialogClear
      deallocate curLoadDialogClear
      
      if exists
      (
            select top 1 1
            from Import.dbo.WorkerState with ( nolock )
            where Stat = 2
      )
      begin
            raiserror ( N'В ходе работы одного из потоков возникла ошибка. Подробности в логе.', 16, 1 )
      end
end
go

Сделаем несколько пояснений к коду. В коде процедуры запускается определенное число потоков, равное столбцу WorkerCount таблицы dbo.LoaderParameters. Затем через равные интервалы времени происходит проверка числа работающих потоков через таблицу статусов. При этой проверке значение WorkerCount всегда вычисляется заново. Это делается для того, чтобы число одновременно работающих потоков можно было динамически уменьшать или увеличивать (вдруг во время загрузки перестанет хватать ресурсов процессора для обслуживания, скажем, 10 одновременно работающих потоков, и тогда, не отменяя загрузку, это число можно будет уменьшить по ходу закачки). При уменьшении числа потоков процедура добавляет новые потоки из числа необработанных дат для достижения нужного числа одновременно работающих программ.

При проведении тестов в моей работе цикл, который бегал по строкам запроса к таблице на другом сервере Oracle, возвращающего 150 млн. записей, складывал данные в текстовый файл около 5 часов, при этом периодически возникали ошибки устаревания снэпшота. После внедрения описанной выше технологии, загрузка при использовании 5 потоков сократилась до 35 минут, а при увеличении числа одновременно работающих потоков до 10 время загрузки снизилось до 25 минут. То есть скорость возросла более чем в 10 раз! При этом тестирование происходило на сервере-приемнике, на котором был единственный двухядерный процессор!

Эту методу можно найти и дальнейшие применения. Например, после того как данные загружены в текстовый файл их надо переместить в таблицы с помощью оператора bulk insert. И эту задачу также можно делать на основе многопоточности, с помощью того же метода, который описан выше. В моей работе на том же сервере с двумя ядрами 20 постоянно одновременно работающих соединений складывали данные из 90 файлов общим объемом в 12 Гб в таблицы за 9 минут.