1. Introdução
Quando o sistema possui uma grande quantidade de dados que precisam ser importados do banco de dados, o uso do lote de mola pode melhorar a eficiência da importação. O Spring Batch usa o ItemReader para ler as páginas de dados, e o ItemWriter grava dados em lotes. Como o lote da primavera não fornece o ItemWriter e o itemreader da Elastisearch, neste exemplo, um ElasticsearchItemwriter personalizado (elasticsearchItemReader) é usado para importação em lote.
2. Exemplo
2.1 pom.xml
Este artigo usa o Spring Data Jest para conectar ES (você também pode usar o Spring Data Elasticsearch para conectar ES), e a versão ES é 5.5.3
<? xml versão = "1.0" coding = "utf-8"?> <Projeto xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.ww3.org/2001/xmlschaMance xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/Maven-4.0.0.xsd"> <modelVersion> 4.0.0 </modelversion> <proupid> com.hfcsbcsbccsc. <TRAFACTID> ES-ETL </ArtifactId> <Versão> 0.0.1-Snapshot </siers> <batyaging> jar </acheging> <name> es-etl </name> <cription> Projeto Demo para Spring Boot </Descrição> <Dorfact> <Pringid> org.springBROTWROT.BOOT </Grupo <brount> <versão> 2.0.0.m7 </versão> <RelityPath/> <!-Lookup Parent from Repository-> </parent> <Perts> <Project.build.sourceEncoding> utf-8 </project.build.sourceEncoding> <Projectoring.outputEnging> utf-8 </prond. <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <TarifactId> Spring-Boot-Starter-Data-jpa </starfactId> </pendesenty> <pendence> <puperid> org.postgresql </fuidid> <ArtifactId> postgresql </ArtifactId> </dependência> <pusency> <voupid> org.springframework.Brot.Blate.Brot </Dependency> <pusency> <proupid> org.springframework.Bl. <Tarifactid> Spring-Boot-Starter-Batch </ArtifactId> </Dependency> <pendency> <puperid> com.github.vanroy </frupiD> <TRATIFACTID> Spring-boot-starter-Data-jest </Artifactid> <versão <TarifactId> jest </sutifactId> <versão> 5.3.2 </version> </dependency> <pendence> <puperid> org.projectlombok </fuidiD> <TRARFACTID> LOMBOK </ArtifactId> </dependency> <pendency> <puadeid> org.springfameswork.Boot </groupy> </ArtifactId> Spring-Boot-Starter-Test </storkactid> <cope> Test </schope> </dependency> </dependências> <fruct> <flugins> <plugiD> org.springframework.boot </groupid> <tlugifactid> spring-boot-maven-plugin </artifactId> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url> https://repo.spring.io/milestone </erl> <nsnapShots> <bility> false </ability> </snapshots> </pospositório> </repository> </pospository> </repositor <url> https://repo.spring.io/snapshot </url> <nsnapshots> <bility> true </abilabed> </snapshots> </pluginRepository> <cluginrepository> <di> malha-me-milestones </dod> <name> mola de mola </nome> <url> https://repo.spring.io/milestone </erl> <nsnapshots> <bility> false </ability> </snapshots> </pluginrepositório> </fluginRepository> </fluginRepositório> <//PluginRepository> </PluginRepositories>
2.2 Classe de entidade e repositório
pacote com.hfcsbc.esetl.domain; importar lombok.data; importar org.springframework.data.elticsearch.annotations.document; importar org.springframework.data.elticsearch.annotations.field; importação.springframework.data.elticsearch.annotations.fielm; org.springframework.data.elasticsearch.annotações.fieldType; importar javax.persistence.entity; importar javax.persistence.id; import javax.persistence.onetoOne;/** ** ** ** por pengchao em 2018/2/23 */@indexname; refreshInterval = "-1")@entity@datapublic classe pessoa {@id private longo id; nome de string privado; @Onetoone @field (type = fieldtype.nested) Endereço privado Endereço;} pacote com.hfcsbc.esetl.Domain; importar lombok.data; importar javax.persistence.entity; importar javax.persistence.id;/** * Criar por Pengchao em 2018/2/23 */@entity@Datapublic Endereço {@id Long id; Nome da sequência privada;}pacote com.hfcsbc.esetl.repository.jpa; import com.hfcsbc.esetl.domain.person; importar org.springframework.data.jpa.repository.jparepository;/** ** Criar por pengchao em 2018/2/23 *
pacote com.hfcsbc.esetl.repository.es; importar com.hfcsbc.esetl.domain.person; importação org.springframework.data.elasticsearch.repositório.elticsEarchRepository; Long> {}2.3 Configurando o elasticsearchItemwriter
pacote com.hfcsbc.esetl.itemwriter; import com.hfcsbc.esetl.repository.es.espersonRepository; importar com.hfcsbc.eath.exitatus.person; import org.springframework.batch.core.exittatus; importação ou importação; org.springframework.batch.core.stepexecution; importar org.springframework.batch.core.stepexecutionListener; importar org.springframework.batch.item.itemwriter; import java.util.list; ItemWriter <Pesso>, ItemWriteListener <Pesso>, StepExecutionListener {private EspersonRepository PersonRepository; public ElasticsearchItemwriter (EspersonRepository PersonRepository) {this.personRepository = PERSONRepository; } @Override public void antes daWrite (list <? Extende a pessoa> itens) {} @Override public void AfterWrite (list <? Extende Pessoa> itens) {} @Override public void owriteError (exceção de exceção, list <? Extrends Pessoa> itens) {} @Override} AfterStep (StepExecution StepExecution) {return null; } @Override public void write (list <? Extend Pessoa> itens) lança a exceção {// O método SAVEALL que implementa a classe AbstractELASTICSECHERPHERPOSIENTE CHAMA ELATICSearchOperations.bulkindex (consultas), que é um índice de lote de personrepositório.Savall (itens); }}2.4 Configurando o ElasticsearchItemReader (este exemplo não é usado, apenas para referência)
pacote com.hfcsbc.esetl.iTemReader; importar org.springframework.batch.item.data.abstractpagineddataAtemreader; importar org.springframework.beanstics.factory.initializeBean; imporesecringFramework.Data.Ellasticsing.Intick.Elording.initialingBean; org.springframework.data.elasticsearch.core.query.searchquery; importar java.util.iterator;/** * Crie por Pengchao em 2018/2/24 */public classe ElasticsEarchBereader <Pessoa> ExtraindaPaginatinDDataMereadReader <Pessoas> implementInizingBeanizer (Pessoa- Pessoa> resumidapaginatinDdataMereading <Pessoa> implementos iniciais: Consulta de pesquisa final privada; Classe final privada <? estende a pessoa> TargetType; public ElasticsearchItemReader (ElasticsearchOperations ElasticsearchOperations, SearchQuery, Class <? Extende Pessoa> TargetType) {this.elticsearchOperations = ElasticsearchOperations; this.Query = consulta; this.TargetType = TargetType; } @Override Protected Iterator <Sente> dopageRead () {return (iterator <Soper>) elasticsearchOperations.QueryForList (Query, TargetType) .Iterator (); } @Override public void AfterPropertiesset () lança Exceção {}}2.5 Configuração necessária para a configuração do lote de mola
pacote com.hfcsbc.esetl.config; import com.hfcsbc.esetl.itemwriter.elasticsarchItemwriter; import com.hfcsbc.esetl.repositório.esson.ersonRepository; import com.sbc.eSetl.Emerson.memsonRepository; importação comb.fcsbc.esetl.esterson; importestern; org.springframework.batch.core.step; importar org.springframework.batch.core.configuration.annotation.enablebatchprocessing; importar org.springframework.batch.core.configuration.annotation.jobbuilderFactory; importação org.springframework.batch.core.configuration.annotation.StepBuilderFactory; importar org.springframework.batch.core.launch.support.runidincrencier; importação org.springframework.batch.core.repository.jobrepositor org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.batch.item.database.JpaPagingItemReader;import org.springframework.batch.item.database.orm.jpanativeQueryProvider; importar org.springframework.beans.factory.annotation.autowired; importação org.springframework.context.annotation.bean; importação; org.springframework.transaction.platformtransactionManager; importar javax.persistence.entityManagerFactory; importar javax.sql.datasource;/** * Criar por Pengchao em 2018/2/23 */@configuration@enabatchProcessingPublic Class BatchfcnCOn/23 */@@EnabatingProcedPerSmingPublic BatchfcnCOlCCon/23 */@@enabatingpitiredppccnCon/23/23 */@configuration@enabatchProcessingpublic Class BatchfcnCon @Bean Public ItemReader <Pesso> OrderItemReader (EntityManagerFactory EntityManagerFactory) {jPaPagingItemReader <Pesso> leitor = new jPaPagingItemReader <sesson> (); String sqlquery = "selecione * da pessoa"; tente {jPanativeQueryProvider <SOmeS> QueryProvider = new JPanativeQueryProvider <Seper> (); queryprovider.setsqlquery (sqlquery); QueryProvider.SetEntityClass (Person.class); queryprovider.afterpropertiesset (); leitor.setEntityManagerFactory (EntityManagerFactory); leitor.setPagesize (10000); leitor.setQueryProvider (QueryProvider); leitor.afterpropertiesset (); leitor.setsAvestate (true); } catch (Exceção e) {e.printStackTrace (); } Return Reader; } @Bean public ElasticsearchItemWriter ItemWriter () {return New ElasticsearchItemwriter (PessoasRepository); } @Bean Public Step STEP (stepbuilderfactory stepbuilderfactory, itemreader ItemReader, itemWriter ItemWriter) {return stepbuilderfactory .get ("step1") .chunk (10000) .reader (itemReader) .Writer (itemWriter) .build (); } @Bean Public Job Job (JobBuilderFactory JobBuilderFactory, Etapa Etapa) {Return JobBuilderFactory .get ("importJOB") .Encrementador (new RunIdIncreler ()) .Flow (STEP) .END () .build (); } /*** Quando o lote da mola for executado, algumas tabelas necessárias serão criadas. Aqui está um local para a criação da tabela: DataSource * @param DataSource * @Param Manager * @return */ @Bean Public JobRepository JobRepository (DataSource DataSource, PlatformTransactionManager Gerenciador) {JobRepositoryFactoryBeanRepositório JobRepositoryFactoryBean.SetDataSource (DataSource); JobRepositoryFactoryBean.SetTransactionManager (gerente); JobRepositoryFactoryBean.SetDatabaseType ("PostGres"); tente {return jobRepositoryFactoryBean.getObject (); } catch (Exceção e) {e.printStackTrace (); } retornar nulo; }}2.6 Configure o endereço de conexão do banco de dados e ES
Primavera: Redis: Host: 192.168.1.222 Dados: JEST: URI: http://192.168.1.222:9200 Nome de usuário: Elastic Senha: ChangeMe JPA: Database: PostGresql Show-Sql: Hibernate: DDL-Auto JDBC: PostGresql: //192.168.1.222: 5433/pessoa nome de usuário: hfcb senha: hfcb driver-class-name: org.postgresql.driver max-attive: 2spring.batch.initialize-schema: sempre
2.7 Configure a classe de entrada
pacote com.hfcsbc.esetl; importar org.springframework.boot.springApplication; importar org.springframework.boot.autoconfigure.springbootApplication; importação org.springframework.autoconfigure.edaSTICS; org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchDataAutoconfiguration; importar org.springframework.data.elasticsarch.repositório.config.enableelasticsearchRepositories; import org.springframework.data.jpa.repository.config.enableJparePositories; @springbootapplication (exclude = {elasticsarchautoconfiguration.classticsearchRearchDataautoconfiguration.class})@habilticsicSearchSearchRePosiPSIESEMEarchRePosiPesiatesEsearchRePosiPesiatesEsearchRePosiPesiatesePosikeSearchRePosikeSearchRePosikeSearchRePosikeSearchSearchRearpSearchRearchRearchRearchSearchRePosiPesiá -los. "com.hfcsbc.esetl.repository")@enableJparePositories (BasEpackages = "com.hfcsbc.esetl.repository.jpa") classe pública eSetLApplication {public static void main (string [] args) {springApplication.runation (setlication {stictic maid main (string [] args) {springApplication.runnication; }}O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.