
https://player.vimeo.com/video/201989439
크로니클 대기열은 고성능 응용 프로그램을위한 지속적인 저도 메시징 프레임 워크입니다.
이 프로젝트는 Java 버전의 Chronicle Deue를 다룹니다. 이 프로젝트의 C ++ 버전도 사용할 수 있으며 Java/C ++ 상호 운용성과 추가 언어 바인딩 (예 : Python)을 지원합니다. C ++ 버전 평가에 관심이 있으시면 [email protected]에 문의하십시오.
언뜻보기에는 크로니클 대기열이 단순히 다른 대기열 구현 으로 볼 수 있습니다. 그러나 강조해야 할 주요 설계 선택이 있습니다. Chronicle Queue는 해외 스토리지를 사용하여 응용 프로그램이 GC (Garbage Collection)로 어려움을 겪지 않는 환경을 제공합니다. Java에서 고성능 및 메모리 집약적 인 응용 프로그램 ( "BigData"라는 용어를 들었을 때)을 구현할 때 가장 큰 문제 중 하나는 쓰레기 수집입니다.
크로니클 대기열을 사용하면 큐 끝에 메시지를 추가 할 수 있으며 ( "부록"), 대기열 ( "꼬리")에서 읽고 무작위 액세스 찾기를 지원합니다.
크로니클 대기열은 다른 유형과 크기의 메시지를 포함 할 수있는 낮은 대기 시간 중개인 내구성/지속 주제와 유사하다고 생각할 수 있습니다. 크로니클 대기열은 분산되지 않은 무한 지속 큐입니다.
비동기 RMI를 지원하고 마이크로 초 대기 시간이있는 인터페이스를 게시/구독합니다.
마이크로 초 아래에서 JVMS 사이에 메시지를 전달합니다
10 마이크로 초 미만 (Enterprise Feature)에서 복제를 통해 다른 컴퓨터에서 JVMS 간 메시지를 전달합니다.
단일 스레드가 한 줄로 하나의 큐에 대해 초당 수백만 개의 메시지에 안정적인 소프트 실시간 대기 시간을 제공합니다. 모든 이벤트의 전체 주문으로.
40 바이트 메시지를 게시 할 때 1 마이크로 초 미만의 대기 시간을 달성 할 수 있습니다. 99 번째 백분위 수 대기 시간은 100에서 최악의 1이고 99.9 번째 백분위 수는 1000 대기 시간 중 최악입니다.
| 배치 크기 | 분당 천만 이벤트 | 분당 6 천만 이벤트 | 분당 1 억 이벤트 |
|---|---|---|---|
99%ILE | 0.78 µs | 0.78 µs | 1.2 µs |
99.9%ILE | 1.2 µs | 1.3 µs | 1.5 µs |
| 배치 크기 | 분당 천만 이벤트 | 분당 6 천만 이벤트 | 분당 1 억 이벤트 |
|---|---|---|---|
99%ILE | 20 µs | 28 µs | 176 µs |
99.9%ILE | 901 µs | 705 µs | 5,370 µs |
메모 | 분당 1 억 개의 이벤트는 660 나노초마다 이벤트를 보내고 있습니다. 복제 및 지속. |
중요한 | 이 성능은 대형 기계 클러스터를 사용하여 달성되지 않습니다. 이것은 하나의 스레드를 사용하여 게시하고 하나의 스레드를 소비합니다. |
크로니클 대기열은 다음과 같이 설계되었습니다.
마이크로 초 실시간 대기 시간으로 읽을 수있는 "레코드 모든 매장"이어야합니다. 이것은 가장 까다로운 고주파 거래 시스템조차 지원합니다. 그러나 정보 기록이 우려되는 모든 응용 프로그램에서 사용할 수 있습니다.
메시지가 성공적으로 복제 된 경우 Appender (Writer of Messag
크로니클 대기열은 디스크 공간이 메모리에 비해 저렴하다고 가정합니다. 크로니클 대기열은 당신이 가지고있는 디스크 공간을 최대한 활용하므로 기계의 주요 메모리에 국한되지 않습니다. Spinning HDD를 사용하는 경우 비용이 거의 들지 않도록 많은 TB의 디스크 공간을 저장할 수 있습니다.
크로니클 큐가 실행 해야하는 유일한 추가 소프트웨어는 운영 체제입니다. 중개인이 없습니다. 대신 운영 체제를 사용하여 모든 작업을 수행합니다. 응용 프로그램이 죽으면 운영 체제는 몇 초 동안 계속 실행되므로 데이터가 손실되지 않습니다. 복제 없이도.
크로니클 큐는 메모리 매핑 파일에 저장된 모든 데이터를 저장하므로 100TB가 넘는 데이터가 있더라도 사소한 온전한 오버 헤드가 있습니다.
크로니클은 매우 낮은 대기 시간을 달성하기 위해 상당한 노력을 기울였습니다. 웹 애플리케이션의 지원에 중점을 둔 다른 제품에서는 40 밀리 초 미만의 대기 시간이 볼 수있는 것보다 빠르기 때문에 괜찮습니다. 예를 들어, 영화의 프레임 속도는 24Hz 또는 약 40ms입니다.
크로니클 대기열은 40 마이크로 초 미만의 대기 시간을 99% ~ 99.99%로 달성하는 것을 목표로합니다. 크로니클 대기열을 복제하지 않고 사용하면 여러 서비스에서 40 마이크로 초 미만의 대기 시간이있는 응용 프로그램을 지원합니다. 크로니클 대기열의 99% 대기 시간은 종종 운영 체제 및 하드 디스크 하위 시스템의 선택에 전적으로 의존합니다.
크로니클 큐의 복제는 크로니클 와이어 엔터프라이즈를 지원합니다. 이는 작성된 개별 객체의 델타를 계산하는 실시간 압축을 지원합니다. 이렇게하면 배치없이 메시지 크기를 10 배 이상 줄일 수 있습니다. 즉, 상당한 대기 시간을 도입하지 않고.
크로니클 대기열은 LZW, Snappy 및 GZIP 압축도 지원합니다. 그러나이 형식은 상당한 대기 시간을 추가합니다. 네트워크 대역폭에 엄격한 제한이있는 경우에만 유용합니다.
크로니클 대기열은 여러 의미를 지원합니다.
모든 메시지는 다시 시작시 재생됩니다.
다시 시작하면 새 메시지 만 재생됩니다.
항목의 색인을 사용하여 알려진 지점에서 다시 시작하십시오.
놓친 메시지 만 재생합니다. 이것은 MethodReader/MethodWriter Builders를 사용하여 직접 지원됩니다.
대부분의 시스템 시스템에서 System.nanoTime() 시스템이 마지막으로 재부팅 되었기 때문에 대략 나노초 수입니다 (다른 JVMS가 다르게 행동 할 수 있지만). 이것은 같은 기계의 JVM에 걸쳐 동일하지만 기계마다 크게 다릅니다. 기계와 관련하여 절대적인 차이는 의미가 없습니다. 그러나 정보는 이상치를 감지하는 데 사용될 수 있습니다. 최고의 대기 시간이 무엇인지 결정할 수는 없지만 최고의 대기 시간이 얼마나 멀리 떨어져 있는지 결정할 수 있습니다. 이것은 99 번째 백분위 수준에 초점을 맞추는 경우 유용합니다. 우리는 다른 기계에서 타이밍을 얻기 위해 RunningMinimum 이라는 클래스를 가지고 있으며 기계 사이의 nanoTime 에서 드리프트를 보상합니다. 더 자주 측정할수록이 실행 중 최소가 더 정확합니다.
크로니클 큐는주기별로 저장을 관리합니다. 파일이 추가 될 때 및 더 이상 유지되지 않을 때 알리는 StoreFileListener 를 추가 할 수 있습니다. 하루 동안 모든 메시지를 한 번에 이동, 압축 또는 삭제할 수 있습니다. 참고 : 불행히도 Windows에서 IO 작업이 중단되면 기본 FilEchannel을 닫을 수 있습니다.
성능의 이유로 인해 크로니클 큐 코드의 인터럽트 확인을 제거했습니다. 이 때문에 인터럽트를 생성하는 코드와 함께 크로니클 큐를 사용하지 않는 것이 좋습니다. 인터럽트 생성을 피할 수 없다면 스레드 당 크로니클 대기열 인스턴스를 만드는 것이 좋습니다.
크로니클 대기열은 며칠 또는 몇 년 동안 많은 데이터를 유지 해야하는 생산자 중심 시스템에 가장 자주 사용됩니다. 통계는 크로니클 큐의 사용을 참조하십시오
중요한 | 크로니클 큐는 NFS, AFS, SAN 기반 스토리지 또는 기타 네트워크 파일 시스템에서 작동하는 것을 지원하지 않습니다 . 그 이유는 파일 시스템이 메모리 매핑 된 파일 크로니클 대기열에 필요한 모든 프리미티브를 제공하지는 않습니다. 네트워킹이 필요한 경우 (예 : 여러 호스트에 데이터에 액세스 할 수 있도록) 유일하게 지원되는 방법은 Chronicle Queue Replication (Enterprise Feature)입니다. |
대부분의 메시징 시스템은 소비자 중심입니다. 소비자가 과부하가 발생하지 않도록 흐름 제어가 구현됩니다. 순간적으로. 일반적인 예는 여러 GUI 사용자를 지원하는 서버입니다. 이러한 사용자는 다른 시스템 (OS 및 하드웨어), 네트워크 및 대역폭의 다른 품질 (대기 시간 및 대역폭)에있을 수 있으며 다른 시간에 다양한 작업을 수행 할 수 있습니다. 이러한 이유로 클라이언트 소비자가 생산자에게 언제 백업 해야하는지 알리는 것이 합리적이며 소비자가 더 많은 데이터를 가져갈 준비가 될 때까지 데이터를 지연시킵니다.
크로니클 대기열은 생산자 중심 솔루션이며 생산자를 다시 밀거나 속도를 늦추지 않도록 가능한 모든 것을 수행합니다. 이를 통해 시스템 사이에 큰 버퍼를 제공하는 강력한 도구와 제어가 거의 없거나 없어지는 업스트림 생산 업체를 제공합니다.
Market Data Publisher는 프로듀서를 Long에 다시 밀어 넣을 수있는 옵션을 제공하지 않습니다. 전혀. 몇몇 사용자는 CME OPRA의 데이터를 소비합니다. 이로 인해 분당 천만 이벤트의 피크가 생성되며, 재 시도없이 UDP 패킷으로 전송됩니다. 패킷을 놓치거나 떨어 뜨리면 손실됩니다. 네트워크 어댑터에 버퍼링이 거의 없으면서 패킷을 빠르게 소비하고 기록해야합니다. 특히 시장 데이터의 경우, 실시간은 몇 마이크로 초의 수단입니다. 하루 종일 (낮 동안)를 의미하지는 않습니다.
크로니클 대기열은 빠르고 효율적이며 데이터가 스레드간에 전달되는 속도를 높이는 데 사용되었습니다. 또한 모든 메시지의 기록을 유지하여 필요한 로깅의 양을 크게 줄일 수 있습니다.
규정 준수 시스템은 요즘 점점 더 많은 시스템에서 필요합니다. 모든 사람은 그들을 가지고 있어야하지만 아무도 그들에 의해 속도를 늦추기를 원하지 않습니다. 모니터링 된 시스템과 컴플라이언스 시스템 간의 데이터를 완충하기 위해 크로니클 큐를 사용하면 모니터링 된 시스템에 대한 준수 기록의 영향에 대해 걱정할 필요가 없습니다. 다시 말하지만, Chronicle Deue는 수년간 보유 된 수백만 건의 이벤트, 서버 당 수백만의 이벤트를 지원할 수 있습니다.
크로니클 큐는 1 마이크로 초의 크기의 순서로 동일한 기계의 JVMS 간의 낮은 대기 시간 IPC (Inter Procection Communication)를 지원합니다. 수십만 개의 겸손한 처리량을 위해 전형적인 대기 시간이 10 마이크로 초인 기계 사이. 크로니클 대기열은 안정적인 마이크로 초 대기 시간과 함께 초당 수백만 개의 이벤트 처리량을 지원합니다.
마이크로 서비스에서 크로니클 대기열 사용에 관한 기사를 참조하십시오.
연대기 대기열은 상태 기계를 구축하는 데 사용될 수 있습니다. 해당 구성 요소의 상태에 대한 모든 정보는 구성 요소 또는 해당 상태에 직접 액세스하지 않고도 외부에서 재현 할 수 있습니다. 이는 추가 로깅의 필요성을 크게 줄입니다. 그러나 필요한 로깅에 대해 자세히 기록 할 수 있습니다. 이로 인해 프로덕션에서 DEBUG 로깅을 활성화 할 수 있습니다. 벌목 비용이 매우 낮기 때문입니다. 10 마이크로 초 미만. 로그 통합을 위해 로그를 중앙에서 복제 할 수 있습니다. 크로니클 대기열은 100 TB의 데이터를 저장하는 데 사용되며, 어느 시점에서든 재생할 수 있습니다.
비 배열 스트리밍 구성 요소는 성능이 높고 결정적이며 재현 가능합니다. 현실적인 타이밍이 가속화되어 특정 순서로 백만 이벤트가 발생한 후에 만 표시되는 버그를 재현 할 수 있습니다. 이로 인해 스트림 처리를 사용하는 것은 높은 수준의 품질 결과가 필요한 시스템에 매력적입니다.
릴리스는 Maven Central에서 다음과 같이 사용할 수 있습니다.
< dependency >
< groupId >net.openhft</ groupId >
< artifactId >chronicle-queue</ artifactId >
< version > <!-- replace with the latest version, see below --> </ version >
</ dependency >크로니클 큐 릴리스 노트를 참조하고 최신 버전 번호를 얻으십시오. 스냅 샷은 https://oss.sonatype.org에서 제공됩니다
메모 | 패키지 '내부', '임플란부'및 '메인'(다양한 런닝 가능한 주요 방법을 포함하는 후자)에 상주하는 클래스 및 모든 하위 패키지는 공개 API의 일부가 아니며 어떤 것도 변경 될 수 있습니다. 어떤 이유로 든 시간 . 자세한 내용은 해당 package-info.java 파일을 참조하십시오. |
크로니클 대기열에서 V5 테일러는 이제 읽기 전용이며, 크로니클 큐 v4에는 게으른 색인 개념이 있었는데, 여기서 부록은 인덱스를 작성하지 않고 대신 테일러가 인덱싱을 수행 할 수 있습니다. 우리는 V5에서 게으른 색인화를 삭제하기로 결정했습니다. 테일러를 읽을 수있게하면 크로니클 큐를 단순화 할뿐만 아니라 코드의 다른 곳에서 최적화를 추가 할 수 있습니다.
크로니클 대기열의 잠금 모델은 v5에서 변경되었으며, 크로니클 큐 v4에서는 쓰기 잠금 (대기열에 동시 쓰기를 방지하기 위해)이 .cq4 파일에 존재합니다. v5에서 이것은 테이블 스토어 (metadata.cq4t)라는 단일 파일로 이동했습니다. 테이블 스토어 파일 만 검사해야하므로 내부적으로 잠금 코드를 단순화합니다.
크로니클 큐 v5를 사용하여 크로니클 큐 v4로 작성된 메시지를 읽을 수 있지만 항상 작동하는 것은 아닙니다. 예를 들어, wireType(WireType.FIELDLESS_BINARY) 로 V4 큐를 만들면 크로니클 큐 v5가 불가능합니다. 대기열의 헤더를 읽으십시오. V5 읽기 V4 대기열에 대한 테스트가 있지만 이들은 제한되어 있으며 모든 시나리오가 지원되지 않을 수 있습니다.
크로니클 큐 v5를 사용하여 크로니클 큐 v4 대기열에 쓸 수 없습니다.
크로니클 대기열 V4는 v3에 존재하는 다음 문제를 해결하는 크로니클 큐의 완전한 재 작성입니다.
자체 설명 메시지가 없으면 사용자는 메시지를 덤프하고 장기 데이터 저장을 위해 자신의 기능을 만들어야했습니다. V4를 사용하면이 작업을 수행 할 필요는 없지만 원하는 경우 할 수 있습니다.
바닐라 크로니클 대기열은 스레드 당 파일을 만듭니다. 스레드 수가 제어되는 경우에는 괜찮지 만 많은 응용 프로그램이 사용되는 스레드 수를 거의 또는 전혀 제어 할 수 없으며 이로 인해 유용성 문제가 발생했습니다.
Indexed 및 Vanilla Chronicle의 구성은 전적으로 코드에 있었으므로 독자는 작가와 동일한 구성을 가져야했으며 항상 그것이 무엇인지 명확하지는 않았습니다.
생산자가 두 번째 기계에 얼마나 많은 데이터가 복제되었는지 알 수있는 방법은 없었습니다. 유일한 해결 방법은 데이터를 생산자에게 복제하는 것이 었습니다.
메시지 작성을 시작하기 전에 예약 할 데이터의 크기를 지정해야합니다.
인덱스 크로니클을 사용할 때는 appender를 위해 자신의 잠금을해야했습니다.
크로니클 큐 v3에서는 모든 것이 와이어가 아닌 바이트 측면에서 사용되었습니다. 크로니클 큐 v4에는 바이트를 사용하는 두 가지 방법이 있습니다. writeBytes 및 readBytes 메소드를 사용하거나 와이어에서 bytes() 얻을 수 있습니다. 예를 들어:
appender . writeBytes ( b -> b . writeInt ( 1234 ). writeDouble ( 1.111 ));
boolean present = tailer . readBytes ( b -> process ( b . readInt (), b . readDouble ())); try ( DocumentContext dc = appender . writingDocument ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// write to bytes
}
try ( DocumentContext dc = tailer . readingDocument ()) {
if ( dc . isPresent ()) {
Bytes <?> bytes = dc . wire (). bytes ();
// read from bytes
}
}Chronicle Queue Enterprise Edition은 성공적인 오픈 소스 크로니클 대기열의 상업적으로 지원되는 버전입니다. 오픈 소스 문서는 Enterprise Edition에 라이센스가 부여 될 때 제공되는 추가 기능을 설명하기 위해 다음 문서에 의해 확장됩니다. 이들은 다음과 같습니다.
메시지 대기열과 메시지의 암호화. 자세한 내용은 암호화 문서를 참조하십시오.
모든 대기열 데이터의 실시간 백업을 보장하기 위해 호스트 간의 TCP/IP (및 선택적으로 UDP) 복제. 자세한 내용은 복제 문서를 참조하십시오. 대기열 복제 프로토콜은 복제 프로토콜에 포함됩니다.
일일 대기열 롤오버 스케줄링에 대한 시간대 지원. 자세한 내용은 시간대 지원을 참조하십시오.
비동기 모드 지원 느린 파일 시스템에서 높은 처리량에서 개선 된 성능을 제공합니다. 자세한 내용은 비동기 모드 및 성능을 참조하십시오.
특이하지 않은 특이 치의 사전 터치 서, 사전 터치 및 구성을 참조하십시오
또한 기술 전문가의 지원을받을 것입니다.
Chronicle Queue Enterprise Edition에 대한 자세한 내용은 [email protected]에 문의하십시오.
연대기 대기열은 지원하도록 설계된 SingleChronicleQueue.class 에 의해 정의됩니다.
매일, 매주 또는 시간별 파일 롤링 파일,
같은 기계의 동시 작가,
TCP 복제 (Chronicle Queue Enterprise 포함)를 통해 동일한 기계 또는 여러 기계의 동시 독자,
Docker 또는 기타 컨테이너화 된 워크로드 사이의 동시 독자 및 작가
제로 사본 직렬화 및 사막화,
상품 하드웨어에 대한 수백만의 글/읽기.
i7-4790 프로세서의 96 바이트 메시지에 대해 약 5 백만 메시지/초. 큐 디렉토리 구조는 다음과 같습니다.
base-directory /
{cycle-name}.cq4 - The default format is yyyyMMdd for daily rolling. 형식은 BinaryWire 또는 TextWire 사용하여 형식화되는 크기 정제 바이트로 구성됩니다. 크로니클 대기열은 코드에서 구동되도록 설계되었습니다. 필요에 맞는 인터페이스를 쉽게 추가 할 수 있습니다.
메모 | 상당히 낮은 수준의 작동으로 인해 크로니클 대기열 읽기/쓰기 작업은 확인되지 않은 예외를 던질 수 있습니다. 스레드 사망을 방지하기 위해 RuntimeExceptions 잡아서 적절하게 로그/분석하는 것이 실용적 일 수 있습니다. |
메모 | 크로니클 대기열을 사용하는 방법에 대한 시연은 크로니클 큐 데모를 참조하고 Java 문서를 참조하십시오. 크로니클 큐 javadocs 참조 |
다음 섹션에서 먼저, 우리는 연대기 대기열을 사용하기위한 몇 가지 용어와 빠른 참조를 소개합니다. 그런 다음보다 자세한 가이드를 제공합니다.
크로니클 대기열은 동일한 컴퓨터의 여러 JVM에서도 동시 작가와 독자를 지원하는 지속적인 Journal of 메시지입니다. 모든 독자는 모든 메시지를보고 독자는 언제든지 가입하고 여전히 모든 메시지를 볼 수 있습니다.
메모 | 우리는 의도적으로 소비자 라는 용어를 피하고 대신 읽기에 의해 메시지가 소비/파괴되지 않으므로 독자를 사용합니다. |
크로니클 대기열에는 다음과 같은 주요 개념이 있습니다.
발췌
발췌는 크로니클 큐의 주요 데이터 컨테이너입니다. 다시 말해, 각 연대기 대기열은 발췌문으로 구성됩니다. 연대기 대기열에 메시지를 작성한다는 것은 새로운 발췌문을 시작하고 메시지를 작성하고 끝에서 발췌 한 것을 끝내는 것을 의미합니다.
appender
Appender는 메시지의 원천입니다. 연대기 환경에서 반복자와 같은 것. 현재 크로니클 큐를 추가하는 데이터를 추가합니다. 대기열 끝에만 추가하여 순차적 인 글을 수행 할 수 있습니다. 발췌문을 삽입하거나 삭제하는 방법은 없습니다.
테일러
Tailer는 순차적 판독에 최적화 된 발췌 리더입니다. 순차 및 임의의 읽기를 전진과 뒤로 수행 할 수 있습니다. 테일러는 호출 할 때마다 사용 가능한 다음 메시지를 읽습니다. 다음은 크로니클 대기열에서 보장됩니다.
각 appender 에 대해 메시지는 Appender가 작성한 순서대로 작성됩니다. 다른 부록의 메시지는 인터리브되고
각 테일러 에 대해 주제에 대한 모든 메시지가 다른 모든 테일러와 같은 순서로 표시됩니다.
복제되면 모든 복제본에는 모든 메시지 사본이 있습니다.
크로니클 대기열은 중개인입니다. 중개인이있는 아키텍처가 필요한 경우 [email protected]에 문의하십시오.
파일 롤링 및 대기열 파일
크로니클 큐는 큐가 만들 때 선택한 롤 사이클에 따라 파일을 굴리도록 설계되었습니다 (RollCycles 참조). 다시 말해, 확장 cq4 있는 각 롤 사이클에 대해 큐 파일이 생성됩니다. 롤 사이클이 포인트에 도달하면 롤더는 현재 파일 끝에 EOF 마크를 원자 적으로 쓰지 않아 다른 Appender 가이 파일에 쓰지 않아야하며 Tailer가 더 읽지 않아야하며 모든 사람이 새 파일을 사용해야한다는 것을 나타냅니다.
롤 사이클이 새 파일을 사용해야 할 때 프로세스가 종료되고 나중에 다시 시작되면 Appender는 오래된 파일을 찾아서 EOF 마크를 작성하여 테일러가 읽는 데 도움이됩니다.
주제
각 주제는 큐 파일의 디렉토리입니다. mytopic 이라는 주제가있는 경우 레이아웃이 다음과 같습니다.
mytopic/
20160710.cq4
20160711.cq4
20160712.cq4
20160713.cq4하루 종일 (또는주기) 모든 데이터를 복사하려면 그 날의 파일을 재생 테스트를 위해 개발 시스템에 복사 할 수 있습니다.
주제 및 메시지에 대한 제한
주제는 디렉토리 이름으로 사용할 수있는 문자열로 제한됩니다. 주제 내에는 직렬화 될 수있는 데이터 유형 일 수있는 하위 기능을 가질 수 있습니다. 메시지는 직렬화 가능한 데이터 일 수 있습니다.
크로니클 대기열 지원 :
Serializable 물체는 효율적이지 않기 때문에 피해야하지만
표준 Java API를 사용하려면 Externalizable 물체가 선호됩니다.
byte[] 및 String
Marshallable ; Yaml, Binary Yaml 또는 JSON으로 작성할 수있는 자체 설명 메시지.
낮은 수준의 바이너리 또는 텍스트 인코딩 인 BytesMarshallable .
이 섹션에서는 크로니클 큐를 사용하여 대기열을 작성, 쓰기/읽는 방법을 간략하게 보여주기위한 빠른 참조를 제공합니다.
연대기 대기열 건설
크로니클 큐 인스턴스를 만드는 것은 생성자를 호출하는 것과 다릅니다. 인스턴스를 만들려면 ChronicleQueueBuilder 를 사용해야합니다.
String basePath = OS . getTarget () + "/getting-started"
ChronicleQueue queue = SingleChronicleQueueBuilder . single ( basePath ). build (); 이 예에서는 두 개의 RandomAccessFiles 생성하는 IndexedChronicle 을 만들었습니다. 하나는 인덱스 용이고, 하나는 비교적 이름을 가진 데이터 용입니다.
${java.io.tmpdir}/getting-started/{today}.cq4대기열에 쓰기
// Obtains an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );대기열에서 읽습니다
// Creates a tailer
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ()); 또한 ChronicleQueue.dump() 메소드를 사용하여 원시 내용을 문자열로 버릴 수 있습니다.
queue . dump ();대청소
크로니클 대기열은 데이터를 해체하지 않으며 크로니클 대기열 작업을 마치고 무료 리소스로 close() 호출하는 것이 좋습니다.
메모 | 이렇게하면 데이터가 손실되지 않습니다. 이것은 사용 된 자원을 정리하는 것입니다. |
queue . close ();모든 것을 함께 모으십시오
try ( ChronicleQueue queue = SingleChronicleQueueBuilder . single ( "queue-dir" ). build ()) {
// Obtain an ExcerptAppender
ExcerptAppender appender = queue . acquireAppender ();
// Writes: {msg: TestMessage}
appender . writeDocument ( w -> w . write ( "msg" ). text ( "TestMessage" ));
// Writes: TestMessage
appender . writeText ( "TestMessage" );
ExcerptTailer tailer = queue . createTailer ();
tailer . readDocument ( w -> System . out . println ( "msg: " + w . read (()-> "msg" ). text ()));
assertEquals ( "TestMessage" , tailer . readText ());
} 구성 매개 변수 또는 시스템 속성을 사용하여 크로니클 큐를 구성 할 수 있습니다. 또한 프록시 사용 및 MethodReader 및 MethodWriter 사용과 같은 대기열에 대한 쓰기/읽기/읽기 방법에는 여러 가지가 있습니다.
크로니클 큐 (CQ)는 SingleChronicleQueueBuilder 클래스의 여러 메소드를 통해 구성 할 수 있습니다. 고객이 가장 많이 쿼리 한 일부 매개 변수는 아래에 설명되어 있습니다.
롤 사이클
RollCycle 매개 변수는 CQ가 기본 대기열 파일을 굴리는 속도를 구성합니다. 예를 들어, 다음 코드 스 니펫을 사용하면 큐 파일이 롤링됩니다 (예 : 새 파일이 생성).
ChronicleQueue . singleBuilder ( queuePath ). rollCycle ( RollCycles . HOURLY ). build () 대기열의 롤 사이클이 설정되면 나중에 변경할 수 없습니다. 동일한 경로를 사용하도록 구성된 SingleChronicleQueue 의 추가 인스턴스는 동일한 롤 사이클을 사용하도록 구성되어야하며, 그렇지 않은 경우 롤 사이클이 지속되는 롤 사이클과 일치하도록 업데이트됩니다. 이 경우 라이브러리 사용자에게 상황을 알리기 위해 경고 로그 메시지가 인쇄됩니다.
// Creates a queue with roll-cycle MINUTELY
try ( ChronicleQueue minuteRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( MINUTELY ). build ()) {
// Creates a queue with roll-cycle HOURLY
try ( ChronicleQueue hourlyRollCycleQueue = ChronicleQueue . singleBuilder ( queueDir ). rollCycle ( HOURLY ). build ()) {
try ( DocumentContext documentContext = hourlyRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext . wire (). write ( "somekey" ). text ( "somevalue" );
}
}
// Now try to append using the queue configured with roll-cycle MINUTELY
try ( DocumentContext documentContext2 = minuteRollCycleQueue . acquireAppender (). writingDocument ()) {
documentContext2 . wire (). write ( "otherkey" ). text ( "othervalue" );
}
}콘솔 출력 :
[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.큐 파일에 저장할 수있는 최대 메시지 수는 롤 사이클에 따라 다릅니다. 이에 대한 자세한 내용은 FAQ를 참조하십시오.
크로니클 대기열에서 롤오버 시간은 UTC를 기반으로합니다. TimeZone Rollover Enterprise 기능은 UTC보다는 대기열 롤오버의 시간과 주기성을 지정하는 크로니클 큐의 능력을 확장합니다. 자세한 내용은 TimeZone 대기열 롤오버를 참조하십시오.
크로니클 큐 FileUtil 클래스는 큐 파일을 관리하는 데 유용한 방법을 제공합니다. 롤 파일 관리를 직접 참조하십시오.
도청
Chronicle Queue가 WireType 명시 적으로 설정하여 데이터를 저장하는 방법을 구성 할 수 있습니다.
// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder . builder ( queuePath , wireType )예를 들어:
// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue . singleBuilder ( queuePath )
// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder . fieldlessBinary ( queuePath )
// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder . defaultZeroBinary ( queuePath )
// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder . deltaBinary ( queuePath )건축업자를 만들 때 WireType을 명시 적으로 제공 할 수는 있지만 모든 와이어 유형이 크로니클 대기열에 의해 지원되는 것은 아니기 때문에 권장하지 않습니다. 특히 다음 와이어 유형은 지원되지 않습니다.
텍스트 (및 본질적으로 JSON 및 CSV를 포함한 텍스트를 기반으로합니다)
날것의
read_any
블록 크기
대기열이 읽기/작성되면 현재 읽기/작성중인 파일의 일부가 메모리 세그먼트에 매핑됩니다. 이 매개 변수는 메모리 매핑 블록의 크기를 제어합니다. 필요한 경우 SingleChronicleQueueBuilder.blockSize(long blockSize) 메소드를 사용 하여이 매개 변수를 변경할 수 있습니다.
메모 | blockSize 불필요하게 변경하지 않아야합니다. |
큰 메시지를 보내는 경우 큰 blockSize 설정해야합니다. 즉, blockSize 메시지 크기의 4 배 이상이어야합니다.
경고 | 큰 메시지에 작은 blockSize 사용하는 경우 IllegalStateException 을 받고 쓰기가 중단됩니다. |
큐를 복제 할 때 각 큐 인스턴스에 대해 동일한 blockSize 사용하는 것이 좋습니다. blockSize 대기열의 메타 데이터에 기록되지 않으므로 크로니클 큐 인스턴스를 만들 때 이상적으로 동일한 값으로 설정해야합니다 (권장하지만 원하는 경우 권장됩니다. 다른 blocksize 로 실행하려면).
팁 | 복제 된 큐의 각 인스턴스에 대해 동일한 blockSize 사용하십시오. |
인덱스 스페이스
이 매개 변수는 명시 적으로 인덱싱 된 발췌문 사이의 공간을 보여줍니다. 숫자가 높으면 순차적 인 쓰기 성능이 높지만 무작위 액세스가 느리게 읽는 것을 의미합니다. 순차적 판독 성능은이 속성의 영향을받지 않습니다. 예를 들어 다음 기본 색인 간격을 반환 할 수 있습니다.
16 (Minutely)
64 (매일)
SingleChronicleQueueBuilder.indexSpacing(int indexSpacing) 을 사용 하여이 매개 변수를 변경할 수 있습니다.
인덱스 카운트
각 인덱스 배열의 크기와 큐 파일 당 총 인덱스 배열 수입니다.
메모 | IndexCount 2는 인덱스 큐 항목의 최대 수입니다. |
메모 | 인덱스 사용의 자세한 내용과 예는이 사용자 안내서의 연대기 대기열의 초기 인덱싱 섹션을 참조하십시오. |
ReadBufferMode, WriteBufferMode
이 매개 변수는 다음 옵션이있는 읽기 또는 쓰기에 대해 BufferMode를 정의합니다.
None - 기본값 (및 오픈 소스 사용자가 사용할 수있는 유일한 것), 버퍼링 없음;
Copy - 암호화와 함께 사용;
Asynchronous - 크로니클 비동기 모드에서 제공하는 읽기 및/또는 쓰기시 비동기 버퍼를 사용하십시오.
Buffercapacity
bufferMode: Asynchronous 사용할 때 바이트의 Ringbuffer 용량
크로니클 대기열에서 우리는 귀하의 데이터를 연대기 대기열에 작성하는 행위를 발췌 내용으로 언급합니다. 이 데이터는 텍스트, 숫자 또는 직렬화 된 블로브를 포함한 모든 데이터 유형으로 구성 될 수 있습니다. 궁극적으로 모든 데이터는 그것이 무엇인지에 관계없이 일련의 바이트로 저장됩니다.
발췌를 저장하기 직전에 Chronicle Queue는 4 바이트 헤더를 보유합니다. 크로니클 대기열은 데이터의 길이를이 헤더에 씁니다. 이런 식으로 크로니클 대기열이 발췌를 읽을 때 각 데이터 덩어리가 얼마나 오래 걸리는지 알고 있습니다. 우리는이 4 바이트 헤더와 발췌문을 문서로 언급합니다. 크로니클 대기열은 문서를 읽고 쓰는 데 사용될 수 있습니다.
메모 | 이 4 바이트 헤더 내에서 우리는 프로세서와 스레드 모두에서 크로니클 큐 스레드 안전을 만들기 위해 잠금과 같은 여러 내부 작업에 대해 몇 비트를 예약합니다. 주목해야 할 중요한 점은 이로 인해 4 바이트를 정수로 엄격하게 변환하여 데이터 블로브의 길이를 찾을 수 없다는 것입니다. |
앞에서 언급 한 바와 같이, 크로니클 대기열은 appender를 사용하여 대기열에 쓰고 대기열에서 읽을 때 테일러를 사용합니다. 다른 Java 대기열 솔루션과 달리 Tailer와 함께 읽을 때 메시지가 손실되지 않습니다. 이것은 "테일러를 사용하는 대기열에서 읽는 것"에 대한 아래 섹션에서 더 자세히 설명합니다. 크로니클 대기열에 데이터를 작성하려면 먼저 Appender를 만들어야합니다.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
}크로니클 큐는 다음과 같은 저수준 인터페이스를 사용하여 데이터를 작성합니다.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
} 시도가있는 재료의 닫기는 데이터의 길이가 헤더에 기록되는 시점입니다. DocumentContext 사용하여 데이터가 지정된 인덱스를 찾을 수도 있습니다 (아래 참조). 나중에이 색인을 사용 하여이 발췌문을 이동/찾아 볼 수 있습니다. 각 크로니클 대기열 발췌문에는 고유 한 색인이 있습니다.
try ( final DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write (). text (“ your text data “);
System . out . println ( "your data was store to index=" + dc . index ());
} writeText() 와 같은 아래의 높은 수준의 방법은 appender.writingDocument() 호출하는 편의 메소드이지만 두 가지 접근 방식은 본질적으로 동일한 작업을 수행합니다. 실제 writeText(CharSequence text) 는 다음과 같습니다.
/**
* @param text the message to write
*/
void writeText ( CharSequence text ) {
try ( DocumentContext dc = writingDocument ()) {
dc . wire (). bytes (). append8bit ( text );
}
}따라서 많은 고급 인터페이스, 저수준 API, 원시 메모리까지 선택할 수 있습니다.
이것은 당신이 메시지에 전혀 쓰고 있다는 사실을 숨기는 최고 수준의 API입니다. 이점은 실제 구성 요소와 인터페이스로 호출을 교체하거나 다른 프로토콜에 대한 인터페이스를 스왑 할 수 있다는 것입니다.
// using the method writer interface.
RiskMonitor riskMonitor = appender . methodWriter ( RiskMonitor . class );
final LocalDateTime now = LocalDateTime . now ( Clock . systemUTC ());
riskMonitor . trade ( new TradeDetails ( now , "GBPUSD" , 1.3095 , 10e6 , Side . Buy , "peter" ));"자기 설명 메시지"를 쓸 수 있습니다. 이러한 메시지는 스키마 변경을 지원할 수 있습니다. 또한 문제를 디버깅하거나 진단 할 때 이해하기가 더 쉽습니다.
// writing a self describing message
appender . writeDocument ( w -> w . write ( "trade" ). marshallable (
m -> m . write ( "timestamp" ). dateTime ( now )
. write ( "symbol" ). text ( "EURUSD" )
. write ( "price" ). float64 ( 1.1101 )
. write ( "quantity" ). float64 ( 15e6 )
. write ( "side" ). object ( Side . class , Side . Sell )
. write ( "trader" ). text ( "peter" )));자기 설명 인 "원시 데이터"를 쓸 수 있습니다. 유형은 항상 정확합니다. 위치는 해당 값의 의미에 대한 유일한 표시입니다.
// writing just data
appender . writeDocument ( w -> w
. getValueOut (). int32 ( 0x123456 )
. getValueOut (). int64 ( 0x999000999000L )
. getValueOut (). text ( "Hello World" ));자기 설명이 아닌 "원시 데이터"를 쓸 수 있습니다. 독자는이 데이터의 의미와 사용 된 유형을 알아야합니다.
// writing raw data
appender . writeBytes ( b -> b
. writeByte (( byte ) 0x12 )
. writeInt ( 0x345678 )
. writeLong ( 0x999000999000L )
. writeUtf8 ( "Hello World" ));아래에서 데이터를 작성하는 가장 낮은 수준의 방법이 설명되어 있습니다. 당신은 원시 메모리에 대한 주소를 얻고 원하는 것을 쓸 수 있습니다.
// Unsafe low level
appender . writeBytes ( b -> {
long address = b . address ( b . writePosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
unsafe . putByte ( address , ( byte ) 0x12 );
address += 1 ;
unsafe . putInt ( address , 0x345678 );
address += 4 ;
unsafe . putLong ( address , 0x999000999000L );
address += 8 ;
byte [] bytes = "Hello World" . getBytes ( StandardCharsets . ISO_8859_1 );
unsafe . copyMemory ( bytes , Jvm . arrayByteBaseOffset (), null , address , bytes . length );
b . writeSkip ( 1 + 4 + 8 + bytes . length );
});대기열의 내용을 인쇄 할 수 있습니다. 처음 두 개를 볼 수 있고 마지막 두 메시지는 동일한 데이터를 저장합니다.
// dump the content of the queue
System . out . println ( queue . dump ());인쇄물:
# position: 262568, header: 0
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : GBPUSD,
price : 1.3095,
quantity : 10000000.0,
side : Buy,
trader : peter
}
# position: 262684, header: 1
--- !!data # binary
trade : {
timestamp : 2016-07-17T15:18:41.141,
symbol : EURUSD,
price : 1.1101,
quantity : 15000000.0,
side : Sell,
trader : peter
}
# position: 262800, header: 2
--- !!data # binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data # binary
000402b0 12 78 56 34 00 00 90 99 00 90 99 00 00 0B ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello Wo rld
# position: 262859, header: 4
--- !!data # binary
000402c0 12 ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64 lo World 큐를 읽는 것은 글을 읽으려고 할 때 메시지가 없을 가능성이 있다는 점을 제외하고는 쓰기와 같은 패턴을 따릅니다.
try ( ChronicleQueue queue = ChronicleQueue . singleBuilder ( path + "/trades" ). build ()) {
final ExcerptTailer tailer = queue . createTailer ();
} 메시지의 내용에 따라 각 메시지를 메소드 호출로 바꾸고 연대기 큐가 메소드 인수를 자동으로 해제 할 수 있습니다. Calling reader.readOne() 메소드 리더와 일치하지 않는 메시지를 자동으로 건너 뛰기 (필터링)합니다.
// reading using method calls
RiskMonitor monitor = System . out :: println ;
MethodReader reader = tailer . methodReader ( monitor );
// read one message
assertTrue ( reader . readOne ());메시지를 직접 해독 할 수 있습니다.
메모 | 필드의 이름, 유형 및 순서는 일치 할 필요가 없습니다. |
assertTrue ( tailer . readDocument ( w -> w . read ( "trade" ). marshallable (
m -> {
LocalDateTime timestamp = m . read ( "timestamp" ). dateTime ();
String symbol = m . read ( "symbol" ). text ();
double price = m . read ( "price" ). float64 ();
double quantity = m . read ( "quantity" ). float64 ();
Side side = m . read ( "side" ). object ( Side . class );
String trader = m . read ( "trader" ). text ();
// do something with values.
})));자체 설명 데이터 값을 읽을 수 있습니다. 이렇게하면 유형이 올바른지 확인하고 필요에 따라 변환됩니다.
assertTrue ( tailer . readDocument ( w -> {
ValueIn in = w . getValueIn ();
int num = in . int32 ();
long num2 = in . int64 ();
String text = in . text ();
// do something with values
}));원시 데이터를 프리미티브 및 문자열로 읽을 수 있습니다.
assertTrue ( tailer . readBytes ( in -> {
int code = in . readByte ();
int num = in . readInt ();
long num2 = in . readLong ();
String text = in . readUtf8 ();
assertEquals ( "Hello World" , text );
// do something with values
}));또는 기본 메모리 주소를 얻고 기본 메모리에 액세스 할 수 있습니다.
assertTrue ( tailer . readBytes ( b -> {
long address = b . address ( b . readPosition ());
Unsafe unsafe = UnsafeMemory . UNSAFE ;
int code = unsafe . getByte ( address );
address ++;
int num = unsafe . getInt ( address );
address += 4 ;
long num2 = unsafe . getLong ( address );
address += 8 ;
int length = unsafe . getByte ( address );
address ++;
byte [] bytes = new byte [ length ];
unsafe . copyMemory ( null , address , bytes , Jvm . arrayByteBaseOffset (), bytes . length );
String text = new String ( bytes , StandardCharsets . UTF_8 );
assertEquals ( "Hello World" , text );
// do something with values
})); 메모 | 모든 테일러는 모든 메시지를 본다. |
추상화는 메시지를 필터에 추가하거나 하나의 메시지 프로세서에 메시지를 할당 할 수 있습니다. 그러나 일반적으로 주제에 대해서는 하나의 메인 테일러 만 필요하며, 아마도 일부는 모니터링 등을 지원합니다.
크로니클 대기열은 주제를 분할하지 못하므로 해당 주제 내에서 모든 메시지를 전체 주문을받습니다. 주제에 걸쳐 주문에 대한 보장은 없습니다. 여러 주제에서 소비되는 시스템에서 결정적으로 재생하려면 해당 시스템의 출력에서 재생하는 것이 좋습니다.
크로니클 대기열 테일러는 파일 처리기를 만들 수 있으며, 관련 크로니클 큐의 close() 메소드가 호출되거나 JVM이 쓰레기 수집을 실행할 때마다 파일 핸들러가 정리됩니다. 코드를 작성하는 경우 GC 일시 정지가없고 파일 처리기를 명시 적으로 정리하려는 경우 다음에 전화 할 수 있습니다.
(( StoreTailer ) tailer ). releaseResources ()ExcerptTailer.toEnd() 사용 일부 응용 프로그램에서는 큐 끝에서 읽기 시작해야 할 수도 있습니다 (예 : 재시작 시나리오). 이 사용을 위해 ExcerptTailer toEnd() 메소드를 제공합니다. 테일러 방향이 FORWARD 나오는 경우 (기본적으로 또는 ExcerptTailer.direction 메소드에 의해 설정된대로) toEnd() 에게 큐에 마지막 기존 레코드 직후 테일러를 배치합니다. 이 경우 Tailer는 이제 줄에 추가 된 새로운 레코드를 읽을 준비가되었습니다. 새로운 메시지가 큐에 추가 될 때까지 읽을 수있는 새로운 DocumentContext 없습니다.
// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer . toEnd (). readingDocument (). isPresent ();끝에서 대기열을 통해 뒤로 읽어야한다면 테일러를 뒤로 읽도록 설정할 수 있습니다.
ExcerptTailer tailer = queue . createTailer ();
tailer . direction ( TailerDirection . BACKWARD ). toEnd (); 뒤로 읽을 때 toEnd() 메소드는 테일러를 큐의 마지막 레코드로 이동합니다. 큐가 비어 있지 않으면 읽을 수있는 DocumentContext 있습니다.
// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer . toEnd (). direction ( TailerDirection . BACKWARD ).
readingDocument (). isPresent ();일명 Tailers.
애플리케이션을 다시 시작할 때까지 계속되는 테일러를 갖는 것이 유용 할 수 있습니다.
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 0" , atailer . readText ());
assertEquals ( "test 1" , atailer . readText ());
assertEquals ( "test 2" , atailer . readText ()); // (1)
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 0" , btailer . readText ()); // (3)
}
try ( ChronicleQueue cq = SingleChronicleQueueBuilder . binary ( tmp ). build ()) {
ExcerptTailer atailer = cq . createTailer ( "a" );
assertEquals ( "test 3" , atailer . readText ()); // (2)
assertEquals ( "test 4" , atailer . readText ());
assertEquals ( "test 5" , atailer . readText ());
ExcerptTailer btailer = cq . createTailer ( "b" );
assertEquals ( "test 1" , btailer . readText ()); // (4)
}Tailer "A"마지막 메시지 2
Tailer "A"다음은 메시지를 읽습니다
Tailer "B"는 마지막으로 메시지 0을 읽습니다
Tailer "B"는 다음 메시지를 읽습니다
This is from the RestartableTailerTest where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart() , toEnd() , moveToIndex() or reads a message.
메모 | The direction() is not preserved across restarts, only the next index to be read. |
메모 | The index of a tailer is only progressed when the DocumentContext.close() is called. If this is prevented by an error, the same message will be read on each restart. |
Chronicle Queue stores its data in binary format, with a file extension of cq4 :
��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������� This can often be a bit difficult to read, so it is better to dump the cq4 files as text. This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.
You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain or net.openhft.chronicle.queue.ChronicleReaderMain . DumpMain performs a simple dump to the terminal while ChronicleReaderMain handles more complex operations, eg tailing a queue. They can both be run from the command line in a number of ways described below.
If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4 file with the following command:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"In the above command myqueue is the directory containing your .cq4 files
You can also set up any dependent files manually. This requires the chronicle-queue.jar , from any version 4.5.3 or later, and that all dependent files are present on the class path. The dependent jars are listed below:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4 팁 | To find out which version of jars to include please, refer to the chronicle-bom . |
Once the dependencies are present on the class path, you can run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4 This will dump the 19700101-02.cq4 file out as text, as shown below:
!!meta-data # binary
header : !SCQStore {
wireType : !WireType BINARY,
writePosition : 0,
roll : !SCQSRoll {
length : !int 3600000,
format : yyyyMMdd-HH,
epoch : !int 3600000
},
indexing : !SCQSIndexing {
indexCount : !short 4096,
indexSpacing : 4,
index2Index : 0,
lastIndex : 0
},
lastAcknowledgedIndexReplicated : -1,
recovery : !TimedStoreRecovery {
timeStamp : 0
}
}
...
# 4198044 bytes remaining 메모 | The example above does not show any user data, because no user data was written to this example file. |
There is also a script named dump_queue.sh located in the Chonicle-Queue/bin -folder that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain . The script can be run from the Chronicle-Queue root folder like this:
$ ./bin/dump_queue.sh <file path> ChronicleReaderMain The second tool for logging the contents of the chronicle queue is the ChronicleReaderMain (in the Chronicle Queue project). As mentioned above, it is able to perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (rather like $tail -f).
Below is the command line interface used to configure ChronicleReaderMain :
usage: ChronicleReaderMain -a <binary-arg> Argument to pass to binary search class -b <binary-search> Use this class as a comparator to binary search -cbl <content-based-limiter> Specify a content-based limiter -cblArg <content-based-limiter-argument> Specify an argument for use by the content-based limiter -d <directory> Directory containing chronicle queue files -e <exclude-regex> Do not display records containing this regular expression -f Tail behaviour - wait for new records to arrive -g Show message history (when using method reader) -h Print this help and exit -i <include-regex> Display records containing this regular expression -k Read the queue in reverse -l Squash each output message into a single line -m <max-history> Show this many records from the end of the data set -n <from-index> Start reading from this index (eg 0x123ABE) -named <named> Named tailer ID -r <as-method-reader> Use when reading from a queue generated using a MethodWriter -s Display index -w <wire-type> Control output ie JSON -x <max-results> Limit the number of results to output -z Print timestamps using the local timezone
Just as with DumpQueue you need the classes in the example above present on the class path. This can again be achieved by manually adding them and then run:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:
< build >
< plugins >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-shade-plugin</ artifactId >
< executions >
< execution >
< phase >package</ phase >
< goals >
< goal >shade</ goal >
</ goals >
< configuration >
< filters >
< filter >
< artifact >*:*</ artifact >
< includes >
< include >net/openhft/**</ include >
< include >software/chronicle/**</ include >
</ includes >
</ filter >
</ filters >
</ configuration >
</ execution >
</ executions >
</ plugin >
</ plugins >
</ build > Once the Uber jar is present, you can run ChronicleReaderMain from the command line via:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh which again is located in the Chonicle-Queue/bin -folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain . The script can be run from the Chronicle-Queue root folder like this:
$ ./bin/queue_reader.sh <options> ChronicleWriter If using MethodReader and MethodWriter then you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain or the shell script queue_writer.sh eg
usage: ChronicleWriterMain files.. -d < directory > [-i < interface > ] -m < method >
Missing required options: m, d
-d < directory > Directory containing chronicle queue to write to
-i < interface > Interface to write via
-m < method > Method nameIf you want to write to the below "doit" method
public interface MyInterface {
void doit ( DTO dto );
}public class DTO extends SelfDescribingMarshallable { private int age; 개인 문자열 이름; }
Then you can call ChronicleWriterMain -d queue doit x.yaml with either (or both) of the below Yamls:
{
age : 19,
name : Henry
}또는
!x.y.z.DTO {
age : 42,
name : Percy
} If DTO makes use of custom serialisation then you should specify the interface to write to with -i
Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface , where all methods have:
arguments which are only inputs
no return value or exceptions expected.
import net . openhft . chronicle . wire . SelfDescribingMarshallable ;
interface MessageListener {
void method1 ( Message1 message );
void method2 ( Message2 message );
}
static class Message1 extends SelfDescribingMarshallable {
String text ;
public Message1 ( String text ) {
this . text = text ;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number ;
public Message2 ( long number ) {
this . number = number ;
}
}To write to the queue you can call a proxy which implements this interface.
SingleChronicleQueue queue1 = ChronicleQueue . singleBuilder ( path ). build ();
MessageListener writer1 = queue1 . acquireAppender (). methodWriter ( MessageListener . class );
// call method on the interface to send messages
writer1 . method1 ( new Message1 ( "hello" ));
writer1 . method2 ( new Message2 ( 234 ));These calls produce messages which can be dumped as follows.
# position: 262568, header: 0
--- !!data # binary
method1 : {
text : hello
}
# position: 262597, header: 1
--- !!data # binary
method2 : {
number : !int 234
}To read the messages, you can provide a reader which calls your implementation with the same calls that you made.
// a proxy which print each method called on it
MessageListener processor = ObjectUtils . printAll ( MessageListener . class )
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1 . createTailer (). methodReader ( processor );
assertTrue ( reader1 . readOne ());
assertTrue ( reader1 . readOne ());
assertFalse ( reader1 . readOne ());Running this example prints:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]For more details see, Using Method Reader/Writers and MessageReaderWriterTest
Chronicle Queue supports explicit, or implicit, nanosecond resolution timing for messages as they pass end-to-end over across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId of the queue.
ChronicleQueue out = ChronicleQueue . singleBuilder ( queuePath )
...
. sourceId ( 1 )
. build ();
SidedMarketDataListener combiner = out . acquireAppender ()
. methodWriterBuilder ( SidedMarketDataListener . class )
. get ();
combiner . onSidedPrice ( new SidedPrice ( "EURUSD1" , 123456789000L , Side . Sell , 1.1172 , 2e6 ));A timestamp is added for each read and write as it passes from service to service.
--- !!data # binary
history : {
sources : [
1,
0x426700000000 # (4)
]
timings : [
1394278797664704, # (1)
1394278822632044, # (2)
1394278824073475 # (3)
]
}
onTopOfBookPrice : {
symbol : EURUSD1,
timestamp : 123456789000,
buyPrice : NaN,
buyQuantity : 0,
sellPrice : 1.1172,
sellQuantity : 2000000.0
}First write
First read
Write of the result of the read.
What triggered this event.
In the following section you will find how to work with the excerpt index.
Finding the index at the end of a Chronicle Queue
Chronicle Queue appenders are thread-local. In fact when you ask for:
final ExcerptAppender appender = queue.acquireAppender(); the acquireAppender() uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:
long index = appender.lastIndexAppended();will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:
queue.lastIndex()Which will return the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently it's possible the value may be an under-estimate, as subsequent entries may have been written even before it was returned.
The number of messages between two indexes
To count the number of messages between two indexes you can use:
((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>); 메모 | You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system. |
for more information on this see :
net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerptsMove to a specific message and read it
The following example shows how to write 10 messages, then move to the 5th message to read it
@ Test
public void read5thMessageTest () {
try ( final ChronicleQueue queue = singleBuilder ( getTmpDir ()). build ()) {
final ExcerptAppender appender = queue . acquireAppender ();
int i = 0 ;
for ( int j = 0 ; j < 10 ; j ++) {
try ( DocumentContext dc = appender . writingDocument ()) {
dc . wire (). write ( "hello" ). text ( "world " + ( i ++));
long indexWritten = dc . index ();
}
}
// Get the current cycle
int cycle ;
final ExcerptTailer tailer = queue . createTailer ();
try ( DocumentContext documentContext = tailer . readingDocument ()) {
long index = documentContext . index ();
cycle = queue . rollCycle (). toCycle ( index );
}
long index = queue . rollCycle (). toIndex ( cycle , 5 );
tailer . moveToIndex ( index );
try ( DocumentContext dc = tailer . readingDocument ()) {
System . out . println ( dc . wire (). read ( "hello" ). text ());
}
}
} You can add a StoreFileListener to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.
Appenders and tailers are cheap as they don't even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:
a cycle number. For example, days since epoch, and
a sequence number within that cycle.
In the case of a DAILY cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber providing up to 4 billion entries per day. if more messages per day are anticipated, the XLARGE_DAILY cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.
Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.
Chronicle Queue runs a background thread to watch for low disk space (see net.openhft.chronicle.threads.DiskSpaceMonitor class) as the JVM can crash when allocating a new memory mapped file if disk space becomes low enough. The disk space monitor checks (for each FileStore you are using Chronicle Queues on): that there is less than 200MB free. If so you will see:
Jvm . warn (). on ( getClass (), "your disk " + fileStore + " is almost full, " +
"warning: chronicle-queue may crash if it runs out of space." );otherwise it will check for the threshold percentage and log out this message:
Jvm . warn (). on ( getClass (), "your disk " + fileStore
+ " is " + diskSpaceFull + "% full, " +
"warning: chronicle-queue may crash if it runs out of space." );The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
As mentioned previously Chronicle Queue stores its data off-heap in a '.cq4' file. So whenever you wish to append data to this file or read data into this file, chronicle queue will create a file handle . Typically, Chronicle Queue will create a new '.cq4' file every day. However, this could be changed so that you can create a new file every hour, every minute or even every second.
If we create a queue file every second, we would refer to this as SECONDLY rolling. Of course, creating a new file every second is a little extreme, but it's a good way to illustrate the following point. When using secondly rolling, If you had written 10 seconds worth of data and then you wish to read this data, chronicle would have to scan across 10 files. To reduce the creation of the file handles, chronicle queue cashes them lazily and when it comes to writing data to the queue files, care-full consideration must be taken when closing the files, because on most OS's a close of the file, will force any data that has been appended to the file, to be flushed to disk, and if we are not careful this could stall your application.
Pretoucher is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute() method, this object will pre-touch pages in the queue's underlying store file, so that they are resident in the page-cache (ie loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown() method can be called to close the supplied queue and release any other resources. Invocation of the execute() method after shutdown() has been called will cause an IllegalStateException to be thrown.
The Pretoucher's configuration parameters (set via the system properties) are as follows:
SingleChronicleQueueExcerpts.earlyAcquireNextCycle (defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.
경고 | earlyAcquireNextCycle is off by default and if it is going to be turned on, you should very carefully stress test before and after turning it on. Basically what you experience is related to your system. |
SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs (defaults to 2,000 milliseconds) The pretoucher will create new cycle files this amount of time in advanced of them being written to. Effectively moves the Pretoucher's notion of which cycle is "current" into the future by pretoucherPrerollTimeMs .
SingleChronicleQueueExcerpts.dontWrite (defaults to false): Tells the Pretoucher to never create cycle files that do not already exist. As opposed to the default behaviour where if the Pretoucher runs inside a cycle where no excerpts have been written, it will create the "current" cycle file. Obviously enabling this will prevent earlyAcquireNextCycle from working.
Pretoucher usage example
The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle is set to true and pretoucherPrerollTimeMs to 100ms.
System . setProperty ( "SingleChronicleQueueExcerpts.earlyAcquireNextCycle" , "true" );
System . setProperty ( "SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs" , "100" ); The constructor of Pretoucher takes the name of the queue that this Pretoucher is assigned to and creates a new Pretoucher. Then, by invoking the execute() method the Pretoucher starts.
// Creates the queue q1 (or q1 is a queue that already exists)
try ( final SingleChronicleQueue q1 = SingleChronicleQueueBuilder . binary ( "queue-storage-path" ). build ();
final Pretoucher pretouch = PretouchUtil . INSTANCE . createPretoucher ( q1 )){
try {
pretouch . execute ();
} catch ( InvalidEventHandlerException e ) {
throw Jvm . rethrow ( e );
}
} The method close() , closes the Pretoucher and releases its resources.
pretouch . close (); 메모 | The Pretoucher is an Enterprise feature |
Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).
The following charts show how long it takes to:
write a 40 byte message to a Chronicle Queue
have the write replicated over TCP
have the second copy acknowledge receipt of the message
have a thread read the acknowledged message
The test was run for ten minutes, and the distribution of latencies plotted.
메모 | There is a step in latency at around 10 million message per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually. |
The 99.99 percentile and above are believed to be delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.
When double-buffering is disabled, all writes to the queue will be serialized based on the write lock acquisition. Each time ExcerptAppender.writingDocument() is called, appender tries to acquire the write lock on the queue, and if it fails to do so it blocks until write lock is unlocked, and in turn locks the queue for itself.
When double-buffering is enabled, if appender sees that the write lock is acquired upon call to ExcerptAppender.writingDocument() call, it returns immediately with a context pointing to the secondary buffer, and essentially defers lock acquisition until the context.close() is called (normally with try-with-resources pattern it is at the end of the try block), allowing user to go ahead writing data, and then essentially doing memcpy on the serialized data (thus reducing cost of serialization). By default, double-buffering is disabled. You can enable double-buffering by calling
SingleChronicleQueueBuilder.doubleBuffer(true); 메모 | During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException . This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed. |
This is only useful if (majority of) the objects being written to the queue are big enough AND their marshalling is not straight-forward (eg BytesMarshallable's marshalling is very efficient and quick and hence double-buffering will only slow things down), and if there's a heavy contention on writes (eg 2 or more threads writing a lot of data to the queue at a very high rate).
결과:
Below are the benchmark results for various data sizes at the frequency of 10 KHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark ), YMMV - always do your own benchmarks:
1 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 90.40 90.59 91.17 0.42 90: 179.52 180.29 97.50 36.14 99: 187.33 186.69 186.82 0.05 99.7: 213.57 198.72 217.28 5.86 --------------------------------------------------------- --------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 179.14 179.26 180.93 0.62 90: 183.49 183.36 185.92 0.92 99: 192.19 190.02 215.49 8.20 99.7: 240.70 228.16 258.88 8.24 --------------------------------------------------------- --------------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.05 85.60 86.24 0.50 90: 170.18 169.79 170.30 0.20 99: 176.83 176.58 177.09 0.19 99.7: 183.36 185.92 183.49 0.88 --------------------------------------------------------- --------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 86.24 85.98 86.11 0.10 90: 89.89 89.44 89.63 0.14 99: 169.66 169.79 170.05 0.10 99.7: 175.42 176.32 176.45 0.05 --------------------------------------------------------- --------------------------------------------------------- --------------
4 KB
Double-buffer disabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 691.46 699.65 701.18 0.15 90: 717.57 722.69 721.15 0.14 99: 752.90 748.29 748.29 0.00 99.7: 1872.38 1743.36 1780.22 1.39 --------------------------------------------------------- --------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 350.59 353.66 353.41 0.05 90: 691.46 701.18 697.60 0.34 99: 732.42 733.95 729.34 0.42 99.7: 1377.79 1279.49 1302.02 1.16 --------------------------------------------------------- --------------------------------------------------------- --------------
Double-buffer enabled:
-------------------------------- SUMMARY (Concurrent) -------------- ----------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 342.40 344.96 344.45 0.10 90: 357.25 360.32 359.04 0.24 99: 688.38 691.97 691.46 0.05 99.7: 1376.77 1480.19 1383.94 4.43 --------------------------------------------------------- --------------------------------------------------------- -------------- -------------------------------- SUMMARY (Concurrent2) -------------- ---------------------------------------------------------- Percentile run1 run2 run3 % Variation 50: 343.68 345.47 346.24 0.15 90: 360.06 362.11 363.14 0.19 99: 694.02 698.62 699.14 0.05 99.7: 1400.32 1510.91 1435.14 3.40 --------------------------------------------------------- --------------------------------------------------------- --------------
If you wish to tune your code for ultra-low latency, you could take a similar approach to our QueueReadJitterMain
net . openhft . chronicle . queue . jitter . QueueReadJitterMain This code can be considered as a basic stack sampler profiler. This is assuming you base your code on the net.openhft.chronicle.core.threads.EventLoop , you can periodically sample the stacks to find a stall. It is recommended to not reduce the sample rate below 50 microseconds as this will produce too much noise
It is likely to give you finer granularity than a typical profiler. As it is based on a statistical approach of where the stalls are, it takes many samples, to see which code has the highest grouping ( in other words the highest stalls ) and will output a trace that looks like the following :
28 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 25 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 21 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027) at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43) at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90) at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31) at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297) at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246) 14 at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54) at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
from this, we can see that most of the samples (on this occasion 28 of them ) were captured in ConcurrentHashMap.putVal() if we wish to get finer grain granularity, we will often add a net.openhft.chronicle.core.Jvm.safepoint into the code because thread dumps are only reported at safe-points.
결과:
In the test described above, the typical latency varied between 14 and 40 microseconds. The 99 percentile varied between 17 and 56 microseconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.
Acceptable Latency | 처리량 |
< 30 microseconds 99.3% of the time | 7 million message per second |
< 20 microseconds 99.9% of the time | 20 million messages per second |
< 1 milliseconds 99.9% of the time | 50 million messages per second |
< 60 microseconds 99.3% of the time | 80 million message per second |
Batching and Queue Latency
End-to-End latency plots for various message sizes
Chronicle Queue is designed to out-perform its rivals such as Kafka. Chronicle Queue supports over an order-of-magnitude of greater throughput, together with an order-of-magnitude of lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second, while simultaneously achieving latencies of 1 to 20 microseconds.
Chronicle Queue handles more volume from a single thread to a single partition. This avoids the need for the complexity, and the downsides, of having partitions.
Kafka uses an intermediate broker to use the operating system's file system and cache, while Chronicle Queue directly uses the operating system's file system and cache. For comparison see Kafka Documentation
Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue
FAQ - questions asked by customers
How it works - more depth on how Chronicle Queue is implemented
Utilities - lists some useful utilities for working with queue files
Chronicle support on StackOverflow
Chronicle support on Google Groups
Leave your e-mail to get information about the latest releases and patches to stay up-to-date.