
https://player.vimeo.com/video/201989439
Хроника очередь-это постоянная рамка обмена сообщениями с низкой задержкой для высокопроизводительных приложений.
Этот проект охватывает Java -версию хроники. Версия C ++ этого проекта также доступна и поддерживает взаимодействие Java/C ++, а также дополнительные языковые привязки, например Python. Если вы заинтересованы в оценке версии C ++, пожалуйста, свяжитесь с [email protected].
На первый взгляд очередь хроники можно рассматривать как просто еще одну реализацию очереди . Тем не менее, у него есть основной выбор дизайна, который следует подчеркнуть. Используя хранение вне Heap , ряд Chronicle обеспечивает среду, в которой приложения не страдают от сбора мусора (GC). При внедрении высокопроизводительных и интенсивных приложений для памяти (вы слышали причудливый термин «bigdata»?) В Java, одной из самых больших проблем является сборник мусора.
Хроника очередь позволяет добавлять сообщения к окончанию очереди («добавленное»), читать из очереди («хвост»), а также поддерживает случайный док.
Вы можете считать, что очередь хроники похожа на низкую задержку, без прочную/устойчивую тему, которая может содержать сообщения разных типов и размеров. Хроника очередь - это распределенная неограниченная стойкая очередь, которая:
Поддерживает асинхронные RMI и публикуйте/подписывает интерфейсы с задержкой микросекунды.
передает сообщения между JVMS под микросекундой
Передает сообщения между JVM на разных машинах через репликацию в менее 10 микросекундах (функция предприятия)
Обеспечивает стабильные, мягкие задержки в реальном времени в миллионы сообщений в секунду для одного потока в одну очередь; с полным заказом каждого события.
При публикации 40-байтовых сообщений высокий процент времени, когда мы достигаем задержек менее 1 микросекунды. Латентность 99 -го процентиля является худшим 1 из 100, а 99,9 -й процентиль - худшая 1 из 1000 задержки.
| Размер партии | 10 миллионов событий в минуту | 60 миллионов мероприятий в минуту | 100 миллионов событий в минуту |
|---|---|---|---|
99%Иль | 0,78 мкс | 0,78 мкс | 1,2 мкс |
99,9%Иль | 1,2 мкс | 1,3 мкс | 1,5 мкс |
| Размер партии | 10 миллионов событий в минуту | 60 миллионов мероприятий в минуту | 100 миллионов событий в минуту |
|---|---|---|---|
99%Иль | 20 мкс | 28 мкс | 176 мкс |
99,9%Иль | 901 мкс | 705 мкс | 5370 мкс |
Примечание | 100 миллионов мероприятий в минуту отправляет мероприятие каждые 660 наносекунд; реплицированы и сохранялись. |
Важный | Эта производительность не достигается с использованием большого кластера машин . Это использует одну ветку для публикации, и один поток для потребления. |
Хроника очередь предназначена для:
Будьте «записи все хранилище», которое может читать с помощью микросекундной задержки в режиме реального времени. Это поддерживает даже самые требовательные высокочастотные торговые системы. Тем не менее, он может использоваться в любом приложении, где рекорд информации является проблемой.
Поддержите надежную репликацию с уведомлением либо о приложении (писатель сообщений), либо на Tainer (читатель сообщения), когда сообщение было успешно воспроизведено.
Хроника очередь предполагает, что дисковое пространство дешево по сравнению с памятью. Хроника в очереди в полной мере использует дисковое пространство, которое у вас есть, и поэтому вы не ограничены основной памятью вашей машины. Если вы используете Spinning HDD, вы можете хранить много TBS на дисковом пространстве для небольших затрат.
Единственное дополнительное программное обеспечение, которое необходимо запустить в хронике, - это операционная система. У него нет брокера; Вместо этого он использует вашу операционную систему для выполнения всей работы. Если ваше приложение умирает, операционная система продолжает работать дольше, поэтому данные не теряются; даже без репликации.
Поскольку очередь в хронике хранит все сохраненные данные в файлах, отображаемых в памяти, это имеет тривиальные накладные расходы на HEAP, даже если у вас более 100 ТБ данных.
Хроника приложила значительные усилия к достижению очень низкой задержки. В других продуктах, которые фокусируются на поддержке веб -приложений, задержки менее 40 миллисекунд в порядке, так как они быстрее, чем вы можете видеть; Например, частота кадров кино составляет 24 Гц или около 40 мс.
Хроника очередь направлена на достижение задержек до 40 микросекунд за 99% до 99,99% случаев. Используя очередь хроники без репликации, мы поддерживаем приложения с задержками ниже 40 микросекунд, сквозных между несколькими службами. Часто 99% задержка очереди хроники полностью зависит от выбора операционной системы и подсистемы жесткого диска.
Репликация для очереди хроники поддерживает хроническое предприятие. Это поддерживает сжатие в реальном времени, которое вычисляет Deltas для отдельных объектов, как они написаны. Это может уменьшить размер сообщений в 10 или, что лучше, без необходимости партии; то есть без введения значительной задержки.
Хроника очередь также поддерживает сжатие LZW, Snappy и Gzip. Эти форматы, однако, добавляют значительную задержку. Они полезны только в том случае, если у вас есть строгие ограничения на пропускную способность сети.
Хроника очередь поддерживает ряд семантики:
Каждое сообщение воспроизводится при перезапуске.
Только новые сообщения воспроизводятся при перезапуске.
Перезагрузите из любой известной точки, используя индекс записи.
Воспроизведите только сообщения, которые вы пропустили. Это поддерживается непосредственно с использованием строителей MethodReader/MethodWriter.
В большинстве систем System.nanoTime() является примерно количество наносекунд с момента последнего перезагрузки системы (хотя различные JVM могут вести себя по -разному). Это то же самое в JVM на одной и той же машине, но отлично различается между машинами. Абсолютная разница, когда дело доходит до машин, бессмысленна. Однако информация может использоваться для обнаружения выбросов; Вы не можете определить, что такое наилучшая задержка, но вы можете определить, насколько далеко вы задержки. Это полезно, если вы сосредотачиваетесь на задержке 99 -го процентиля. У нас есть класс под названием RunningMinimum для получения времени от разных машин, одновременно компенсируя дрейф в nanoTime между машинами. Чем чаще вы проводите измерения, тем точнее этот минимум работает.
Хроника очередь управляет хранением по циклу. Вы можете добавить StoreFileListener , который уведомит вас при добавлении файла, и когда он больше не будет сохранен. Вы можете перемещать, сжать или удалять все сообщения в течение дня, одновременно. Примечание. К сожалению, в Windows, если операция ввода -вывода прервана, она может закрыть базовый файл.
Из -за причин производительности мы удалили проверку прерываний в коде очереди хроники. Из -за этого мы рекомендуем избегать использования очереди хроники с кодом, который генерирует прерывания. Если вы не можете избежать генерации прерываний, мы предлагаем вам создать отдельный экземпляр хронической очереди на поток.
Очередь хроники чаще всего используется для систем, ориентированных на производителей, где вам нужно сохранить много данных в течение нескольких дней или лет. Для статистики см. Использование хроники
Важный | Хроника очередь не поддерживает работу из любой сетевой файловой системы, будь то NFS, AFS, хранилище на базе SAN или что-либо еще. Причина этого заключается в том, что эти файловые системы не предоставляют все необходимые примитивы для хронических файлов, отображаемых с памятью. Если необходимо какое -либо сетевое взаимодействие (например, чтобы сделать данные доступными для нескольких хостов), единственным поддерживаемым способом является репликация очереди хроники (функция предприятия). |
Большинство систем обмена сообщениями ориентированы на потребителей. Управление потоком реализуется, чтобы потребитель когда -либо перегружался; даже на мгновение. Общим примером является сервер, поддерживающий несколько пользователей GUI. Эти пользователи могут быть на разных машинах (ОС и оборудование), разные качества сети (задержка и пропускная способность), делая множество других вещей в разное время. По этой причине для клиента имеет смысл сообщить производителю, когда отступить, задерживая любые данные, пока потребитель не будет готов получить больше данных.
Хроника очередь является решением, ориентированным на производителя, и делает все возможное, чтобы никогда не отталкивать производителя или сказать его, чтобы замедлиться. Это делает его мощным инструментом, обеспечивающим большой буфер между вашей системой, и продюсером вверх по течению, над которым у вас мало или нет.
Издатели рыночных данных не дают вам возможности надолго настаивать на производителя; Если вообще. Некоторые из наших пользователей потребляют данные из CME Opra. Это производит пики в 10 миллионов соревнований в минуту, отправляемые в виде пакетов UDP без повторной попытки. Если вы пропустите или бросите пакет, то он потерян. Вы должны употреблять и записывать эти пакеты так же быстро, как и к вам, с очень небольшим количеством буферизации в сетевом адаптере. В частности, для рыночных данных в реальном времени означает несколько микросекунд ; Это не значит внутридневный (в течение дня).
Хроника очередь быстро и эффективна и использовалась для увеличения скорости, которую передаются данные между потоками. Кроме того, он также ведет запись о каждом передаче сообщений, позволяющее значительно сократить объем ведения журнала, который вам необходимо сделать.
Системы соответствия требуются все большим количеством систем в наши дни. Каждый должен иметь их, но никто не хочет быть замедленным. Используя очередь хроники для буферизации данных между контролируемыми системами и системой соответствия, вам не нужно беспокоиться о влиянии записи соответствия для ваших контролируемых систем. Опять же, очередь хроники может поддерживать миллионы событий на секунду, на сервер и данные доступа, которые были сохранены в течение многих лет.
Хроника очередь поддерживает МПК с низкой задержкой (межпроцессная связь) между JVM на одной и той же машине в порядке 1 микросекунд; а также между машинами с типичной задержкой 10 микросекунд для скромной пропускной способности в несколько сотен тысяч. Хроника очередь поддерживает пропускную способность миллионов событий в секунду со стабильными микросекундными задержками.
См. Статьи об использовании очереди хроники в микросервисах
Очередь хроники может быть использована для построения государственных машин. Вся информация о состоянии этих компонентов может быть воспроизведена извне, без прямого доступа к компонентам или к их состоянию. Это значительно снижает необходимость в дополнительной ведомости. Тем не менее, любая журнала, которую вам нужно, может быть записано в подробности. Это делает обеспечение DEBUG в производстве практичным. Это связано с тем, что стоимость регистрации очень низкая; Менее 10 микросекунд. Журналы могут быть воспроизведены централизованно для консолидации журнала. Очередь хроники используется для хранения 100+ ТБ данных, которые можно воспроизвести с любого момента времени.
Несчастные потоковые компоненты являются высокопрофессиональными, детерминированными и воспроизводимыми. Вы можете воспроизводить ошибки, которые появляются только после миллиона событий, сыгранных в определенном порядке, с ускоренными реалистичными временами. Это делает использование потоковой обработки привлекательным для систем, которые нуждаются в высокой степени качественных результатов.
Выпуски доступны на Maven Central As:
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >Смотрите заметки о выпуске очереди Chronicle и получите последний номер версии. Снимки доступны на https://oss.sonatype.org
Примечание | Классы, которые находятся в одном из пакетов «внутренних», «Impl» и «main» (последние, содержащие различные основные методы), и любые суб-пакеты не являются частью публичного API и могут стать подверженными изменениям в любом случае время по любой причине . Для получения подробной информации см. Соответствующие файлы package-info.java . |
В «Хронике Queue queue v5» теперь только чтения, только в очереди в хронике V4 у нас была концепция ленивой индексации, где приложения не будут писать индексы, но вместо этого индексация могла бы быть выполнена приводка. Мы решили бросить ленивую индексацию в V5; Создание Tailers только для чтения не только упрощает очередь хроники, но и позволяет нам добавлять оптимизации в других местах кода.
Модель блокировки очереди хроники была изменена в V5, в очереди на хронике V4. Замок записи (для предотвращения одновременных записей в очередь) существует в файле .cq4. В V5 это было перемещено в один файл с именем The Table Store (metadata.cq4t). Это упрощает код блокировки внутри, так как должен быть проверен только файл хранилища таблиц.
Вы можете использовать хронику queue queue v5 для чтения сообщений, написанных с помощью queue queue v4, но это не всегда будет работать - если, например, вы создали свою очередь V4 с помощью wireType(WireType.FIELDLESS_BINARY) Прочитайте заголовок очереди. У нас есть некоторые тесты на V5 чтения V4, но они ограничены, и все сценарии не могут быть поддержаны.
Вы не можете использовать хроническую очередь V5, чтобы написать в очереди на хронику queue V4.
Хроника Queue queue v4-это полная переписка очереди хроники, которая решает следующие проблемы, которые существовали в V3.
Без сообщений о самоописании пользователи должны были создать свои собственные функции для сброса сообщений и долгосрочного хранения данных. С V4 вам не нужно это делать, но вы можете, если хотите.
Ванильная хроническая очередь создаст файл на поток. Это нормально, если количество потоков контролируется, однако, многие приложения практически не имеют контроля над тем, сколько потоков используется, и это вызвало проблемы с удобством использования.
Конфигурация для индексированной и ванильной хроники была полностью в коде, поэтому читатель должен был иметь такую же конфигурацию, что и у авторов, и не всегда ясно, что это было.
Производитель не мог знать, сколько данных было воспроизведено на вторую машину. Единственным обходным путем был воспроизведение данных обратно производителям.
Вам нужно было указать размер данных, чтобы зарезервировать, прежде чем вы начали писать свое сообщение.
Вам нужно было сделать свою собственную блокировку для приложения при использовании индексированной хроники.
В очереди в хронике V3 все было с точки зрения байтов, а не проволоки. Есть два способа использования байта в очереди на хронике V4. Вы можете использовать методы writeBytes и readBytes , или вы можете получить bytes() от провода. Например:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ())); try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}Chronicle Queue Enterprise Edition - это коммерчески поддерживаемая версия нашей успешной очереди с открытым исходным кодом. Документация с открытым исходным кодом расширяется следующими документами для описания дополнительных функций, которые доступны, когда вы имеете лицензию на Enterprise Edition. Это:
Шифрование очередей и сообщений сообщений. Для получения дополнительной информации см. Документацию по шифрованию.
Репликация TCP/IP (и необязательно UDP) между хостами, чтобы обеспечить резервное копирование всех данных вашей очереди в реальном времени. Для получения дополнительной информации см. Документацию по репликации, протокол репликации очереди рассматривается в протоколе репликации.
Поддержка часового пояс для ежедневного расписания рулона очередей. Для получения дополнительной информации см. Поддержку часового пояса.
Поддержка Async Mode, чтобы обеспечить улучшенную производительность при высокой пропускной способности на более медленных файловых системах. Для получения дополнительной информации см. Async Mode, а также производительность.
Pro-Toucher для улучшенных выбросов, см. Pro-Toucher и его конфигурация
Кроме того, вы будете полностью поддержаны нашими техническими экспертами.
Для получения дополнительной информации о хронике Queue Enterprise Edition, пожалуйста, свяжитесь с [email protected].
Очередь хроники определяется SingleChronicleQueue.class , который предназначен для поддержки:
rowling files на ежедневной, еженедельной или почасовой основе,
одновременные писатели на той же машине,
одновременные считыватели на одной машине или на нескольких машинах через репликацию TCP (с хроническим очередью предприятия),
одновременные читатели и писатели между Docker или другими рабочими нагрузками в контейнер
Сериализация нулевой копии и десериализация,
Миллионы записей/чтения в секунду на товарном оборудовании.
Приблизительно 5 миллионов сообщений/секунд для 96-байтовых сообщений на процессоре I7-4790. Структура каталога очередей заключается в следующем:
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling. Формат состоит из байтов с размером, которые отформатированы с использованием BinaryWire или TextWire . Хроника очередь предназначена для изгнания из кода. Вы можете легко добавить интерфейс, который соответствует вашим потребностям.
Примечание | Из-за довольно низкой работы операции чтения/записи в очереди хроники могут сделать неконтролируемые исключения. Чтобы предотвратить смерть потока, может быть практичным ловить RuntimeExceptions и регистрировать/анализировать их в зависимости от необходимости. |
Примечание | Для демонстраций того, как можно использовать очередь хроники см. В хронике демо -версии и документации на Java см. Хроника |
В следующих разделах сначала мы представляем некоторую терминологию и быструю ссылку на использование очереди хроники. Затем мы предоставляем более подробное руководство.
Хроника очередь - это постоянный журнал сообщений, который поддерживает параллельных авторов и читателей даже в нескольких JVM на одной машине. Каждый читатель видит каждое сообщение, и читатель может присоединиться в любое время и при этом увидеть каждое сообщение.
Примечание | Мы намеренно избегаем термина потребителя и вместо этого используем читателя , поскольку сообщения не используются/не уничтожаются с помощью чтения. |
В очереди на хронике есть следующие основные понятия:
Выдержка
Выдержка является основным контейнером данных в очереди хроники. Другими словами, каждая очередь хроники состоит из выдержки. Написание сообщения в очередь хроники означает начать новую выдержку, вписать в него сообщение и завершить выдержку в конце.
Приложение
Приложение является источником сообщений; Что -то вроде итератора в хронической среде. Вы добавляете данные, добавляя текущую очередь хроники. Он может выполнять последовательные записи, добавив только к окончанию очереди. Там нет способа вставить или удалить выдержки.
Приверженец
Художник - это отрывок для чтения, оптимизированный для последовательных чтений. Он может выполнять последовательные и случайные чтения, как вперед, так и назад. Тушники читают следующее доступное сообщение каждый раз, когда они вызываются. Последствия гарантируются в очереди на хронике:
Для каждого приложения сообщения написаны в том порядке, который приложение их написал. Сообщения от разных аппендеров переоценены,
Для каждого пособия он увидит каждое сообщение для темы в одном и том же порядке, что и все остальные Tainer,
При воспроизведении каждая копия имеет копию каждого сообщения.
Хроника очередь без брокеров. Если вам нужна архитектура с брокером, пожалуйста, свяжитесь с [email protected].
Файлы с помощью файлов и очередей
Хроника очередь предназначена для переворачивания своих файлов в зависимости от цикла рулона, выбранного при создании очереди (см. Rollcycles). Другими словами, для каждого цикла рулона создается файл очереди, который имеет расширение cq4 . Когда цикл броска достигает точки, который он должен свернуть, Appender будет атомно писать марку EOF в конце текущего файла, чтобы указать, что ни один другой приложение не должно записать в этот файл, и ни один из них не должен читать дальше, и вместо этого каждый должен использовать новый файл.
Если процесс был выключен и перезагрузился позже, когда цикл броска должен использовать новый файл, приложение попытается найти старые файлы и написать в них отметку EOF , чтобы помочь приспособлять их чтение.
Темы
Каждая тема представляет собой каталог файлов очередей. Если у вас есть тема под названием mytopic , макет может выглядеть так:
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4Чтобы скопировать все данные за один день (или цикл), вы можете скопировать файл на этот день на свой компьютер разработки для тестирования на воспроизведение.
Ограничения по темам и сообщениям
Темы ограничены строками, которые можно использовать в качестве имен каталогов. В рамках темы вы можете иметь подзащики, которые могут быть любым типом данных, который может быть сериализован. Сообщения могут быть любыми сериализуемыми данными.
Поддержка очереди хроники:
Serializable объекты, хотя этого следует избегать, так как это не эффективно
Externalizable Objects предпочтительнее, если вы хотите использовать стандартные Java API.
byte[] и String
Marshallable ; Самозаписное сообщение, которое может быть написано как YAML, бинарный YAML или JSON.
BytesMarshallable , который является бинарным или текстовым кодированием низкого уровня.
В этом разделе представлена краткая ссылка на использование очереди хроники, чтобы кратко показать, как создать, записать/читать в/из очереди.
Хроника строительство очередей
Создание экземпляра очереди хроники отличается от простого вызова конструктора. Чтобы создать экземпляр, вы должны использовать ChronicleQueueBuilder .
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build (); В этом примере мы создали IndexedChronicle , который создает два RandomAccessFiles ; один для индексов, и один для данных, имеющих относительно имена:
${java.io.tmpdir}/getting-started/{today}.cq4Написание в очередь
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );Чтение из очереди
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ()); Кроме того, метод ChronicleQueue.dump() может использоваться для сброса необработанного содержимого в качестве строки.
queue . dump ();Уборка
В очереди в хронике хранится свои данные вне HEAP, и рекомендуется вызов close() как только вы закончите работу с очередью хроники, для бесплатных ресурсов.
Примечание | Никакие данные не будут потеряны, если вы это сделаете. Это только для очистки ресурсов, которые использовались. |
queue . close ();Сделать все это вместе
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
} Вы можете настроить очередь хроники, используя ее параметры конфигурации или свойства системы. Кроме того, существуют разные способы написания/чтения в/из очереди, такие как использование прокси и использование MethodReader и MethodWriter .
Хроника очередь (CQ) может быть настроена несколькими методами на классе SingleChronicleQueueBuilder . Несколько параметров, которые были наиболее запрошены нашими клиентами, объясняются ниже.
RollCycle
Параметр RollCycle настраивает скорость, с которой CQ будет выполнять базовые файлы очереди. Например, использование следующего фрагмента кода приведет к тому, что файлы очереди будут развернуты (то есть созданный новый файл) каждый час:
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build () После того, как он не был установлен в очередь, его нельзя изменить позднее. Любые дальнейшие случаи SingleChronicleQueue , настроенные для использования того же пути, должны быть настроены для использования того же цикла рулона, а если это не так, то цикл рулона будет обновлен в соответствии с постоянным циклом рулона. В этом случае будет напечатано сообщение журнала предупреждения, чтобы уведомить пользователя библиотеки о ситуации:
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}Вывод консоли:
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.Максимальное количество сообщений, которые можно хранить в файле очереди, зависит от цикла броска. Смотрите FAQ для получения дополнительной информации об этом.
В очереди на хронику время пролонгации основано на UTC. Функция «Рокновое предприятие времени» простирает способность хронической очереди указывать время и периодичность переноса очередей, а не UTC. Для получения дополнительной информации см. Вернув в очередь часового пояса.
Класс FileUtil Chrineutil Chronicle предоставляет полезные методы для управления файлами очередей. См. Управление рулонными файлами напрямую.
Проипросытый
Можно настроить, как хроническая очередь будет хранить данные, явно установив WireType :
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )Например:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )Несмотря на то, что при создании строителя можно явно предоставить Wiretype, он не поощряется, поскольку еще не все типы проводов поддерживаются в очереди хроникой. В частности, следующие типы проводов не поддерживаются:
Текст (и, по сути, на основе текста, включая JSON и CSV)
СЫРОЙ
Read_any
блокируется
Когда очередь прочитана/написана, часть файла, который в настоящее время читается/написан, отображается с сегментом памяти. Этот параметр управляет размером блока отображения памяти. Вы можете изменить этот параметр, используя метод SingleChronicleQueueBuilder.blockSize(long blockSize) если это необходимо.
Примечание | Вы должны избегать смены blockSize без необходимости. |
Если вы отправляете большие сообщения, вам следует установить большой blockSize , т.е., то blockSize должны быть как минимум в четыре раза больше размера сообщения.
Предупреждение | Если вы используете небольшие blockSize для больших сообщений, вы получаете IllegalStateException , а запись прерывается. |
Мы рекомендуем вам использовать один и тот же blockSize для каждого экземпляра очереди при репликации очередей, blockSize не записываются в метаданные очереди, поэтому в идеале следует установить на то же значение при создании экземпляров хронической очереди (это рекомендуется, но если вы хотите Чтобы запустить с другим blocksize вы можете).
Кончик | Используйте один и тот же blockSize для каждого экземпляра реплицированных очередей. |
Индексное расстояние
Этот параметр показывает пространство между выдержками, которые явно проиндексированы. Более высокое число означает более высокую последовательную производительность записи, но медленное чтение случайного доступа. Последовательная производительность чтения не влияет на это свойство. Например, можно вернуть следующее расстояние между индексами по умолчанию:
16 (подробно)
64 (ежедневно)
Вы можете изменить этот параметр, используя метод SingleChronicleQueueBuilder.indexSpacing(int indexSpacing) .
indexcount
Размер каждого массива индексов, а также общее количество индексных массивов на файл очереди.
Примечание | IndexCount 2 - максимальное количество индексированных записей очередей. |
Примечание | См. Раздел «Индексация выдержки» в очереди на хронике этого руководства пользователя для получения дополнительной информации и примеров использования индексов. |
readbuffermode, writebuffermode
Эти параметры определяют Buffermode для чтения или записей, которые имеют следующие параметры:
None - по умолчанию (и единственный доступен для пользователей с открытым исходным кодом), без буферизации;
Copy - используется в сочетании с шифрованием;
Asynchronous - используйте асинхронный буфер при чтении и/или письме, предоставленном в режиме Chronicle Async.
Buffercapacity
Емкость кольца в байтах при использовании bufferMode: Asynchronous
В очереди на хронике мы называем акт написания ваших данных в очередь хроники, как хранение отрывка. Эти данные могут быть составлены из любого типа данных, включая текст, числа или сериализованные капли. В конечном счете, все ваши данные, независимо от того, что они есть, хранятся как серия байтов.
Незадолго до хранения выдержки, хроника оставляет за собой 4-байтовый заголовок. Хроника очередь записывает длину ваших данных в этот заголовок. Таким образом, когда очередь хроники приходит к чтению вашей выдержки, он знает, как долго каждый капля данных. Мы называем этот 4-байтовый заголовок вместе с вашим отрывом, как документ. Строго говоря, очередь хроники может быть использована для чтения и написания документов.
Примечание | В рамках этого 4-байтового заголовка мы также оставляем несколько бит для ряда внутренних операций, таких как блокировка, чтобы сделать хроническую очередь защищены как по процессорам, так и на потоках. Важно отметить, что из -за этого вы не можете строго преобразовать 4 байта в целое число, чтобы найти длину вашей капли данных. |
Как указывалось ранее, Cronicle Queue использует приложение для записи в очередь и придерживаемого для чтения из очереди. В отличие от других решений из очередей за Java, сообщения не теряются, когда они читаются с Tainer. Это более подробно рассматривается в разделе ниже на «Чтение из очереди с использованием Tainer». Чтобы записать данные в очередь хроники, вы должны сначала создать приложение:
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}Хроника очередь использует следующий низкоуровневый интерфейс для записи данных:
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
} Близко на попытке с ресурсами-это точка, когда длина данных записывается в заголовок. Вы также можете использовать DocumentContext , чтобы узнать, что ваши данные только что назначены (см. Ниже). Позже вы можете использовать этот индекс для перемещения/поиска этой отрывки. Каждая выдержка в очереди в хронике имеет уникальный индекс.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
} Приведенные ниже методы высокого уровня, такие как writeText() являются удобными методами призывы appender.writingDocument() , но оба подхода по существу делают одно и то же. Фактический код writeText(CharSequence text) выглядит следующим образом:
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}Таким образом, у вас есть выбор различных интерфейсов высокого уровня, вплоть до API низкого уровня, до необработанной памяти.
Это API самого высокого уровня, который скрывает тот факт, что вы пишете вообще для обмена сообщениями. Преимущество заключается в том, что вы можете поменять вызовы на интерфейс с реальным компонентом или интерфейс на другой протокол.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));Вы можете написать «Сообщение о самоописании». Такие сообщения могут поддерживать изменения схемы. Их также легче понять при отладке или диагностике проблем.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));Вы можете написать «необработанные данные», что является самоописанием. Типы всегда будут правильными; Положение является единственным признаком значения этих значений.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));Вы можете написать «необработанные данные», что не является самоописанием. Ваш читатель должен знать, что означают эти данные, и какие типы, которые использовались.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));Ниже показан самый низкий уровень записи данных. Вы получаете адрес для необработанной памяти, и вы можете написать все, что хотите.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});Вы можете распечатать содержимое очереди. Вы можете увидеть первые два, и последние два сообщения хранят те же данные.
// dump the content of the queue
System . out . println ( queue . dump ());Отпечатки:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World Чтение очереди следует тому же шаблону, что и написание, за исключением того, что есть вероятность, что, когда вы пытаетесь прочитать его.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
} Вы можете превратить каждое сообщение в вызов метода на основе содержимого сообщения, и иметь очередь хроники автоматически детериализуйте аргументы метода. Calling reader.readOne() автоматически пропустит (отфильтровывать) любые сообщения, которые не соответствуют вашему считыванию метода.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());Вы можете расшифровать сообщение самостоятельно.
Примечание | Имена, тип и порядок полей не должны совпадать. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));Вы можете прочитать значения данных самоописания. Это будет проверять типы правильные и конвертируются по мере необходимости.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));Вы можете читать необработанные данные как примитивы и струны.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));Или вы можете получить базовый адрес памяти и получить доступ к собственной памяти.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
})); Примечание | Каждый Художник видит каждое сообщение. |
Абстракция может быть добавлена в сообщения фильтрации или назначить сообщения только одному процессору сообщения. Однако в целом вам нужен только один основной пакет для темы, с возможностью некоторых вспомогательных подводников для мониторинга и т. Д.
Поскольку очередь Chronicle не разделяет его темы, вы получаете полное заказ всех сообщений в этой теме. По всем темам нет никакой гарантии заказа; Если вы хотите детерминированно воспроизвести из системы, которая потребляет по нескольким темам, мы предлагаем воспроизведение с вывода этой системы.
Хронические поток очереди могут создавать обработчики файлов, обработчики файлов очищаются всякий раз, когда соответствующий метод close() или всякий раз, когда JVM запускает сборку мусора. Если вы пишете свой код, у вас нет пауза GC, и вы явно хотите очистить обработчики файлов, вы можете позвонить следующему:
(( StoreTailer ) tailer ). releaseResources ()ExcerptTailer.toEnd() В некоторых приложениях может потребоваться начать читать с конца очереди (например, в сценарии перезапуска). Для этого варианта использования ExcerptTailer предоставляет метод toEnd() . Когда направление Tainer будет FORWARD (по умолчанию или в соответствии с методом ExcerptTailer.direction ), тогда вызовет toEnd() поместит Tailer сразу после последней существующей записи в очереди. В этом случае Tainer теперь готов к чтению любых новых записей, добавленных в очередь. До тех пор, пока какие -либо новые сообщения не будут добавлены в очередь, для чтения не будет доступного нового DocumentContext :
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();Если необходимо прочитать назад через очередь с конца, то Tainer можно настроить для чтения назад:
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd (); При чтении задом наперед метод toEnd() переместит Tainer к последней записи в очереди. Если очередь не пустая, то для чтения будет доступен DocumentContext :
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();АКА по имени Thiners.
Может быть полезно иметь пособия, который продолжается от того места, где он находился при перезапуске приложения.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}Tailer "a"
Tailer "a" Далее читает сообщение 3
Tailer "B"
Tailer "B" Далее читает сообщение 1
This is from the RestartableTailerTest where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart() , toEnd() , moveToIndex() or reads a message.
Примечание | The direction() is not preserved across restarts, only the next index to be read. |
Примечание | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4 :
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������� This can often be a bit difficult to read, so it is better to dump the cq4 files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain or net.openhft.chronicle.queue.ChronicleReaderMain . DumpMain performs a simple dump to the terminal while ChronicleReaderMain handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4 file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar , from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4 Кончик | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4 This will dump the 19700101-02.cq4 file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining Примечание | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh located in the Chonicle-Queue/bin -folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain . The script can be run from the Chronicle-Queue root folder like this:
$ ./bin/dump_queue.sh <file path> ChronicleReaderMain The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain (in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain :
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build > Once the Uber jar is present, you can run ChronicleReaderMain from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh which again is located in the Chonicle-Queue/bin -folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain . The script can be run from the Chronicle-Queue root folder like this:
$ ./bin/queue_reader.sh <options> ChronicleWriter If using MethodReader and MethodWriter then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain or the shell script queue_writer.sh eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method nameIf you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}public class DTO extends SelfDescribingMarshallable { private int age; Приватное название строки; }
Then you can call ChronicleWriterMain -d queue doit x.yaml with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}или
!x.y.z.DTO {
age : 42,
name : Percy
} If DTO makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface , where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender(); the acquireAppender() uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>); Примечание | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerptsMove to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
} You can add a StoreFileListener to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute() method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown() method can be called to close the supplied queue and release any other resources. Invocation of the execute() method after shutdown() has been called will cause an IllegalStateException to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle (defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
Предупреждение | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs (defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs .
SingleChronicleQueueExcerpts.dontWrite (defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle is set to true and pretoucherPrerollTimeMs to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" ); The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute() method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
} The method close() , closes the Pretoucher and releases its resources.
pretouch . close (); Примечание | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
Примечание | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument() is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument() call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close() is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true); Примечание | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
Результаты:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark ), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 ------------------------------------------------------ ------------------------------------------------------ --------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 ------------------------------------------------------ ------------------------------------------------------ ---------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 ------------------------------------------------------ ------------------------------------------------------ --------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 ------------------------------------------------------ ------------------------------------------------------ ---------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 ------------------------------------------------------ ------------------------------------------------------ --------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 ------------------------------------------------------ ------------------------------------------------------ ---------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------- Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 ------------------------------------------------------ ------------------------------------------------------ --------------- -------------------------------- SUMMARY (Concurrent2) ----------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 ------------------------------------------------------ ------------------------------------------------------ ---------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop , you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal() if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint into the code because thread dumps are only reported at safe-points.
Результаты:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | Throughput |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.