1. Pendahuluan
Ketika sistem memiliki sejumlah besar data yang perlu diimpor dari database, penggunaan Spring Batch dapat meningkatkan efisiensi impor. Spring Batch menggunakan ItemReader untuk membaca data halaman demi halaman, dan ItemWriter untuk menulis data secara batch. Karena Spring Batch tidak menyediakan ItemWriter dan ItemReader untuk Elasticsearch, contoh ini menggunakan ElasticsearchItemWriter khusus (serta ElasticsearchItemReader khusus) untuk melakukan impor batch.
2. Contoh
2.1 pom.xml
Artikel ini menggunakan Spring Data Jest untuk terhubung ke ES (Anda juga dapat menggunakan Spring Data Elasticsearch untuk terhubung ke ES), dan versi ES yang digunakan adalah 5.5.3.
<? Xml Version = "1.0" encoding = "UTF-8"?> <Project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema XSI: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <ModelVersion> 4.0.0 </ModelVersion> <groupid> com.hfcsbcbcbc. <ArtifactId> es-ETL </RiTtifacTID> <Version> 0.0.1-Snapshot </version> <packaging> Jar </packaging> <name> es-ETL </name> <cripence> Proyek Demo untuk Spring Boot </creckripe> <tuxter> org.springframework. <Version> 2.0.0.m7 </version> <relativePath/> <!-Pencarian orangtua dari repositori-> </tarent> <properties> <poject.build.sourceencoding> UTF-8 </project.build.sourceencoding> <poject.reporting.outputencoding> UTF-8PETOR.SOUPORT. <Java.Version> 1.8 </java.version> </ Properties> <dependencies> <dependency> <GroupId> org.springframework.boot </groupid> <ArtifactId> Spring-boot-starter </artifactid> </groupdency> <dependency> <groupid> org.springfr.springfr. <ArtifactId> Spring-boot-starter-data-jpa </artifactid> </gandendency> <dependency> <groupid> org.postgresql </proupid> <RtifactId> PostgreSQL </artifactid> </groupid> <dependency> <groupid> org.springfring.boot <ArtifactId> Spring-boot-starter-batch </arttifactid> </ganden> <dependency> <groupid> com.github.vanroy </sroupid> <ArtifactId> Spring-boot-starter-data- Jest </arttifactid> <version> 3.0.0.relove </Version> </ArtifactId> <version> 3.0.0.relove </Versi> </DEPENDING </ARTIFACTID> <ArTifactId> Jest </RiTtifacTID> <Version> 5.3.2 </version> </gandendency> <dependency> <groupid> org.projectlombok </proupid> <RartifactId> lombok </artifactid> </dependency> <groupid> org.springframework.boot </Dependency> <groupid> <groupid> org.springframework.boot </Dependency> <groupid> <ArtifactId> Spring-boot-starter-test </arttifactid> <scope> test </seupope> </dependency> </dependencies> <build> <dlugin> <groupid> org.springframework.bootor </groupid> <Artifactid> </art/plugin </art/orgin-boots-maven-plugin 
</artifacit> </Artifactid </art/artfactid </art/artfactid </art <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repository> </repository> </repository> </pluginRepository> <pluginRepository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <rrepo.spring.io/snapshot </ruRl> <snapshots> <denabled> true </inableed> </snapshots> </pluginrepository> <luginrepository> <drange> spring-milestones </id name> Spring Milestones </name> <rrepo.spring.io/milestone </ruRl> <snapshots> <denabled> false </enabled> </snapshots> </pluginrepository> </pluginrepository> <it pluginrepository> </pluginrepository> </pluginrepository>
2.2 Kelas Entitas dan Repositori
Paket com.hfcsbc.esetl.domain; impor lombok.data; impor org.springframework.data.elasticsearch.annotations.document; impor org.springframework.data.elasticsearch.annotations.field; org.springframework.data.elasticsearch org.springframework.data.elasticsearch.annotations.FieldType;import javax.persistence.Entity;import javax.persistence.Id;import javax.persistence.OneToOne;/** * Create by pengchao on 2018/2/23 */@Document(indexName = "person", type = "person", shards = 1, replicas = 0, RefreshInval = "-1")@entitas@datapublic class Person {@id id private long; nama string pribadi; @Onetoone @field (type = fieldtype.nested) alamat alamat pribadi;} Paket com.hfcsbc.esetl.domain; import lombok.data; import javax.persistence.entity; import javax.persistence.id;/** * Buat dengan Pengchao pada 2018/2/23 */@entity@data kelas DataPublic {@id Private ID Long;; nama string pribadi;}Paket com.hfcsbc.esetl.repository.jpa; impor com.hfcsbc.esetl.domain.person; impor org.springframework.data.jpa.repository.jparepository;/** Buat dengan Pengchao pada 2018/2/2/2/23 */**
Paket com.hfcsbc.esetl.repository.es; impor com.hfcsbc.esetl.domain.person; impor org.springframework.data.elasticsearch.Repository.elasticsearchrepository;/** Buat oleh Pengchao pada 20188/2/2/2/23 Long> {}2.3 Mengkonfigurasi ElasticsearchItemWriter
Paket com.hfcsbc.esetl.itemwriter; impor com.hfcsbc.esetl.repository.es.espersonRepository; impor com.hfcsbc.esetl.domain.persen; impor org.spramework.batch.core.exitstatus; Imporerener; org.springframework.batch.core.stepexecution; impor org.springframework.batch.core.stepexecutionlistener; impor org.springframework.batch.item.Itemwriter; impor java.util.list;/** * Buat dengan pengchao on 2018 ItemWriter <Fon>, itemWriteListener <Fone>, StepExecutionListener {private EspersonRepository PersonRepository; Public ElasticsearchItemWriter (EspersonRepository PersonRepository) {this.personRepository = personRepository; } @Override public void beForewrite (daftar <? Extends Person> item) {} @Override public void afterwrite (daftar <? Item orang> {}} public void public OnwriteError (pengecualian Excection, Leget -items) {} @Override public void BreeGoDEXIDE PUBLOPEP (LANGKAH EXTENDS) {} @Override Void Void BreeG forforested BEGERUPIDEPEP (LOPEPEP) {} @Override void void BEGERI PUBLFORESTOPE PUBLFORDOUTE BEGERIVODE PUBLIKOPEP) Afterstep (StepExecution StepExecution) {return null; } @Override public void write (Daftar <? Extends Person> Items) melempar Exception {// Metode SaveAll yang mengimplementasikan kelas AbstractLasticsearchRepository memanggil elasticsearchoperations.bulkindex (Queries), yang merupakan indeks batch dari personrepository.saveall (item); }}2.4 Mengkonfigurasi ElasticsearchitemReader (contoh ini tidak digunakan, hanya untuk referensi)
Paket com.hfcsbc.esetl.itemreader; impor org.springframework.batch.item.data.abstractpaginatedDataitemreader; impor org.springframework.beans.factory.initializeBean; impor org.springframework.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data.data. org.springframework.data.elasticsearch.core.query.SearchQuery;import java.util.Iterator;/** * Create by pengchao on 2018/2/24 */public class ElasticsearchItemReader<Person> extends AbstractPaginatedDataItemReader<Person> implements InitializingBean { private final ElasticsearchOperations elasticsearchOperations; kueri pencarian akhir pribadi; Kelas Akhir Pribadi <? memperluas orang> targetType; Publik ElasticsearchitemReader (Elasticsearchoperations Elasticsearchoperations, Query SearchQuery, kelas <? Extends Person> TargetType) {this.elasticsearchoperations = Elasticsearchoperations; this.query = kueri; this.targetType = targetType; } @Override terlindungi iterator <sone> dopageread () {return (iterator <sone>) elasticsearchoperations.QueryForList (kueri, targetType) .iterator (); } @Override public void afterpropertiesset () melempar Exception {}}2.5 Konfigurasi Diperlukan untuk Mengkonfigurasi Batch Musim Semi
Paket com.hfcsbc.esetl.config; impor com.hfcsbc.esetl.itemwriter.elasticsearchitemwriter; impor com.hfcsbc.esetl.repository.es.esponrepository; impor com.hfcsbc.esetl.domain.person; org.springframework.batch.core.step; impor org.springframework.batch.core.configuration.annotation.enableBatchProcessing; impor org.springframework.batch.core.configuration.annotation.jobbuilderfactory; impor org.springframework.batch.core.configuration.annotation.stepbuilderfactory; impor org.springframework.batch.core.launch.support.runidincrementer; impor org.spramework.batch.core.repository.jrepository.jeposorory org.springframework.batch.core.repository.support.jobrepositoryfactorybean; impor org.springframework.batch.item.itemreader; impor org.spramework.batch.itemwriter; impor org.spramework.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch.batch. yang ” org.springframework.batch.item.database.orm.jpanativeQueryprovider; impor org.springframework.beans.factory.annotation.Autowired; impor org.spramework.context.annotation.bean; impor org.spramramework.context.notation. org.springframework. 
@Bean Public ItemReader <Fon> orderItemReader (EntityManagerFactory EntityManagerFactory) {jpapagingItemReader <Fon> pembaca = jpapagingItemReader baru <Fon> (); String sqlQuery = "SELECT * from Person"; coba {jPanativeQueryProvider <Fon> queryProvider = new jpanativeQueryProvider <Fon> (); queryprovider.setsqlQuery (sqlQuery); queryprovider.setentityclass (person.class); queryprovider.afterpropertiesset (); reader.setentityManagerFactory (EntityManagerFactory); reader.setPagesize (10000); reader.setQueryProvider (QueryProvider); reader.afterpropertiesset (); reader.setsavestate (true); } catch (Exception e) {E.PrintStackTrace (); } return reader; } @Bean Public ElasticsearchItemWriter itemWriter () {return new ElasticsearchIteMwriter (personRepository); } @Bean Langkah Langkah Publik (StepBuilderFactory StepBuilderFactory, ItemReader ItemReader, ItemWriter ItemWriter) {return stepbuilderfactory .get ("Step1") .chunk (10000) .reader (itemreader) .writer (itemWriter) .build (); } @Bean Pekerjaan Pekerjaan Publik (JobBuilderFactory JobBuilderFactory, Step Step) {return jobBuilderFactory .get ("ImportJob") .crementer (runidincrementer baru ()) .flow (langkah) .End () .build (); } /*** Ketika batch pegas dieksekusi, beberapa tabel yang dibutuhkan akan dibuat. Berikut adalah lokasi untuk pembuatan tabel: DataSource * @param DataSource * @param Manager * @return */ @bean Public JobRepository JobRepository (DataSource DataSource, PlatformTransActionManager Manager) {JobRepositoryFactoryBean (PLATCHREPOSITORYFACToryBean = newRepository; JobRepositoryFactoryBean.SetDataSource (DataSource); jobrepositoryfactorybean.setTransactionManager (manajer); jobrepositoryfactorybean.setDatabasetype ("postgres"); coba {return jobrepositoryFactoryBean.getObject (); } catch (Exception e) {E.PrintStackTrace (); } return null; }}2.6 Konfigurasikan alamat koneksi database dan ES
spring: redis: host: 192.168.1.222 data: jest: uri: http://192.168.1.222:9200 username: elastic password: changeme jpa: database: POSTGRESQL show-sql: true hibernate: ddl-auto: update datasource: platform: postgres url: JDBC: PostgreSQL: //192.168.1.222: 5433/Person Nama pengguna: HFCB Kata Sandi: HFCB Driver-Class-Name: org.postgresql.driver Max-Active: 2spring.batch.inialize-SCEMA:
2.7 Mengonfigurasi kelas utama (entry point) aplikasi
Paket com.hfcsbc.esetl; impor org.springframework.boot.springapplication; impor org.springframework.boot.autoconfigure.springbootApplication; impor org.spramework.boot.Autoconfigure.data. org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchDataAutoconfiguration; impor org.springframework.data.elasticsearch.repository.config.enableelasticsearchrepositories; impor org.springframework.data.jpa.repository.config.enableJparePositories; @springbootApplication (tidak termasuk = {elasticsearchautoconfiguration.classearchrepackacors = classorties}@enableelasticsicticsicticsictices = Enableelasticsictices = elasticsePrePoConFiguration "com.hfcsbc.esetl.repository")@enableJparePositories (BasEpackages = "com.hfcsbc.esetl.repository.jpa") kelas publik Esetlapplication {public static void (string [] args) {springapplication.run (esetlapping (esetlicass. }}Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.