1。はじめに
システムにデータベースからインポートする必要がある大量のデータがある場合、スプリングバッチを使用すると、インポートの効率を向上させることができます。 Spring BatchはItemReaderを使用してデータページを読み取り、ItemWriterはバッチにデータを書きます。 Spring BatchはElastearchのItemWriterおよびItemReaderを提供していないため、この例では、カスタムElasticsearchItemWriter(ElasticSearchItemReader)がバッチインポートに使用されます。
2。例
2.1 pom.xml
この記事では、Spring Data Jestを使用してESを接続します(Spring Data Elasticsearchを使用してESを接続することもできます)、ESバージョンは5.5.3です
<?xml version = "1.0" encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns:xsi = "http://www.w3.org/2001/xmlschema-instance <http://www.w3.org/2001 xsi:schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.0.sdsd"> <modeleversion> 4.0.0 </modelversion> <グループ> <artifactid> es-etl </artifactid> <バージョン> 0.0.1-snapshot </version> <packaging> jar </packaging> <name> es-etl </name> <説明> spring boot </description> <parent> <parent> org.springframework.boot < <バージョン> 2.0.0.m7 </version> <relativepath/> <! - リポジトリからのlookup parent-> </parent> <properties> <propert.build.sourceencoding> utf-8 </project.build.sourceencoding> <project.reporting.outputencoding> <java.version> 1.8 </java.version> </properties> <dependencies> <dependency> groupid> org.springframework.boot </groupid> <artifactid> spring-boot-starter </artifactid> </dependency> <dependency> <gripwork.springframework </groupid> <artifactid> spring-boot-starter-data-jpa </artifactid> </dependency> <dependency> groupid> org.postgresql </groupid> <artifactid> postgreSql </artifactid> </dependency> <redence> <dependency> org.springframework.boot </groupid> <artifactid> spring-boot-starter-batch </artifactid> </dependency> <dependency> com.github.vanroy </groupid> <artifactid> spring-boot-starter-data-jest </artifactid> <バージョン> 3.0.0.release < <artifactid> jest </artifactid> <bersion> 5.3.2 </version> </dependency> <dependency> <groupid> org.projectlombok </groupid> <artifactid> lombok </artifactid> </dependency> <依存関係> <依存関係> <groupid> org.springframework.boot </groupid> <artifactid> spring-boot-starter-test </artifactid> <scope>テスト</scope> </dependency> </dependencies> <bultins> <groupid> org.springframework.boot </groupid> <artifactid> spring-boot-maven-plugin </artifactid> </plugins> <Repository> <id> spring-snapshots </id> <name> spring snapshots </name> <url> https://repo.spring.io/snapshot.io/snapshots> <Enabled> true </enabled> </snapshots> </sprunt> <repository> <id <id <> <> <> <> <> <repository> <id> <id> <id> <id> <> <reposion <url> https://repo.spring.io/milestone </url> <snapshots> <enabled> false </enabled> </snapshots> </repository> </repository> </repository> </repository> </pluginRepository>プラグインポジトリー> spring-snapshots>スナップショット</name> <url> https://repo.spring.io/snapshot </url> <SnapShots> <Enabled> true </enabled> </snapshots> </pluginRepository> <pluginrepository> <id> spring-milestones </id> spring milestone </name </name> <url> https://repo.spring.io/milestone </url> <snapshots> <enabled> false </enabled> </snapshots> </pluginrepository> </pluginrepository> </pluginrepository> </pluginrepository> </pluginepositories> </project>
2.2エンティティクラスとリポジトリ
パッケージcom.hfcsbc.esetl.domain; import lombok.data; import org.springframework.data.elasticsearch.annotations.document; import org.springframework.data.elasticsearch.annotations.field; Import org.springframework.data.elasticsearch org.springframework.data.elasticsearch.annotations.fieldtype; import javax.persistence.entity; import javax.persistence.id; import javax.persistence.onetoone;/** * javax.persistence.onetoone; lebher interval = "-1")@entity@datapublic class person {@id private long id;プライベート文字列名; @onetoone @field(type = fieldtype.nested)プライベートアドレスアドレス;}パッケージcom.hfcsbc.esetl.domain; import lombok.data; import javax.persistence.entity; import javax.persistence.id;/** * create by pengchao by pengchao by pengchao by@@entity@datapublic classアドレス{@id private long id;プライベート文字列名;}パッケージcom.hfcsbc.esetl.repository.jpa;インポートcom.hfcsbc.esetl.domain.person; Import org.springframework.data.jpa.repository.jparepository;
パッケージcom.hfcsbc.esetl.repository.es;インポートcom.hfcsbc.esetl.domain.person; import org.springframework.data.elasticsearch.repository.elasticsearchrepository; long> {}2.3 ElasticSearchItemWriterの構成
パッケージcom.hfcsbc.esetl.itemwriter; Import com.hfcsbc.esetl.repository.es.espersonrepository; Import com.hfcsbc.esetl.domain.person; Import org.springframework.batch.batch.core.exittatus; org.springframework.core.stepexecution; Import org.springframework.batch.core.stepexecutionlistener; Import org.springframework.item.itemwriter; Import java.util.list;/** * Pengchao by Pengchao on 2018/2/2/23 */public eLastefedの作成itemwriter <serpures>、itemwriteListener <surne>、stepexecutionlistener {private espersonrepository personrepository; public elasticsearchItemwriter(espersonrepository personrepository){this.personRepository = personrepository; } @Override public void beforewrite(list <?extends person>アイテム){} @override public void afterwrite(list <?extends person> items){} @override public void onwriteerror(例外、リスト<?extends person>アイテム){} @Override public voidepedep(depexecution exexecution extecution extecution afterstep(stepexecution stepexecution){return null; } @Override public void write(list <?extends persons> items)スロー例外{//クラスAbstractelasticsearchRepositoryを実装するSaveallメソッドは、erasticsearchoperations.bulkindex(queries)を呼び出します。 }}2.4 ElasticSearchItemReaderの構成(この例は使用されていません、参照のみ)
package com.hfcsbc.esetl.itemReader;import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;import org.springframework.beans.factory.InitializingBean;import org.springframework.data.elasticsearch.core.ElasticsearchOperations;import org.springframework.data.elasticsearch.core.query.searchquery; Import java.util.iterator;/** * create by pengchao by pengchao by pengchao by public class elasticsearchItemreader <パーソン>プライベート最終検索クエリクエリ。プライベートファイナルクラス<?人>ターゲットタイプを拡張します。 public ElasticsearchItemReader(ElasticSearchoperations Elasticsearchoperations、SearchQuery Query、class <?extends person> targetType){this.ElasticsearchOperations = ElasticSearchOperations; this.query = query; this.targetType = targetType; } @Override Protected Iterator <Person> dopagERead(){return(iterator <person>)elasticsearchoperations.queryforlist(query、targettype).iterator(); } @override public void avherpropertiesset()スロー例外{}}2.5スプリングバッチの構成に必要な構成
パッケージcom.hfcsbc.esetl.config;インポートcom.hfcsbc.esetl.itemwriter.elasticsearchitemwriter; Import com.hfcsbc.esetl.repository.es.espersonRepository;インポートcom.hfcsbc.esetl.domain.person; org.springframework.core.step; Import org.springframework.batch.core.configuration.annotation.enableBatchProcessing; Import org.springframework.batch.core.configuration.annotation.jobbuilderfactory; Import; org.springframework.batch.core.configuration.annotation.stepbuilderfactory; Import org.springframework.batch.core.launch.support.runidincrementer; Import org.springframework.batch.batch.core.repository.jobrepository; Import; org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.batch.item.database.JpaPagingItemReader;import springframework.item.database.orm.orm.jpanativequeryprovider; import org.springframework.beans.actory.annotation.autowired; import org.springframework.context.annotation.bean; Import org.springframework.context.context.context.configuature org.springframework.transaction.platformtransactionmanager; import javax.persist.entitymanagerfactory; import javax.sql.datasource;/** * create by pengchao by pengchao by pengchao by configuration PersonRepository; @Bean public ItemReader <Person> OrderItemReader(EntityManagerFactory EntityManagerFactory){jpapagingItemReader <Person> reader = new jpapagingItemreader <serson>();文字列sqlquery = "select * from person"; try {jPanativeQueryProvider <Person> QueryProvider = new JPanativeQueryProvider <Person>(); queryprovider.setsqlquery(sqlquery); queryProvider.SetEntityClass(Person.Class); queryprovider.afterpropertiesset(); Reader.SetEntityManagerFactory(EntityManagerFactory); reader.setPagesize(10000); reader.setQueryProvider(QueryProvider); reader.afterpropertiesset(); reader.setsavestate(true); } catch(Exception e){e.printstacktrace(); } return reader; } @bean public elasticsearchItemwriter itemwriter(){return new ElasticSearchItemWriter(PersonRepository); } @bean public step(stepbuilderfactory stepbuilderfactory、item reader itemreader、itemwriter itemwriter){return stepbuilderfactory .get .get( "step1").chunk(10000).reader(itemwriter).writer(itemwriter).build(); } @bean public job(jobbuilderfactory jobbuilderfactory、step step){return jobbuilderfactory .get( "importjob").incrementer(new runidincrementer()).flow(step).end().build().build(); } /***スプリングバッチが実行されると、必要なテーブルが作成されます。テーブル作成の場所は次のとおりです。DataSource * @Param DataSource * @Param Manager * @return */ @Bean Public JobrePository JobrePository(DataSource DataSource、PlatformTransactionManager Manager){JoblepositoryFactorybean jobrepositoryfactorybean = new JobrepositoryFactorybean(); jobrepositoryfactorybean.setdatasource(dataSource); jobrepositoryfactorybean.setTransactionManager(マネージャー); jobrepositoryfactorybean.setdatabasetype( "postgres"); try {return jobrepositoryfactorybean.getobject(); } catch(Exception e){e.printstacktrace(); } nullを返します。 }}2.6データベースとESの接続アドレスを構成する
春:Redis:HOST:192.168.1.222データ:JEST:URI:http://192.168.1.222:9200ユーザー名:弾性パスワード:Changeme JPA:データベース:Postgresql Show-SQL:True Hibernate:DDL-Auto:DataSource:Platform:Platform:Platform:Platious:Platform auto:upplation JDBC:postgreSql://192.168.1.222:5433/人のユーザー名:HFCBパスワード:HFCB Driver-Class-Name:org.postgresql.driver max-active:2spring.batch.initialize-schema:常に
2.7エントリクラスを構成します
package com.hfcsbc.esetl;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;import org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchdataautoconfiguration; Import org.springframework.data.elasticsearch.repository.config.enableLasticsearchRepositories; Import springframework.data.jpa.repository.config.enablejparepositories; @springbootapplication(explude = {elasticsearchautoconfiguration.class、erasticsearchdataautoconfiguration.class}) "com.hfcsbc.esetl.repository")@enablejparepositories(basepackages = "com.hfcsbc.esetl.repository.jpa")public static void main(string [] args){springpliplication.run(esetlapplication.ragss、class、class、class); }}上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。