1. Introduction
When a system has to import a large amount of data from a database, Spring Batch can make the import more efficient. Spring Batch uses an ItemReader to read the data page by page and an ItemWriter to write it in batches. Because Spring Batch does not provide an ItemWriter or ItemReader for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and an ElasticsearchItemReader) for the batch import.
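The read-then-write-in-batches flow described above can be sketched in plain Java. This is only a simplified model under stated assumptions, not Spring Batch's actual implementation: the `Reader` interface, `runStep` and `chunkSizes` are hypothetical names introduced for illustration. It does borrow one real convention from Spring Batch, namely that a reader signals the end of the input by returning null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of chunk-oriented processing; these are not Spring Batch classes. */
public class ChunkSketch {

    /** Hypothetical reader: returns the next item, or null when the input is exhausted. */
    public interface Reader<T> {
        T read();
    }

    /** Reads items one by one, buffers up to chunkSize of them, then hands the buffer to the writer. */
    public static <T> void runStep(Reader<T> reader, Consumer<List<T>> writer, int chunkSize) {
        List<T> chunk = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                writer.accept(chunk);      // one batched write per full chunk
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(chunk);          // final, possibly smaller chunk
        }
    }

    /** Runs a step over totalItems integers and returns the size of each written chunk. */
    public static List<Integer> chunkSizes(int totalItems, int chunkSize) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < totalItems; i++) {
            source.add(i);
        }
        Iterator<Integer> it = source.iterator();
        List<Integer> sizes = new ArrayList<>();
        ChunkSketch.<Integer>runStep(
                () -> it.hasNext() ? it.next() : null,
                chunk -> sizes.add(chunk.size()),
                chunkSize);
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(25, 10)); // prints [10, 10, 5]
    }
}
```

In the example that follows, the writer side of this loop corresponds to a single bulk request to Elasticsearch per chunk, which is where the efficiency gain over item-by-item indexing comes from.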
2. Example
2.1 pom.xml
This article uses Spring Data Jest to connect to ES (Spring Data Elasticsearch can also be used). The ES version is 5.5.3.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
2.2 Entity classes and repositories
package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Created by pengchao
 */
// The indexName and replicas values are unreadable in the source; "person" and 0 are assumed here.
@Document(indexName = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;

    private String name;

    // Stored as a one-to-one relation in JPA and as a nested object in Elasticsearch.
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}

package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Created by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;

    private String name;
}

package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Created by pengchao
 */
// The interface name is unreadable in the source; PersonRepository is assumed here.
public interface PersonRepository extends JpaRepository<Person, Long> {
}

package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Created by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 Configuring ElasticsearchItemWriter
package com.hfcsbc.esetl.itemwriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Created by pengchao
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    // The listener callbacks are intentionally left empty in this example.
    @Override
    public void beforeWrite(List<? extends Person> items) {
    }

    @Override
    public void afterWrite(List<? extends Person> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        // saveAll is implemented by AbstractElasticsearchRepository,
        // which calls elasticsearchOperations.bulkIndex(queries).
        personRepository.saveAll(items);
    }
}

2.4 Configuring ElasticsearchItemReader (not used in this example; for reference only)
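// The package declaration is truncated in the source; this name is assumed.
package com.hfcsbc.esetl.itemreader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**
 * Created by pengchao on 2018/2/24
 */
// The source names the type parameter "Person"; it is renamed to T here to avoid
// confusion with the Person entity. Behavior is unchanged.
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;

    private final SearchQuery query;

    private final Class<? extends T> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}

2.5 Configuration required by Spring Batch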
package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemwriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Created by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private EsPersonRepository personRepository;

    // Reads Person rows from PostgreSQL page by page (10000 rows per page).
    // The bean method name is unreadable in the source; itemReader is assumed here.
    @Bean
    public ItemReader<Person> itemReader(EntityManagerFactory entityManagerFactory) {
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
        String sqlQuery = "select * from person";
        try {
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter() {
        return new ElasticsearchItemWriter(personRepository);
    }

    // Each chunk of 10000 items is passed to the writer in a single write call.
    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<Person> itemReader,
                     ItemWriter<Person> itemWriter) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * Spring Batch creates some tables of its own when it runs.
     * This bean specifies where those tables are created: dataSource.
     *
     * @param dataSource
     * @param manager
     * @return
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

2.6 Configuring the connection addresses of the database and ES
spring:
  # The parent key of this host entry is missing in the source.
  host: 192.168.1.222
  data:
    jest:
      # The port is garbled in the source; 9200, the default ES HTTP port, is assumed.
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always
2.7 Configuring the entry class
package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

// The default Elasticsearch auto-configuration is excluded because the connection goes through Jest.
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}

That is all for this article. I hope it is helpful for your study.