本文共 16564 字,大约阅读时间需要 55 分钟。
服务端主要功能
这里不对每一点就讲解,就讲一下服务的注册。
如果看过eureka客户端的代码,可以知道,eureka客户端是通过发送http请求向服务端注册信息的,这里来看一下入口代码:
/** * Registers information about a particular instance for an * {@link com.netflix.discovery.shared.Application}. * * @param info * {@link InstanceInfo} information of the instance. * @param isReplication * a header parameter containing information whether this is * replicated from other nodes. */ @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // 信息校验 if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // 处理客户端可能使用错误的数据中心信息注册缺少数据的情况 DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } //这里是具体的注册逻辑。 registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
这里看register的主要逻辑
@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //信息注册 super.register(info, leaseDuration, isReplication); //同步信息给其他节点 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
信息注册
/** * Registers a new instance with a given duration. * * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean) */ public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); //根据应用名获取该应用名的所有信息 Map> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); //如果信息不存在,重新new一个map if (gMap == null) { final ConcurrentHashMap > gNewMap = new ConcurrentHashMap >(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if ("true".equals(serverConfig.getExperimental("registry.registration.ignoreIfDirtyTimestampIsOlder"))) { logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } else { registrant.setLastDirtyTimestamp(existingLastDirtyTimestamp); } } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold // (1 // for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } //把注册信息封装成一个对象,注册中心保存的其实是这个对象 Lease lease = new Lease (registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //这里保存了本次同步的信息 gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair ( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
到这里为止,信息已经保存了,注册中心最主要的一个属性就是registry,它保存了一个两层的map,第一层的key是AppName,我们配置的应用名,第二次的key是实例id,具体对应到每个服务。
private final ConcurrentHashMap>> registry = new ConcurrentHashMap >>();
下面再看下同步信息到其他节点
/** * Replicates all eureka actions to peer eureka nodes except for replication * traffic to this node. * */ private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // 跳过自身 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } //同步信息到其他节点,这里node里面保存很多其他节点的信息,主要用于和其他节点交互 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
这里主要看replicateInstanceActionsToPeers方法
/** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */ private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
看到这个方法,可以知道,注册中心相互节点之间其实是有很多数据同步操作的,现在这个是注册操作,我们直接看注册的代码。
/** * Sends the registration information of {@link InstanceInfo} receiving by * this node to the peer node represented by this class. * * @param info * the instance information {@link InstanceInfo} of any instance * that is send to this instance. * @throws Exception */ public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponseexecute() { //这是主要代码,这里是去发送一个http请求到对方节点 return replicationClient.register(info); } }, expiryTime //这里会有一个延迟,这是个任务,不会立即执行 ); }
给客户端获取信息的入口,这里用到了缓存,其实是把注册中心的信息保存在了缓存中,直接返回,避免频繁读取注册的map。
/** * Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}. * ** The delta changes represent the registry information change for a period * as configured by * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The * changes that can happen in a registry include * Registrations,Cancels,Status Changes and Expirations. Normally * the changes to the registry are infrequent and hence getting just the * delta will be much more efficient than getting the complete registry. *
* ** Since the delta information is cached over a period of time, the requests * may return the same data multiple times within the window configured by * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients * are expected to handle this duplicate information. *
* * @param version the version of the request. * @param acceptHeader the accept header to indicate whether to serve JSON or XML data. * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data. * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept} * @param uriInfo the {@link java.net.URI} information of the request made. * @return response containing the delta information of the * {@link AbstractInstanceRegistry}. */ @Path("delta") @GET public Response getContainerDifferential( @PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); // If the delta flag is disabled in discovery or if the lease expiration // has been disabled, redirect clients to get all instances if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { return Response.status(Status.FORBIDDEN).build(); } String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { return Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { return Response.ok(responseCache.get(cacheKey)) .build(); } }
有问题请留言。
转载地址:http://osjqi.baihongyu.com/