1. Introducción
Cuando el sistema tiene una gran cantidad de datos que deben importarse de la base de datos, el uso de Spring Batch puede mejorar la eficiencia de la importación. Spring Batch utiliza itemReader para leer páginas de datos, y el itemwriter escribe datos en lotes. Dado que Spring Batch no proporciona el elemento Writer y el Reader de Elastisearch, en este ejemplo, se utiliza un ElasticeSearchitemWriter (ElasticeSearchitemReader) personalizado para la importación por lotes.
2. Ejemplo
2.1 pom.xml
Este artículo utiliza Spring Data Jest para conectar ES (también puede usar Spring Data Elasticsearch para conectar ES), y la versión ES es 5.5.3
<? xml versión = "1.0" encoding = "utf-8"?> <Project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschemainstance" xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <ModelVersion> 4.0.0 </modelversion> <MoupRoMid> com.hfcsbc.estl </groupid> <artifactid> es-etl </sartifactid> <versión> 0.0.1-snapshot </versewer> <compaingo> jar </packaging> <name> es-etl </name> <scription> Proyecto de demostración para el arranque de primavera </solding> <-parent> <grupo> org.springframework.boot </groupid> <artifactid> spring-boot-sharter-piter-piter-piter-piter-pararter </artemid> <versión> 2.0.0.m7 </versión> <relativePath/> <!-Buscar padre desde repositorio-> </parent> <properties> <ject.build.sourceEncoding> UTF-8 </project.build.sourceEnding> <jection.Reporting.outputenCoding> Utf-8 </project.Reporting. <java.version> 1.8 </java.version> </propiences> <pendencies> <pendency> <uproupid> org.springframework.boot </groupid> <artifactid> spring-boot-starter </artifactid> </pendency> <epardency> <uproupid> org.springframe.boot </groupid> <artifactid> spring-boot-starter-data-jpa </artifactid> </pendency> <pendency> <uproupid> org.postgresql </groupid> <artifactid> postgresql </artifactid> </dependency> <pendency> <proupid> org.springframework.Boot </groupid> <artifactid> spring-boot-starter-batch </artifactid> </pendency> <epardency> <uproupid> com.github.vanroy </proupid> <artifactid> spring-boot-starter-data-jest </arifactid> <version> 3.0.0.0.Release </version> </dependency> <pendency> <proupid> iO.o.sears >Sear <artifactid> jest </arfactid> <versión> 5.3.2 </versewer> </pendency> <pendency> <uproupid> org.projectlombok </groupid> <artifactid> lombok </artifactid> </pendency> <pendency> <grupoD> org.springframework.boot </proupid> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> <repositories> <S Repository> <id> Spring-Snapshots </id> <nec> Spring Snapshots </name> <URL> https://repo.spring.io/snapshot </srl> <Snapshots> <Dabled> true </Ennabled> </snstapshots> </spository> <Repository> <d> Spring-Milestones </Id> <NeNAVIT <Url> https://repo.spring.io/milestone </sl> <Snapshots> <NAVADED> FALSE </Engabled> </snstapshots> </epository> </repositorio> </spository> </epository> </pluginRepository> <EntlEntepository> <iD> spring-snapshots </id> <diD> nameShots <URL> https://repo.spring.io/snapshot </srl> <Snapshots> <Enedably> true </habilitado> </snstapshots> </pluginrePository> <gluginRepository> <id> spring-milestones </di> <name> spring milestones </name> <URL> https://repo.spring.io/milestone </sl> <Snapshots> <Dabled> false </habilitado> </snstapshots> </pluginrepository> </tuginRepository> </pluginRepository> </tuginRepository> </tuginRepositories> </proyecto>
2.2 Clase de entidad y repositorio
paquete com.hfcsbc.esetl.domain; import lombok.data; import org.springframework.data.elasticsearch.annotations.document; import org.springframework.data.elasticsearch.annotations.field; import og.springframework.data.elasticsearch.annotations; import org.springframework.data.elasticsearch.annotations.fieldtype; import javax.persistence.entity; import javax.persistence.id; import javax.persistence.onetOone;/** * crea por Pengchao en 2018/2/23 */@documento (indexName = "persona", tipo = "persona", shards, requisito, requisito, requisito, requisito, requisito, requisito, requisito, repletamente. refreshinterval = "-1")@entity@dataPublic clase persona {@id private long id; nombre de cadena privada; @Onetoone @field (type = fieldType.nested) Dirección de dirección privada;} paquete com.hfcsbc.esetl.domain; import lombok.data; import javax.persistence.entity; import javax.persistence.id;/** * Create by Pengchao en 2018/2/23 */@Entity@dataPublic Class Dirección {@id privado ID largo; nombre de cadena privada;}paquete com.hfcsbc.esetl.repository.jpa; import com.hfcsbc.esetl.domain.person; import og.springframework.data.jpa.repository.jparpository;/** * crea por pengchao en 2018/2/23 */pública interfaz de interfaze
paquete com.hfcsbc.esetl.repository.es; import com.hfcsbc.esetl.domain.person; import org.springframework.data.elasticsearch.repository.elasticsearchRepository;/** * Crear por Pengchao en 2018/2/23 */public interface ESPERSONRETORIORY EXTENDENDS EXTENDENDS EXTENDENDS ELTENDSECHESECHESEARD {}2.3 Configuración de ElasticSearchitemWriter
paquete com.hfcsbc.esetl.itemwriter; import com.hfcsbc.esetl.repository.es.espersonRepository; import org. Itemwriter <Oll>, itemwriteListener <Oll>, stepExecUdListener {priversonRepository PersonRepository; public ElasticSearchitemWriter (ESPERSONREPOSITORY PersonRepository) {this.personRepository = PersonRepository; } @Override public void antes de Write (list <? Extends Person> elementos) {} @Override public void AfterWrite (list <? Extends Person> items) {} @Override public void onwriteError (excepción excepción, list <? Extendes Person> items) {} @Override public AfterStep (StepExecution StepExecution) {return null; } @Override public void Write (list <? Extends Person> Elementos) lanza la excepción {// El método Saveall que implementa la clase AbstractElasticSearchRepository llama a ElasticSearchOperations.BulkIndex (consultas), que es un índice de lotes de PersonRepository.SaveAll (elementos); }}2.4 Configuración de ElasticSearchitemReader (este ejemplo no se usa, solo para referencia)
paquete com.hfcsbc.esetl.itemreader; import org.springframework.batch.item.data.abstractpagineddataitemreader; import org.springframework.beans.factory.inicializingBean; import org.springframework.data.elasticsearch.core.query.searchQuery; import java.util.iterator;/** * Create by Pengchao en 2018/2/24 */public classasticeSearchitemreader <persona> extiende el dataitemreader de abstractpagined DataiteMeader <Oll> Implementos inicializados de inicio de la evaluación privada de elastices. Consulta de búsqueda final privada; Clase final privada <? extiende la persona> Targettype; Public ElasticeSearchitemReader (ElasticeSearchOperations ElasticSearchOperations, SearchQuery Consult, class <? Extends Person> Targettype) {this.elasticeSearchOperations = ElasticSearchOperations; this.Query = Query; this.targettype = TargetType; } @Override protegido iterator <Oll> dopageRead () {return (iterator <Oll>) elasticSearchOperations.QueryForList (Query, TargetType) .Iterator (); } @Override public void AfterPropertiesSet () lanza la excepción {}}2.5 Configuración requerida para configurar Spring Batch
paquete com.hfcsbc.esetl.config; import org.springframework.batch.core.step; import org.springframework.batch.core.configuration.annotation.enableBatchProcessing; import og.springframework.batch.core.configuration.annotation.jobbuilderfactory; import org.springframework.batch.core.configuration.annotation.stepbuilderfactory; import org.springframework.batch.core.launch.support.runidincrementer; import org.springframework.batch.core.repository.jobroitory; import org.springframework.batch.core.repository.support.jobreositoryFactoryBean; import og.springframework.batch.item.itemreader; import og.springframework.batch.item.itemwriter; import og.springframework.item.iteme.jpapeageringemer; org.springframework.batch.item.database.orm.jpanativeQueryProvider; import org.springframework.beans.factory.annotation.aUtowired; importar org.springframework.context.annotation.bean; import og.springframework.context.contetation.configuration; org.springframework.transaction.platformtransactionManager; import javax.persistence.entityManagerFactory; import javax.sql.dataSource;/** * Create by Pengchao en 2018/2/2/23 */@Configuración@EnableBatchProssingPublicCublic Class BatchConfig {@autowired privado privado Especera de espera; @Bean Public itemArter <Oll> OrderItemReader (EntityManagerFactory EntityManagerFactory) {JPAPAGINGITEMREADER <OLLO> LECTER = new JPapagingItemReader <Oller> (); Cadena sqlQuery = "select * de persona"; Pruebe {jPanativeQueryProvider <Oller> QueryProvider = new JPanativeQueryProvider <Oller> (); QueryProvider.setsqlQuery (SQLQuery); QueryProvider.SetEntityClass (Person.Class); QueryProvider.AfterPropertIesset (); Reader.SetEntityManagerFactory (EntityManagerFactory); lector.setPageSize (10000); Reader.setQueryProvider (QueryProvider); lector.AfterPropertIesset (); lector.setsavestate (verdadero); } catch (Exception e) {E.PrintStackTrace (); } Return Reader; } @Bean public ElasticSearchitemWriter itemWriter () {return New ElasticSearchitemWriter (PersonRepository); } @Bean Public Step Paso (StepBuilderFactory StepBuilderFactory, itemReader itemReader, itemwriter ElemItemwriter) {return StepBuilderFactory .get ("Step1") .chunk (10000) .Reader (itemReader) .Writer (itemwriter) .Build (); } @Bean Public Job Job (JobBuilderFactory JobBuilderFactory, Step Step) {return JobBuilderFactory .get ("importJob") .Incrementer (new RunIdIncrementer ()) .flow (step) .end () .Build (); } /*** Cuando se ejecuta el lote de primavera, se crearán algunas tablas que necesita. Aquí hay una ubicación para la creación de la tabla: DataSource * @param dataSource * @param ganager * @return */ @bean public jobRepository JobRepository (DataSource DataSource, PlatformTransactionManager Manager) {JobRepositoryFactoryBeAspertoryFactoryBean = New JobRepositoryFactoryBean ();; JobRepositoryFactoryBean.SetDataSource (DataSource); JobRepositoryFactoryBean.SetTransactionManager (gerente); JobRepositoryFactoryBean.SetDatabasetype ("Postgres"); intente {return JobRepositoryFactoryBean.getObject (); } catch (Exception e) {E.PrintStackTrace (); } return null; }}2.6 Configure la dirección de conexión de la base de datos y ES
Primavera: Redis: Host: 192.168.1.222 Datos: Jest: Uri: http://192.168.1.22:9200 Nombre de usuario: Nombre de usuario: contraseña elástica: ChangeMe JPA: Base de datos: Postgresql show-sql: verdadero HiBernate: DDL-Auto: Actualización de datos: Plataforma: Postgres: URL: URL: URL: JDBC: Postgresql: //192.168.1.222: 5433/persona Nombre de usuario: HFCB Contraseña: HFCB controlador-class-name: org.postgresql.driver max-activo: 2spring.batch.initialize-schema: siempre siempre
2.7 Configurar la clase de entrada
paquete com.hfcsbc.esetl; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.boot.autoconfigure.data.elasticsearcherearchautoconfiguration; org. org.springframework.data.jpa.repository.config.enableJParepositories; @SpringBootApplication (excluir = {elasticsearchautoconfiguration.class, elasticsearchDataAutoconfiguration.class})@enableElsticSeartRepositories (basepackages = = "com.hfcsbc.esetl.repository")@enablejparePositories (basepackages = "com.hfcsbc.esetl.repository.jpa") clase pública esetLapplication {public static void main (string [] arguments {springapplication.run (esetlaPplication.class, argss); }}Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.