Eureka is a decentralized service-governance application. Its distinctive feature is that a node can act both as a server and as a service that registers itself with the addresses you configure. In this article, let's walk through Eureka's registration process.
1. Eureka's server
The core class on Eureka's server side is EurekaBootStrap, which implements ServletContextListener. From this we can conclude that the Eureka server runs inside a servlet container. The key code is as follows:
    public class EurekaBootStrap implements ServletContextListener {

        // ... Omit related code

        /**
         * Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
         *
         * @see
         * javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
         */
        @Override
        public void contextInitialized(ServletContextEvent event) {
            try {
                initEurekaEnvironment();
                initEurekaServerContext();

                ServletContext sc = event.getServletContext();
                sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
            } catch (Throwable e) {
                logger.error("Cannot bootstrap eureka server :", e);
                throw new RuntimeException("Cannot bootstrap eureka server :", e);
            }
        }

        // ... Omit related code
    }

We can see that once the ServletContext has been initialized, the Eureka environment is initialized first and the EurekaServerContext second. Next, let's take a look at the initEurekaServerContext method:
    /**
     * init hook for server context. Override for custom logic.
     */
    protected void initEurekaServerContext() throws Exception {
        // ......
        ApplicationInfoManager applicationInfoManager = null;

        if (eurekaClient == null) {
            EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                    ? new CloudInstanceConfig()
                    : new MyDataCenterInstanceConfig();

            applicationInfoManager = new ApplicationInfoManager(
                    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

            EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
            eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
        } else {
            applicationInfoManager = eurekaClient.getApplicationInfoManager();
        }

        PeerAwareInstanceRegistry registry;
        if (isAws(applicationInfoManager.getInfo())) {
            registry = new AwsInstanceRegistry(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
            awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
            awsBinder.start();
        } else {
            registry = new PeerAwareInstanceRegistryImpl(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
        }
        // ... Omit part of the code
    }

This method creates many of the objects the Eureka service depends on. Here I list two core ones: eurekaClient and PeerAwareInstanceRegistry. We will talk about the client part later. First, let's see what PeerAwareInstanceRegistry is used for. Here is a class diagram for this class:
The class diagram shows that the top-level interfaces of PeerAwareInstanceRegistry are LeaseManager and LookupService. LookupService defines the most basic behavior for discovering instances, while LeaseManager defines how client registration, renewal, and cancellation are handled. In this article, let's focus on the implementation of the LeaseManager interface. Coming back to PeerAwareInstanceRegistry: this class replicates registry information across multiple nodes. For example, when an instance registers, renews, or goes offline on one node, the event is replicated to every peer node through this class. Let's see how it handles client registration:
    /**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     *
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

We can see that it calls the register method of the parent class and then replicates the action to the other nodes through replicateToPeers. The replication itself will not be discussed here; let's focus on registration. Here is the register() method in the parent class:
    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            // ... Omit part of the code
    }

Based on the source code, let's briefly sort out the process:
1) First, look up the map of service instances for the given appName. If it is null, create a new map and put it into the registry under that appName. Note the Lease object here. This class describes the time attributes of its generic holder T, such as the registration time, the service-up time, and the last update time. Its implementation is worth a look:
    /*
     * Copyright 2012 Netflix, Inc.
     *
     *    Licensed under the Apache License, Version 2.0 (the "License");
     *    you may not use this file except in compliance with the License.
     *    You may obtain a copy of the License at
     *
     *        http://www.apache.org/licenses/LICENSE-2.0
     *
     *    Unless required by applicable law or agreed to in writing, software
     *    distributed under the License is distributed on an "AS IS" BASIS,
     *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *    See the License for the specific language governing permissions and
     *    limitations under the License.
     */
    package com.netflix.eureka.lease;

    import com.netflix.eureka.registry.AbstractInstanceRegistry;

    /**
     * Describes a time-based availability of a {@link T}. Purpose is to avoid
     * accumulation of instances in {@link AbstractInstanceRegistry} as result of ungraceful
     * shutdowns that is not uncommon in AWS environments.
     *
     * If a lease elapses without renewals, it will eventually expire consequently
     * marking the associated {@link T} for immediate eviction - this is similar to
     * an explicit cancellation except that there is no communication between the
     * {@link T} and {@link LeaseManager}.
     *
     * @author Karthik Ranganathan, Greg Kim
     */
    public class Lease<T> {

        enum Action {
            Register, Cancel, Renew
        };

        public static final int DEFAULT_DURATION_IN_SECS = 90;

        private T holder;
        private long evictionTimestamp;
        private long registrationTimestamp;
        private long serviceUpTimestamp;
        // Make it volatile so that the expiration task would see this quicker
        private volatile long lastUpdateTimestamp;
        private long duration;

        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            duration = (durationInSecs * 1000);
        }

        /**
         * Renew the lease, use renewal duration if it was specified by the
         * associated {@link T} during registration, otherwise default duration is
         * {@link #DEFAULT_DURATION_IN_SECS}.
         */
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
        }

        /**
         * Cancels the lease by updating the eviction time.
         */
        public void cancel() {
            if (evictionTimestamp <= 0) {
                evictionTimestamp = System.currentTimeMillis();
            }
        }

        /**
         * Mark the service as up. This will only take affect the first time called,
         * subsequent calls will be ignored.
         */
        public void serviceUp() {
            if (serviceUpTimestamp == 0) {
                serviceUpTimestamp = System.currentTimeMillis();
            }
        }

        /**
         * Set the leases service UP timestamp.
         */
        public void setServiceUpTimestamp(long serviceUpTimestamp) {
            this.serviceUpTimestamp = serviceUpTimestamp;
        }

        /**
         * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
         */
        public boolean isExpired() {
            return isExpired(0l);
        }

        /**
         * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
         *
         * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
         * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
         * instances that ungracefully shutdown. Due to possible wide range impact to existing usage, this will
         * not be fixed.
         *
         * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
         */
        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }

        /**
         * Gets the milliseconds since epoch when the lease was registered.
         *
         * @return the milliseconds since epoch when the lease was registered.
         */
        public long getRegistrationTimestamp() {
            return registrationTimestamp;
        }

        /**
         * Gets the milliseconds since epoch when the lease was last renewed.
         * Note that the value returned here is actually not the last lease renewal time but the renewal + duration.
         *
         * @return the milliseconds since epoch when the lease was last renewed.
         */
        public long getLastRenewalTimestamp() {
            return lastUpdateTimestamp;
        }

        /**
         * Gets the milliseconds since epoch when the lease was evicted.
         *
         * @return the milliseconds since epoch when the lease was evicted.
         */
        public long getEvictionTimestamp() {
            return evictionTimestamp;
        }

        /**
         * Gets the milliseconds since epoch when the service for the lease was marked as up.
         *
         * @return the milliseconds since epoch when the service for the lease was marked as up.
         */
        public long getServiceUpTimestamp() {
            return serviceUpTimestamp;
        }

        /**
         * Returns the holder of the lease.
         */
        public T getHolder() {
            return holder;
        }
    }

2) Look up the ID of the registering instance in that map and handle the result as follows:
2.1) If a lease already exists, compare the last dirty timestamp of the existing instance with that of the registering instance. If the existing one is strictly later, the existing instance is kept as the registrant; on a tie, the incoming instance is used.
2.2) Otherwise (no lease exists, so this is a new registration), increase the expected number of renewals per minute by 2 and recompute its threshold.
3) Wrap the registrant in a new Lease and save it into the map. With that, the registration process has basically come to an end.
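The three steps above can be sketched in a few lines of plain Java. This is a simplified model under stated assumptions, not Eureka's code: RegistrySketch, Instance, and renewsPerMinThreshold() are hypothetical names, the Lease wrapper and the read lock are left out, the counter is increased unconditionally (the real method first checks that it is greater than zero), and the 0.85 passed in main is only an assumed value for the renewal percent threshold.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified, hypothetical model of the registration steps; not Eureka's real classes. */
class RegistrySketch {

    /** Carries only the fields the steps above rely on. */
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> instance), the same two-level shape as in register()
    private final ConcurrentHashMap<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();
    private final double renewalPercentThreshold; // assumed configuration value
    private int expectedRenewsPerMin;
    private int renewsPerMinThreshold;

    RegistrySketch(double renewalPercentThreshold) {
        this.renewalPercentThreshold = renewalPercentThreshold;
    }

    /** Returns the instance that ends up stored under the registrant's id. */
    synchronized Instance register(Instance registrant) {
        // Step 1: fetch the per-application map, creating it on first use.
        Map<String, Instance> instances =
                registry.computeIfAbsent(registrant.appName, key -> new ConcurrentHashMap<>());

        Instance existing = instances.get(registrant.id);
        if (existing != null) {
            // Step 2.1: a strictly newer stored copy wins; on a tie the incoming one is used.
            if (existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
                registrant = existing;
            }
        } else {
            // Step 2.2: a new instance adds two expected renewals per minute (one every 30 s).
            expectedRenewsPerMin += 2;
            renewsPerMinThreshold = (int) (expectedRenewsPerMin * renewalPercentThreshold);
        }

        // Step 3: store whichever instance won.
        instances.put(registrant.id, registrant);
        return registrant;
    }

    int renewsPerMinThreshold() {
        return renewsPerMinThreshold;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch(0.85);
        registry.register(new Instance("ORDERS", "host-1", 100L));
        registry.register(new Instance("ORDERS", "host-2", 100L));
        // Four expected renewals per minute, scaled by 0.85 and truncated.
        System.out.println(registry.renewsPerMinThreshold()); // prints 3
    }
}
```

Registering the same id again with an older timestamp returns the stored instance unchanged, which is the behavior described in step 2.1.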
2. Eureka's client
As we saw, a DiscoveryClient is created when the server's ServletContext is initialized. Anyone familiar with Eureka will recognize these two properties: fetchRegistry and registerWithEureka. When Eureka is integrated with Spring Cloud and run in standalone mode, startup reports errors unless both values are set to false. Why? The answer lies in the constructor of DiscoveryClient:
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        // .... Omit part of the code
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
            // .... Omit some code
            initScheduledTasks();
            // ....
    }

Based on the source code, we can draw the following conclusions:
1) If both shouldRegisterWithEureka and shouldFetchRegistry are false, the constructor returns early without creating any network tasks.
2) Otherwise, it creates the scheduler and the thread pools used to send heartbeats and to refresh the cache.
3) Finally, it initializes the scheduled tasks.
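The three conclusions can be illustrated with a small stand-alone class. This is a sketch under stated assumptions rather than the real DiscoveryClient: ClientSetupSketch and its methods are hypothetical names, a plain ThreadFactory lambda stands in for Guava's ThreadFactoryBuilder, and scheduleWithFixedDelay is swapped in for the self-rescheduling TimedSupervisorTask that Eureka uses.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical miniature of the constructor logic; not the real DiscoveryClient. */
class ClientSetupSketch {
    private final ScheduledExecutorService scheduler; // stays null for a passive client
    private final AtomicInteger heartbeats = new AtomicInteger();

    ClientSetupSketch(boolean registerWithEureka, boolean fetchRegistry, long heartbeatIntervalMs) {
        // Conclusion 1: with both flags off there is nothing to schedule, so return early.
        if (!registerWithEureka && !fetchRegistry) {
            scheduler = null;
            return;
        }

        // Conclusion 2: create the executor, using daemon threads so it never blocks JVM exit.
        ThreadFactory daemonThreads = runnable -> {
            Thread thread = new Thread(runnable, "ClientSetupSketch-scheduler");
            thread.setDaemon(true);
            return thread;
        };
        scheduler = Executors.newScheduledThreadPool(2, daemonThreads);

        // Conclusion 3: start the periodic work; here the "heartbeat" only counts its runs.
        if (registerWithEureka) {
            scheduler.scheduleWithFixedDelay(heartbeats::incrementAndGet,
                    heartbeatIntervalMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    boolean hasBackgroundTasks() {
        return scheduler != null;
    }

    int heartbeatCount() {
        return heartbeats.get();
    }

    void shutdown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        ClientSetupSketch passive = new ClientSetupSketch(false, false, 1000);
        ClientSetupSketch active = new ClientSetupSketch(true, true, 1000);
        System.out.println(passive.hasBackgroundTasks()); // prints false
        System.out.println(active.hasBackgroundTasks());  // prints true
        active.shutdown();
    }
}
```

A client built with both flags set to false never touches the network, which is why a standalone server configured that way starts without trying to reach a peer.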
Then let's take a look at the following code in the initScheduledTasks() method:
    // Heartbeat timer
    scheduler.schedule(
            new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);
This schedules a timed task that sends a heartbeat at the interval given by renewalIntervalInSecs, measured in seconds. HeartbeatThread runs as follows:
    /**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

The run method is very simple: it calls renew() and, if that succeeds, records the time. Here is the renew method:
    /**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

This method sends the heartbeat, and if the server answers 404 (it does not know this instance), the client performs the registration operation. Note the return type EurekaHttpResponse: it suggests that all of these operations are carried out over HTTP requests. Is that true? Let's continue with the register method:
    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

Here the call goes through the registrationClient field of eurekaTransport:
    private static final class EurekaTransport {
        private ClosableResolver bootstrapResolver;
        private TransportClientFactory transportClientFactory;

        private EurekaHttpClient registrationClient;
        private EurekaHttpClientFactory registrationClientFactory;

        private EurekaHttpClient queryClient;
        private EurekaHttpClientFactory queryClientFactory;

        void shutdown() {
            if (registrationClientFactory != null) {
                registrationClientFactory.shutdown();
            }

            if (queryClientFactory != null) {
                queryClientFactory.shutdown();
            }

            if (registrationClient != null) {
                registrationClient.shutdown();
            }

            if (queryClient != null) {
                queryClient.shutdown();
            }

            if (transportClientFactory != null) {
                transportClientFactory.shutdown();
            }
        }
    }

The registrationClient is an EurekaHttpClient, so Eureka's client does use HTTP requests to register the service. It also means that once a DiscoveryClient has been created with registration enabled, its scheduled tasks will register the instance with the server.
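The interplay between renew() and register() is easier to see without the transport layer. The sketch below replaces the HTTP calls with an in-memory stand-in; HeartbeatSketch, FakeServer, and evict() are hypothetical names invented for illustration, and only the status codes (200, 204, 404) are taken from the code above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the heartbeat fallback; no real HTTP is involved. */
class HeartbeatSketch {

    /** In-memory stand-in for the server, returning the status codes the client checks. */
    static final class FakeServer {
        private final Set<String> leases = ConcurrentHashMap.newKeySet();

        int heartbeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404; // 404: unknown instance
        }

        int register(String instanceId) {
            leases.add(instanceId);
            return 204; // same success code register() checks for
        }

        void evict(String instanceId) {
            leases.remove(instanceId);
        }
    }

    private final FakeServer server;
    private final String instanceId;
    private int registrations;

    HeartbeatSketch(FakeServer server, String instanceId) {
        this.server = server;
        this.instanceId = instanceId;
    }

    /** Mirrors renew(): a 404 heartbeat falls back to registration. */
    boolean renew() {
        int status = server.heartbeat(instanceId);
        if (status == 404) {
            return register();
        }
        return status == 200;
    }

    boolean register() {
        registrations++;
        return server.register(instanceId) == 204;
    }

    int registrations() {
        return registrations;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        HeartbeatSketch client = new HeartbeatSketch(server, "host-1");
        client.renew();            // unknown to the server, so it registers
        client.renew();            // plain heartbeat, no new registration
        server.evict("host-1");    // the server forgets the instance
        client.renew();            // 404 again, so it re-registers
        System.out.println(client.registrations()); // prints 2
    }
}
```

The same fallback lets a client recover on its own after the server has evicted its lease: the next heartbeat gets a 404 and the instance registers again.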
3. REST service provided by the server
We have already seen the server-side code that handles client registration. Since the client registers over HTTP, the server must expose an endpoint that receives this request. In fact, the Eureka server uses the JAX-RS standard to expose its services as REST resources. Let's take a look at the addInstance method of ApplicationResource:
    /**
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     *
     * @param info
     *            {@link InstanceInfo} information of the instance.
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

That is all for this article. I hope it is helpful for your study of Eureka.