博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
eureka服务端源码分析
阅读量:4230 次
发布时间:2019-05-26

本文共 16564 字,大约阅读时间需要 55 分钟。

服务端主要功能

  1. 服务的注册
  2. 服务的续约
  3. 服务的下线
  4. 给客户端提供服务信息

这里不逐一讲解每一点,重点讲一下服务的注册。

如果看过eureka客户端的代码,可以知道,eureka客户端是通过发送http请求向服务端注册信息的,这里来看一下入口代码:

/**     * Registers information about a particular instance for an     * {@link com.netflix.discovery.shared.Application}.     *     * @param info     *            {@link InstanceInfo} information of the instance.     * @param isReplication     *            a header parameter containing information whether this is     *            replicated from other nodes.     */    @POST    @Consumes({"application/json", "application/xml"})    public Response addInstance(InstanceInfo info,                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);        // 信息校验        if (isBlank(info.getId())) {            return Response.status(400).entity("Missing instanceId").build();        } else if (isBlank(info.getHostName())) {            return Response.status(400).entity("Missing hostname").build();        } else if (isBlank(info.getAppName())) {            return Response.status(400).entity("Missing appName").build();        } else if (!appName.equals(info.getAppName())) {            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();        } else if (info.getDataCenterInfo() == null) {            return Response.status(400).entity("Missing dataCenterInfo").build();        } else if (info.getDataCenterInfo().getName() == null) {            return Response.status(400).entity("Missing dataCenterInfo Name").build();        }        // 处理客户端可能使用错误的数据中心信息注册缺少数据的情况        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();        if (dataCenterInfo instanceof UniqueIdentifier) {            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();            if (isBlank(dataCenterInfoId)) {                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));                if (experimental) {       
             String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";                    return Response.status(400).entity(entity).build();                } else if (dataCenterInfo instanceof AmazonInfo) {                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);                    if (effectiveId == null) {                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());                    }                } else {                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());                }            }        }        //这里是具体的注册逻辑。        registry.register(info, "true".equals(isReplication));        return Response.status(204).build();  // 204 to be backwards compatible    }

这里看register的主要逻辑

@Override    public void register(final InstanceInfo info, final boolean isReplication) {        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {            leaseDuration = info.getLeaseInfo().getDurationInSecs();        }                //信息注册        super.register(info, leaseDuration, isReplication);                //同步信息给其他节点        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);    }

信息注册

/**     * Registers a new instance with a given duration.     *     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)     */    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {        try {            read.lock();            //根据应用名获取该应用名的所有信息            Map
> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); //如果信息不存在,重新new一个map if (gMap == null) { final ConcurrentHashMap
> gNewMap = new ConcurrentHashMap
>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease
existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if ("true".equals(serverConfig.getExperimental("registry.registration.ignoreIfDirtyTimestampIsOlder"))) { logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } else { registrant.setLastDirtyTimestamp(existingLastDirtyTimestamp); } } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold // (1 // for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } //把注册信息封装成一个对象,注册中心保存的其实是这个对象 Lease
lease = new Lease
(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //这里保存了本次同步的信息 gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair
( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }

到这里为止,信息已经保存了,注册中心最主要的一个属性就是registry,它保存了一个两层的map,第一层的key是AppName,即我们配置的应用名,第二层的key是实例id,具体对应到每个服务实例。

private final ConcurrentHashMap
>> registry = new ConcurrentHashMap
>>();

下面再看下同步信息到其他节点

/**     * Replicates all eureka actions to peer eureka nodes except for replication     * traffic to this node.     *     */    private void replicateToPeers(Action action, String appName, String id,                                  InstanceInfo info /* optional */,                                  InstanceStatus newStatus /* optional */, boolean isReplication) {        Stopwatch tracer = action.getTimer().start();        try {            if (isReplication) {                numberOfReplicationsLastMin.increment();            }            // If it is a replication already, do not replicate again as this will create a poison replication            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {                return;            }            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {                // 跳过自身                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {                    continue;                }                                //同步信息到其他节点,这里node里面保存很多其他节点的信息,主要用于和其他节点交互                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);            }        } finally {            tracer.stop();        }    }

这里主要看replicateInstanceActionsToPeers方法

/**
 * Replicates a single instance change to one peer eureka node by dispatching
 * on the action type.
 *
 * <p>Any {@link Throwable} raised while replicating is logged and not
 * propagated to the caller.
 *
 * @param action    the kind of change to replicate.
 * @param appName   application name of the affected instance.
 * @param id        id of the affected instance.
 * @param info      instance information; used for {@code Register}.
 * @param newStatus new status; used for {@code StatusUpdate}.
 * @param node      the peer node to replicate to.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat: {
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, current, overriddenStatus, false);
                break;
            }
            case Register:
                node.register(info);
                break;
            case StatusUpdate: {
                InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, current);
                break;
            }
            case DeleteStatusOverride: {
                InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, current);
                break;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

看到这个方法,可以知道,注册中心各节点之间其实是有很多数据同步操作的,现在这个是注册操作,我们直接看注册的代码。

/**     * Sends the registration information of {@link InstanceInfo} receiving by     * this node to the peer node represented by this class.     *     * @param info     *            the instance information {@link InstanceInfo} of any instance     *            that is send to this instance.     * @throws Exception     */    public void register(final InstanceInfo info) throws Exception {        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);        batchingDispatcher.process(                taskId("register", info),                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {                    public EurekaHttpResponse
execute() { //这是主要代码,这里是去发送一个http请求到对方节点 return replicationClient.register(info); } }, expiryTime //这里会有一个延迟,这是个任务,不会立即执行 ); }

下面是给客户端提供服务信息的入口,这里用到了缓存,其实是把注册中心的信息保存在了缓存中,请求到来时直接返回缓存内容,避免频繁读取注册的map。

/**     * Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.     *     * 

* The delta changes represent the registry information change for a period * as configured by * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The * changes that can happen in a registry include * Registrations,Cancels,Status Changes and Expirations. Normally * the changes to the registry are infrequent and hence getting just the * delta will be much more efficient than getting the complete registry. *

* *

* Since the delta information is cached over a period of time, the requests * may return the same data multiple times within the window configured by * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients * are expected to handle this duplicate information. *

* * @param version the version of the request. * @param acceptHeader the accept header to indicate whether to serve JSON or XML data. * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data. * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept} * @param uriInfo the {@link java.net.URI} information of the request made. * @return response containing the delta information of the * {@link AbstractInstanceRegistry}. */ @Path("delta") @GET public Response getContainerDifferential( @PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); // If the delta flag is disabled in discovery or if the lease expiration // has been disabled, redirect clients to get all instances if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { return Response.status(Status.FORBIDDEN).build(); } String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. 
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { return Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { return Response.ok(responseCache.get(cacheKey)) .build(); } }

有问题请留言。

转载地址:http://osjqi.baihongyu.com/

你可能感兴趣的文章
Professional VSTO 2005 : Visual Studio 2005 Tools for Office
查看>>
Building Flash Web Sites For Dummies
查看>>
Service-Oriented Software System Engineering Challenges and Practices
查看>>
Expert One-on-One: Microsoft Access Application Development
查看>>
Managing the IT Services Process
查看>>
Introduction to Cryptography with Java Applets
查看>>
Advanced Wireless Networks : 4G Technologies
查看>>
Professional Java User Interfaces
查看>>
The Database Hacker's Handbook: Defending Database Servers
查看>>
IT Administrator's Top 10 Introductory Scripts for Windows
查看>>
Algorithms and Data Structures: The Science of Computing
查看>>
ASP.NET 2.0 Cookbook
查看>>
Java I/O
查看>>
Visual C# 2005 Demystified
查看>>
Unlocking Microsoft C# V 2.0 Programming Secrets
查看>>
Programming Excel with VBA and .NET
查看>>
SQL Server 2005 T-SQL Recipes: A Problem-Solution Approach
查看>>
Core Python Programming
查看>>
Creating Database Web Applications with PHP and ASP
查看>>
ASP.NET 2.0 Demystified
查看>>