1. Introduction
Lorsque le système dispose d'une grande quantité de données qui doivent être importées de la base de données, l'utilisation du lot Spring peut améliorer l'efficacité de l'importation. Spring Batch utilise ItemReader pour lire les pages de données et ItemWriter écrit des données par lots. Étant donné que Spring Batch ne fournit pas d'objet d'objets d'Elastisearch et d'objets, dans cet exemple, un elasticsearchItemwriter personnalisé (ElasticSearchItemReader) est utilisé pour l'importation par lots.
2. Exemple
2.1 pom.xml
Cet article utilise Spring Data Jest pour connecter ES (vous pouvez également utiliser Spring Data Elasticsearch pour connecter ES), et la version ES est 5.5.3
<? xml version = "1.0" Encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema-instance" xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion> 4.0.0 </ Modelversion> <ProupId> com.hfcsbc.estl </proupId> <ArtefactId> es-etl </ artifactId> <DERSE> 0.0.1-snapshot </-version> <packaging> jar </packaging> <name> es -tl </name> <description> Project de démo pour Spring Boot </ Description> <parent> <proupId> org.springframework.boot </proncId> <ArtifActid> Spring-boot-starter-Parient </proncID> <version> 2.0.0.m7 </ version> <RelativePath /> <! - Recherche Parent de Repository -> </parent> <properties> <project.build.sourceencoding> utf-8 </project.build.sourceencoding> <project.reporting.outpuleding> utf-8 </project.reporting.outpulencoding> <java.version> 1.8 </java.version> </properties> <dependencies> <dependency> <proupId> org.springframework.boot </pruinid> <ptefactid> printemps-boot-starter </ artifactid> </peedency> <dependency> <proupId> org.springframework.boot </proupId> <ArtefactId> Spring-Boot-starter-data-jpa </lefactive> </dependency> <dependency> <proupId> org.postgresql </proishid> <petifactid> PostgreSQL </ artifactid> </dependency> <dependency> <proupId> org.springframework.boot </preamid> <ArtefactId> Spring-Boot-Starter-Batch </ Artifactid> </ Dependency> <Dependance> <ProupId> com.github.vanroy </prouvendid> <prefactive> printemps-boot-starter-data-jest </letefactid> <version> 3.0.0.release </preedId> </pedency> <preency> <preambid> <ArtefactId> Jest </ artifactid> <in version> 5.3.2 </ version> </Dependency> <Dedency> <ProupId> org.projectlombok </pruipId> <ArtifactId> Lombok </Artifactid> </Dependency> <Dependency> <ProupId> org.springFramework.boot </proupId> <ArtefactId> Spring-Boot-Starter-Test </ Artifactid> <Cope> Test </cope> </Dependency> </Dependces> <Duild> <Glugins> <GroupId> org.springFramework.boot </prouprid> <eRtifactid> Spring-Booot-Maven-Plugin </Retifactid> </GluciN> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url> https://repo.spring.io/milestone </url> <napshots> <veabled> false </veabled> </napshots> </ repository> </ Repository> </ Repository> </ Repository> </pluitory> <name> SpringShots </ Id> Spring-Snapshots </ Id> <nom Springs </ NameS </ Name> </ Id> Spring-Snapshots </ Id> <nom Springs </ Name Spring> <Ded> Spring-Snapshots </ Id> <name> Springs </ Name Spring> <Ded> Spring-Snapshots </ Id> <name> Springs </ Name Spring> <DID> Spring-Snapshots </ Id> <nom Springs " <url> https://repo.spring.io/snapshot </url> <napshots> <veabled> true </veabled> </napshots> </ PluginRepository> <PluginRepository> <Id> Spring-Milestones </ id> <name> Spring Milestones </name> <url> https://repo.spring.io/milestone </url> <napshots> <veabled> false </veabled> </napshots> </ pluginRepository> </ pluginRepository> </ pluginRepository> </ pluginRepository> </gluginrepositories> </prot>
2.2 Classe d'entité et référentiel
package com.hfcsbc.esetl.domain; import lombok.data; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.ellasticsearch.annotations.field; Import org.springframework.data.ellasticsearch.annotations.field; org.springframework.data.elasticsearch.annotations.FieldType;import javax.persistence.Entity;import javax.persistence.Id;import javax.persistence.OneToOne;/** * Create by pengchao on 2018/2/23 */@Document(indexName = "person", type = "person", shards = 1, replicas = 0, RefreshInterval = "-1") @ Entity @ Datapublic Class Person {@id private long id; nom de chaîne privé; @OneToOne @field (type = fieldType.nest) Adresse d'adresse privée;} package com.hfcsbc.esetl.domain; import lombok.data; import javax.persistence.entity; import javax.persistence.id; / ** * créer par Pengchao sur 2018/2/23 * / @ entité @ datapublic class adresse {@id private long id; Nom de la chaîne privée;}package com.hfcsbc.esetl.repository.jpa; import com.hfcsbc.esetl.domain.person; import org.springframework.data.jpa.repository.jParepository; / ** * Créer par Pengchao le 2018/2/23 * / Interface publique Personnes Personnes interface
Package com.hfcsbc.esetl.repository.es; import com.hfcsbc.esetl.domain.person; import org.springframework.data.elasticsearch.repository.elasticsearchrepository; / ** * Créer par pesgChao sur Elastricsearchre <Sumpository <Sumpository <Persondert Long> {}2.3 Configuration d'ElasticSchIttemWriter
Package com.hfcsbc.esetl.itemwriter; import com.hfcsbc.esetl.repository.es.espersonrepository; import com.hfcsbc.esetl.domain.eson; import org.springframework.batch.core.itemwrite; import org.springframeworkwork.borre.itemwrite; org.springframework.batch.core.stepexecution; import org.springframework.batch.core.stepexecutionListener; import org.springframework.batch.item.itemwriter; import java.util.list; / ** * créer par pengchao le 2018/2/23 * Itemwriter <onon>, itemwriteListener <onom>, StepExecutionListener {private espersonrepository PersonPository; public elasticSearchItemWriter (EspersonRepository PersonRepository) {this.personRepository = PersonRepository; } @Override public void avantwrite (list <? Étend des éléments de la personne>) {} @Override public void afterwrite (list <? Étend des éléments de personne>) {} @Override public void onwriteError (exception exception, list <? Étend la personne>) {} @Override public Void beforestatep AfterStep (StepExecution StepExecution) {return null; } @Override public void write (list <? Étend des éléments de personne>) lève une exception {// la méthode SAVEALL qui implémente la classe AbstractELasticSearchRepository appelle elasticsearchoperations.bulkindex (requêtes), qui est un index de lots de PersonRepository.SaveAll (éléments); }}2.4 Configuration d'ElasticSearchItemReader (Cet exemple n'est pas utilisé, pour référence uniquement)
package com.hfcsbc.esetl.itemReader; import org.springframework.batch.item.data.abstractpaginéedataittemreader; import org.springframework.beans.factory.initializingbean; org.springframework.data.elasticsearch.core.query.searchquery; import java.util.iterator; / ** * Créer par Pengchao le 2018/2/24 * / classe publique ElasticsearchItemReader <Person> étend l'abstraitpaginedDataitearchoperperSoperation; requête privée de recherche de recherche; classe finale privée <? étend la personne> TargetType; public elasticSchIttemReader (elasTICSearchOperations elasTICSearchOperations, SearchQuery Query, classe <? étend la personne> TargetType) {this.ElasticSearchOperations = elasTICSearchOperations; this.query = requête; this.targetType = TargetType; } @Override Protected Iterator <Sonv> dopageRead () {return (iterator <onon>) elastiCSearchOperations.Queryforlist (query, cibleType) .iterator (); } @Override public void afterpropertiesset () lève l'exception {}}2.5 Configuration requise pour configurer le lot de ressort
Package com.hfcsbc.esetl.config; import com.hfcsbc.esetl.itemwriter.elasticsearchitemwriter; import com.hfcsbc.esetl.repository.es.espersonrepository; import com.hfcsbc.esetl.domain.person; importation org.springframeworkwork.borre.job; importation org.spring org.springframework.batch.core.step; import org.springframework.batch.core.configuration.annotation.enableBatchProcessing; import org.springframework.batch.core.configuration.annotation.jobbuilderfactory; importation; org.springframework.batch.core.configuration.annotation.stepbuilderfactory; import org.springframework.batch.core.launch.support org.springframework.batch.core.repository.support.jobrepositoryfactoryBean; import org.springframework.batch.item.itemreader; import org.springframework.batch.item.itemwriter; import org.springframework.batch.item.database. org.springframework.batch.item.database.orm.jpanativeQueryProvider; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import; org.springframework.transaction.platformTransactionManager; import javax.persistence.entityManagerFactory; Importer Javax.sql.datasource; / ** * Créer par Pengchao sur la classe BatchConfig {@ @Autowired Private PersonRepository; @Bean public ItemReader <onon> OrderItemReader (EntityManagerFactory EntityManagerFactory) {jPapagingItemReader <onon> Reader = new JPapagingItemReader <onving> (); String SqlQuery = "SELECT * FROM Person"; essayez {JPanativeQueryProvider <ony> queryProvider = new JPanativeQueryProvider <onom> (); QueryProvider.SetsQlQuery (SqlQuery); queryProvider.SetEntityClass (Person.class); queryprovider.afterpropertiesset (); Reader.SetEntityManagerFactory (EntityManagerFactory); Reader.SetPageSize (10000); Reader.setQueryProvider (QueryProvider); Reader.AfterPropertiseSet (); Reader.SetSaveState (true); } catch (exception e) {e.printStackTrace (); } return lecteur; } @Bean public elasticsearchitemwriter itemWriter () {return new elasticSearchItemWriter (PersonRepository); } @Bean public Step Step (StepBuilderFactory StepBuilderFactory, itemReader itemReader, itemwriter itemwriter) {return StepBuilderFactory .get ("Step1") .Chunk (10000) .Reader (itemReader) .Writer (itemWriter) .Build (); } @Bean Public Job Job (jobBuilderFactory jobBuilderFactory, Step Step) {return jobBuilderFactory .get ("importJob") .Incremeter (new RunIdInCreader ()) .flow (Step) .end () .Build (); } / ** * Lorsque le lot de ressort est exécuté, certaines tables dont elles ont besoin seront créées. Voici un emplacement pour la création de table: DataSource * @param datasource * @param manager * @return * / @bean public jobRepository jobRepository (dataSource dataSource, plateformeTransActionManager Manager) {jobrepositoryfactoryBean jobRepositoryFactoryBean = newRepositoryFactoryBean (); JobRepositoryFactoryBean.SetDataSource (DataSource); JobRepositoryFactoryBean.SetTransactionManager (Manager); JobRepositoryFactoryBean.SetDatabasetype ("Postgres"); essayez {return jobRepositoryFactoryBean.getObject (); } catch (exception e) {e.printStackTrace (); } return null; }}2.6 Configurer l'adresse de connexion de la base de données et ES
Spring: redis: hôte: 192.168.1.222 Données: Jest: URI: http://192.168.1.222:9200 Nom d'utilisateur: Mot de passe élastique: ChangeMe JPA: Database: mise à jour de DataSource: plateforme: Postgres Url: JDBC: PostgreSQL: //192.168.1.222: 5433 / Person Nom d'utilisateur: HFCB Mot de passe: HFCB Driver-Class-Name: org.postgresql.Driver Max-active: 2SPRING.BATCH.Initialize-Schema: toujours
2.7 Configurer la classe d'entrée
package com.hfcsbc.esetl; import org.springframework.boot.springApplication; import org.springframework.boot.autoconfigure.springbootapplication; org.springframework.boot.autoconfigure.data.ellasticsearch.ellasticsearchdataautoconfiguration; import org.springframework.data.ellasticsearch.repository.config.enableElasticSearchrepositries; Importer; org.springframework.data.jpa.repository.config.enablejParePositories; @springbootapplication (exclure = {elasticsearchAutoConfiguration.class, elasticsearchdataautoconfiguration.class}) @ eableElasticSearchDataSitories (Baspackages = "com.hfcsbc.esetl.repository") @ activerjParePositories (basepackages = "com.hfcsbc.esetl.repository.jpa") classe publique EsetlApplication {public static void main (string [] args) {springapplication.run (esetlapplication.class, args); }}Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.