1. บทนำ
เมื่อระบบมีข้อมูลจำนวนมากที่ต้องนำเข้าจากฐานข้อมูลการใช้ชุดสปริงสามารถปรับปรุงประสิทธิภาพของการนำเข้าได้ Spring Batch ใช้ itemReader เพื่ออ่านหน้าข้อมูลและ itemWriter เขียนข้อมูลในแบทช์ เนื่องจาก Spring Batch ไม่ได้จัดเตรียม itemWriter และ itemReader ของ Elastisearch ในตัวอย่างนี้จึงใช้ ElasticSearchItemWriter ที่กำหนดเอง (ELASTICSECHITEMREADER) สำหรับการนำเข้าแบทช์
2. ตัวอย่าง
2.1 pom.xml
บทความนี้ใช้ Spring Data Jest เพื่อเชื่อมต่อ ES (คุณยังสามารถใช้ Spring Data Elasticsearch เพื่อเชื่อมต่อ ES) และรุ่น ES คือ 5.5.3
<? xml version = "1.0" การเข้ารหัส = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/ XSI: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" <ArtIfactId> es-etl </artifactid> <persion> 0.0.1-snapshot </เวอร์ชัน> <packaging> jar </packaging> <name> es-etl </name> <คำอธิบาย> โครงการสาธิตสำหรับการบูตฤดูใบไม้ผลิ </คำอธิบาย> <carent> <Sersion> 2.0.0.M7 </Strement> <inelyPath/> <!-การค้นหาพาเรนต์จากที่เก็บ-> </Parent> <properties> <project.build.sourceencoding> UTF-8 </project.build.sourceencoding> <java.version> 1.8 </java.version> </premerties> <การพึ่งพา> <การพึ่งพา> <roupId> org.springframework.boot </groupId> <ArtIfactId> Spring-Boot-Starter-Data-JPA </artifactId> </การพึ่งพาอาศัย> <การพึ่งพา> <GroupId> org.postgresql </groupId> <ArtIfactId> Spring-Boot-Starter-Batch </artifactId> </serdency> <การพึ่งพา> <roupId> com.github.vanroy </groupId> <ratifactid> Spring-Boot-Starter-Data-jest </artifactid> <version> 3.0.0.0.Release <ArtIfactId> jest </artifactId> <persion> 5.3.2 </เวอร์ชัน> </การพึ่งพา> <การพึ่งพา> <roupId> org.projectlombok </groupId> <ratifactid> Lombok </artifactid> <ArtIfactId> Spring-Boot-Starter-Test </ArtIfactId> <Scope> ทดสอบ </cope> </derpendency> </การพึ่งพา> <uffer> <build> <build> <build> <plugins> <repository> <id> spring-snapshots </id> <name> สปริงสแน็ปช็อต </name> <url> https://repo.spring.io/snapshot </url> <napshots> <enableStones </enabled> </snapshots> <url> https://repo.spring.io/milestone </url> <plotshots> <enabled> false </enabled> </namshots> </repository> </repository> <url> https://repo.spring.io/snapshot </url> <plopshots> <enabled> True </enabled> </snapshots> </pluginrepository> <pluginrepository> <url> https://repo.spring.io/milestone </url> <plopshots> <enabled> false </enabled> </napshots> </pluginrepository> </pluginrepository>
2.2 เอนทิตีคลาสและที่เก็บข้อมูล
แพ็คเกจ com.hfcsbc.esetl.domain; นำเข้า lombok.data; นำเข้า org.springframework.data.elasticsearch.annotations.document; นำเข้า org.springframework.data.elasticsearch.annotations.notations. org.springframework.data.elasticsearch.annotations.fieldtype; นำเข้า Javax.persistence.entity; นำเข้า Javax.persistence.id; นำเข้า Javax.persistence.onetoone;/** * สร้างโดย Pengchao ในปี 2018/23 refreshInterval = "-1")@entity@datapublic class person {@id ส่วนตัว ID ส่วนตัว; ชื่อสตริงส่วนตัว; @onetoone @field (type = fieldtype.nested) ที่อยู่ส่วนตัว;}แพ็คเกจ com.hfcsbc.esetl.domain; นำเข้า lombok.data; นำเข้า javax.persistence.entity; นำเข้า Javax.persistence.id;/** * สร้างโดย Pengchao เมื่อปี 2018/2/23 */@entity@datapublic enter ชื่อสตริงส่วนตัว;}
แพ็คเกจ com.hfcsbc.esetl.repository.jpa; นำเข้า com.hfcsbc.esetl.domain.person; นำเข้า org.springframework.data.jpa.repository.jparepository;
แพ็คเกจ com.hfcsbc.esetl.repository.es; นำเข้า com.hfcsbc.esetl.domain.person; นำเข้า org.springframework.data.elasticsearch.repository.ElasticSearchRepository; ยาว> {}2.3 การกำหนดค่า elasticsearchitemwriter
แพ็คเกจ com.hfcsbc.esetl.itemwriter; นำเข้า com.hfcsbc.esetl.repository.es.espersonrepository; นำเข้า com.hfcsbc.esetl.domain.person; นำเข้า org.springframework.batch.core.exitstatus; org.springframework.batch.core.stepexecution; นำเข้า org.springframework.batch.core.stepexecutionListener; นำเข้า org.springframework.batch.item.itemwriter; itemwriter <person>, itemwriteListener <person>, stepexecutionListener {ส่วนตัว espersonrepository personrepository; Public ElasticSearchItemWriter (espersonrepository personrepository) {this.personrepository = personrepository; } @Override โมฆะสาธารณะ beforewrite (รายการ <? ขยายบุคคล> รายการ) {} @Override โมฆะสาธารณะ Afterwrite (รายการ <? ขยายบุคคล> รายการ) {} @Override โมฆะสาธารณะ onWriteERror (ข้อยกเว้นยกเว้นรายการ <? AfTERSTEP (StepExecution StepExecution) {return null; } @Override โมฆะสาธารณะเขียน (รายการ <? ขยายบุคคล> รายการ) โยนข้อยกเว้น {// วิธีการ saveAll ที่ใช้คลาส AbstractElasticSearchRepository เรียก elasticsearchoperations.bulkindex (แบบสอบถาม) ซึ่งเป็นดัชนีชุดของบุคคล -2.4 การกำหนดค่า elasticsearchitemReader (ตัวอย่างนี้ไม่ได้ใช้สำหรับการอ้างอิงเท่านั้น)
แพ็คเกจ com.hfcsbc.esetl.itemreader; นำเข้า org.springframework.batch.item.data.abstractpaginateddataitemreader; นำเข้า org.springframework.beans.lastorive org.springframework.data.elasticsearch.core.query.searchQuery; นำเข้า java.util.iterator;/** * สร้างโดย Pengchao เมื่อปี 2018/2/24 */ชั้นเรียนสาธารณะ ElasticSearchTemreader <person> ขยายบทสรุป Query Query Private Final; ชั้นเรียนสุดท้ายส่วนตัว <? ขยายบุคคล> TargetType; Public ElasticSearchItemReader (ElasticSearchOperations ElasticSearchOperations, SearchQuery Query, Class <? ขยายบุคคล> TargetType) {this.ElasticSearchOperations = ElasticSearchOperations; this.query = Query; this.targetType = TargetType; } @Override ป้องกันตัววนซ้ำ <person> dopageread () {return (iterator <person>) elasticsearchoperations.queryforlist (Query, TargetType) .Iterator (); } @Override โมฆะสาธารณะ AfterPropertIesset () โยนข้อยกเว้น {}}2.5 การกำหนดค่าที่จำเป็นสำหรับการกำหนดค่าสปริงแบทช์
แพ็คเกจ com.hfcsbc.esetl.config; นำเข้า com.hfcsbc.esetl.itemwriter.elasticsearchitemwriter; นำเข้า com.hfcsbc.esetl.repository.es.espersonrepository; นำเข้า com.hfcsbc.esetl.domain.person org.springframework.batch.core.step; นำเข้า org.springframework.batch.core.configuration.annotation.enablebatchprocessing; นำเข้า org.springframework.batch.core.configuration.annotation.jobbuilderfactory; org.springframework.batch.core.configuration.annotation.stepbuilderfactory นำเข้า org.springframework.batch.core.launch.support.runidincrementer; org.springframework.batch.core.repository.support.jobrepositoryfactorybean; นำเข้า org.springframework.batch.item.itemreader; นำเข้า org.springframework.batch.iteMwriter; org.springframework.batch.item.database.orm.jpanativequeryprovider นำเข้า org.springframework.beans.factory.annotation.autowired; นำเข้า org.springframework.context.annotation.bean; org.springframework.transaction.platformTransactionManager นำเข้า Javax.persistence.entityManagerFactory; นำเข้า Javax.sql.datasource;/** * สร้างโดย Pengchao ในปี 2018/2/23 */@การกำหนดค่า @Bean Public ItemReader <Person> orderItemReader (EntityManagerFactory EntityManagerFactory) {JPapagingItemReader <Person> reader = ใหม่ jPapagingItemReader <person> (); String sqlQuery = "เลือก * จากบุคคล"; ลอง {jpanativeQueryProvider <Person> queryProvider = new JPanativeQueryProvider <person> (); queryprovider.setsqlQuery (sqlQuery); queryProvider.setEntityClass (person.class); queryprovider.afterpropertiesset (); Reader.SetEntityManagerFactory (EntityManagerFactory); reader.setPagesize (10,000); Reader.SetQueryProvider (QueryProvider); Reader.AfterPropertIesset (); reader.setSavestate (จริง); } catch (exception e) {e.printstacktrace (); } return reader; } @bean สาธารณะ elasticsearchitemWriter itemWriter () {ส่งคืน eLASTICSECHITEMWRITER ใหม่ (PersonRepository); } @Bean ขั้นตอนสาธารณะขั้นตอน (stepbuilderFactory stepbuilderFactory, itemReader itemReader, itemWriter itemWriter) {ส่งคืน stepbuilderFactory .get ("step1") .chunk (10,000) .reader (itemreader) .writer } @Bean งานสาธารณะงาน (JobBuilderFactory JobBuilderFactory, ขั้นตอนขั้นตอน) {ส่งคืน JobBuilderFactory .get ("importJob") .incrementer (ใหม่ runidincrementer ()) .flow (ขั้นตอน) .end () .build (); } /*** เมื่อมีการดำเนินการแบทช์สปริงบางตารางที่ต้องการจะถูกสร้างขึ้น นี่คือที่ตั้งสำหรับการสร้างตาราง: DataSource * @param DataSource * @param Manager * @return */ @bean งานสาธารณะงานสาธารณะงาน (DataSource DataSource, PlatformTransactionManager) JobRepositoryFactoryBean.setDataSource (DataSource); JobRepositoryFactoryBean.SetTransactionManager (ผู้จัดการ); JobRepositoryFactoryBean.setDatabasetype ("Postgres"); ลอง {ส่งคืน JobRepositoryFactoryBean.getObject (); } catch (exception e) {e.printstacktrace (); } return null; -2.6 กำหนดค่าที่อยู่การเชื่อมต่อของฐานข้อมูลและ ES
ฤดูใบไม้ผลิ: Redis: โฮสต์: 192.168.1.222 ข้อมูล: Jest: URI: http://192.168.1.222:9200 ชื่อผู้ใช้: รหัสผ่านยืดหยุ่น: Changeme JPA: ฐานข้อมูล: PostgreSql show-sql: true hibernate: ddl-auto JDBC: PostgreSQL: //192.168.1.222: 5433/บุคคลชื่อผู้ใช้: รหัสผ่าน HFCB: HFCB Driver-class-name: org.postgresql.driver max-active: 2spring.batch.initialize-schema เสมอ
2.7 กำหนดค่าคลาสรายการ
แพ็คเกจ com.hfcsbc.esetl; นำเข้า org.springframework.boot.springapplication; นำเข้า org.springframework.boot.autoconfigure.springbootapplication; นำเข้า org.springframework.boot.autoconfigure.data.data org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchdataautoconfiguration; นำเข้า org.springframework.data.elasticsearch.repository.config.enableelasticreearchrepositories; org.springframework.data.jpa.repository.config.enablejparepositories; @springbootapplication (exclude = {elasticsearchautoconfiguration.class "com.hfcsbc.esetl.repository")@enablejparepositories (basepackages = "com.hfcsbc.esetl.repository.jpa") คลาสสาธารณะ esetlapplication -ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น