久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

solr源碼分析之solrclound

 昵稱10504424 2015-09-02

一.簡介

SolrCloud是Solr4.0版本以后基于Solr和Zookeeper的分布式搜索方案。SolrCloud是Solr的基于Zookeeper一種部署方式。Solr可以以多種方式部署,,例如單機(jī)方式,,多機(jī)Master-Slaver方式。

二.特色功能

SolrCloud有幾個(gè)特色功能:

集中式的配置信息使用ZK進(jìn)行集中配置,。啟動(dòng)時(shí)可以指定把Solr的相關(guān)配置文件上傳Zookeeper,,多機(jī)器共用。這些ZK中的配置不會(huì)再拿到本地緩存,,Solr直接讀取ZK中的配置信息,。配置文件的變動(dòng),所有機(jī)器都可以感知到,。另外,,Solr的一些任務(wù)也是通過ZK作為媒介發(fā)布的。目的是為了容錯(cuò),。接收到任務(wù),,但在執(zhí)行任務(wù)時(shí)崩潰的機(jī)器,在重啟后,,或者集群選出候選者時(shí),,可以再次執(zhí)行這個(gè)未完成的任務(wù)。

自動(dòng)容錯(cuò)SolrCloud對(duì)索引分片,,并對(duì)每個(gè)分片創(chuàng)建多個(gè)Replication,。每個(gè)Replication都可以對(duì)外提供服務(wù)。一個(gè)Replication掛掉不會(huì)影響索引服務(wù),。更強(qiáng)大的是,,它還能自動(dòng)的在其它機(jī)器上幫你把失敗機(jī)器上的索引Replication重建并投入使用。

近實(shí)時(shí)搜索立即推送式的replication(也支持慢推送),??梢栽诿雰?nèi)檢索到新加入索引。

查詢時(shí)自動(dòng)負(fù)載均衡SolrCloud索引的多個(gè)Replication可以分布在多臺(tái)機(jī)器上,,均衡查詢壓力,。如果查詢壓力大,可以通過擴(kuò)展機(jī)器,,增加Replication來減緩,。

自動(dòng)分發(fā)的索引和索引分片發(fā)送文檔到任何節(jié)點(diǎn),它都會(huì)轉(zhuǎn)發(fā)到正確節(jié)點(diǎn),。

事務(wù)日志事務(wù)日志確保更新無丟失,,即使文檔沒有索引到磁盤。

其它值得一提的功能有:

索引存儲(chǔ)在HDFS上索引的大小通常在G和幾十G,,上百G的很少,,這樣的功能或許很難實(shí)用,。但是,如果你有上億數(shù)據(jù)來建索引的話,,也是可以考慮一下的,。我覺得這個(gè)功能最大的好處或許就是和下面這個(gè)“通過MR批量創(chuàng)建索引”聯(lián)合實(shí)用,。

通過MR批量創(chuàng)建索引有了這個(gè)功能,,你還擔(dān)心創(chuàng)建索引慢嗎?

強(qiáng)大的RESTful API通常你能想到的管理功能,,都可以通過此API方式調(diào)用,。這樣寫一些維護(hù)和管理腳本就方便多了。

優(yōu)秀的管理界面主要信息一目了然,;可以清晰的以圖形化方式看到SolrCloud的部署分布,;當(dāng)然還有不可或缺的Debug功能。

三.概念

Collection:在SolrCloud集群中邏輯意義上的完整的索引,。它常常被劃分為一個(gè)或多個(gè)Shard,,它們使用相同的Config Set。如果Shard數(shù)超過一個(gè),,它就是分布式索引,,SolrCloud讓你通過Collection名稱引用它,而不需要關(guān)心分布式檢索時(shí)需要使用的和Shard相關(guān)參數(shù),。

Config Set: Solr Core提供服務(wù)必須的一組配置文件,。每個(gè)config set有一個(gè)名字。最小需要包括solrconfig.xml (SolrConfigXml)和schema.xml (SchemaXml),,除此之外,,依據(jù)這兩個(gè)文件的配置內(nèi)容,可能還需要包含其它文件,。它存儲(chǔ)在Zookeeper中,。Config sets可以重新上傳或者使用upconfig命令更新,使用Solr的啟動(dòng)參數(shù)bootstrap_confdir指定可以初始化或更新它,。

Core: 也就是Solr Core,,一個(gè)Solr中包含一個(gè)或者多個(gè)Solr Core,每個(gè)Solr Core可以獨(dú)立提供索引和查詢功能,,每個(gè)Solr Core對(duì)應(yīng)一個(gè)索引或者Collection的Shard,,Solr Core的提出是為了增加管理靈活性和共用資源。在SolrCloud中有個(gè)不同點(diǎn)是它使用的配置是在Zookeeper中的,,傳統(tǒng)的Solr core的配置文件是在磁盤上的配置目錄中,。

Leader: 贏得選舉的Shard replicas。每個(gè)Shard有多個(gè)Replicas,,這幾個(gè)Replicas需要選舉來確定一個(gè)Leader,。選舉可以發(fā)生在任何時(shí)間,但是通常他們僅在某個(gè)Solr實(shí)例發(fā)生故障時(shí)才會(huì)觸發(fā)。當(dāng)索引documents時(shí),,SolrCloud會(huì)傳遞它們到此Shard對(duì)應(yīng)的leader,,leader再分發(fā)它們到全部Shard的replicas。

Replica: Shard的一個(gè)拷貝,。每個(gè)Replica存在于Solr的一個(gè)Core中,。一個(gè)命名為“test”的collection以numShards=1創(chuàng)建,并且指定replicationFactor設(shè)置為2,,這會(huì)產(chǎn)生2個(gè)replicas,,也就是對(duì)應(yīng)會(huì)有2個(gè)Core,每個(gè)在不同的機(jī)器或者Solr實(shí)例,。一個(gè)會(huì)被命名為test_shard1_replica1,,另一個(gè)命名為test_shard1_replica2。它們中的一個(gè)會(huì)被選舉為Leader,。

Shard: Collection的邏輯分片,。每個(gè)Shard被化成一個(gè)或者多個(gè)replicas,通過選舉確定哪個(gè)是Leader,。

Zookeeper: Zookeeper提供分布式鎖功能,,對(duì)SolrCloud是必須的。它處理Leader選舉,。Solr可以以內(nèi)嵌的Zookeeper運(yùn)行,,但是建議用獨(dú)立的,并且最好有3個(gè)以上的主機(jī),。

四.架構(gòu)圖

索引(collection)的邏輯圖 
index

Solr和索引對(duì)照?qǐng)D 
solr-index

創(chuàng)建索引過程 
create-index

分布式查詢 
search

Shard Splitting 
shard-splitting

五.其它

NRT  近實(shí)時(shí)搜索Solr的建索引數(shù)據(jù)是要在提交時(shí)寫入磁盤的,,這是硬提交,確保即便是停電也不會(huì)丟失數(shù)據(jù),;為了提供更實(shí)時(shí)的檢索能力,,Solr設(shè)定了一種軟提交方式。軟提交(soft commit):僅把數(shù)據(jù)提交到內(nèi)存,,index可見,,此時(shí)沒有寫入到磁盤索引文件中。

一個(gè)通常的用法是:每1-10分鐘自動(dòng)觸發(fā)硬提交,,每秒鐘自動(dòng)觸發(fā)軟提交,。

RealTime Get 實(shí)時(shí)獲取允許通過唯一鍵查找任何文檔的最新版本數(shù)據(jù),并且不需要重新打開searcher,。這個(gè)主要用于把Solr作為NoSQL數(shù)據(jù)存儲(chǔ)服務(wù),,而不僅僅是搜索引擎。Realtime Get當(dāng)前依賴事務(wù)日志,,默認(rèn)是開啟的,。另外,,即便是Soft Commit或者commitwithin,get也能得到真實(shí)數(shù)據(jù),。 注:commitwithin是一種數(shù)據(jù)提交特性,,不是立刻,而是要求在一定時(shí)間內(nèi)提交數(shù)據(jù).

 

源碼分析開始

一.SolrDispatchFilter初始化

復(fù)制代碼
@Override
  public void init(FilterConfig config) throws ServletException
  {
    log.info("SolrDispatchFilter.init(): {}", this.getClass().getClassLoader());

    String exclude = config.getInitParameter("excludePatterns");
    if(exclude != null) {
      String[] excludeArray = exclude.split(",");
      excludePatterns = new ArrayList<>();
      for (String element : excludeArray) {
        excludePatterns.add(Pattern.compile(element));
      }
    }
    try {
      Properties extraProperties = (Properties) config.getServletContext().getAttribute(PROPERTIES_ATTRIBUTE);
      if (extraProperties == null)
        extraProperties = new Properties();

      String solrHome = (String) config.getServletContext().getAttribute(SOLRHOME_ATTRIBUTE);
      if (solrHome == null)
        solrHome = SolrResourceLoader.locateSolrHome();
      ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());

      this.cores = createCoreContainer(solrHome, extraProperties);

      if (this.cores.getAuthenticationPlugin() != null) {
        HttpClientConfigurer configurer = this.cores.getAuthenticationPlugin().getDefaultConfigurer();
        if (configurer != null) {
          configurer.configure((DefaultHttpClient) httpClient, new ModifiableSolrParams());
        }
      }

      log.info("user.dir=" + System.getProperty("user.dir"));
    }
    catch( Throwable t ) {
      // catch this so our filter still works
      log.error( "Could not start Solr. Check solr/home property and the logs");
      SolrCore.log( t );
      if (t instanceof Error) {
        throw (Error) t;
      }
    }

    log.info("SolrDispatchFilter.init() done");
  }
復(fù)制代碼

二.CoreContainer執(zhí)行l(wèi)oad方法

復(fù)制代碼
 //-------------------------------------------------------------------
  // Initialization / Cleanup
  //-------------------------------------------------------------------

  /**
   * Load the cores defined for this CoreContainer
   */
  public void load()  {
    log.info("Loading cores into CoreContainer [instanceDir={}]", loader.getInstanceDir());

    // add the sharedLib to the shared resource loader before initializing cfg based plugins
    String libDir = cfg.getSharedLibDirectory();
    if (libDir != null) {
      File f = FileUtils.resolvePath(new File(solrHome), libDir);
      log.info("loading shared library: " + f.getAbsolutePath());
      loader.addToClassLoader(libDir, null, false);
      loader.reloadLuceneSPI();
    }


    shardHandlerFactory = ShardHandlerFactory.newInstance(cfg.getShardHandlerFactoryPluginInfo(), loader);

    updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig());

    solrCores.allocateLazyCores(cfg.getTransientCacheSize(), loader);

    logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader);

    hostName = cfg.getNodeName();

    zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig());

    initializeAuthenticationPlugin();

    if (isZooKeeperAware()) {
      intializeAuthorizationPlugin();
    }

    collectionsHandler = createHandler(cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
    containerHandlers.put(COLLECTIONS_HANDLER_PATH, collectionsHandler);
    infoHandler        = createHandler(cfg.getInfoHandlerClass(), InfoHandler.class);
    containerHandlers.put(INFO_HANDLER_PATH, infoHandler);
    coreAdminHandler   = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
    containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler);

    coreConfigService = ConfigSetService.createConfigSetService(cfg, loader, zkSys.zkController);

    containerProperties.putAll(cfg.getSolrProperties());

    // setup executor to load cores in parallel
    // do not limit the size of the executor in zk mode since cores may try and wait for each other.
    final ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(
        ( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ),
        new DefaultSolrThreadFactory("coreLoadExecutor") );
    final List<Future<SolrCore>> futures = new ArrayList<Future<SolrCore>>();
    try {

      List<CoreDescriptor> cds = coresLocator.discover(this);
      checkForDuplicateCoreNames(cds);


      for (final CoreDescriptor cd : cds) {
        if (cd.isTransient() || !cd.isLoadOnStartup()) {
          solrCores.putDynamicDescriptor(cd.getName(), cd);
        } else if (asyncSolrCoreLoad) {
          solrCores.markCoreAsLoading(cd);
        }
        if (cd.isLoadOnStartup()) {
          futures.add(coreLoadExecutor.submit(new Callable<SolrCore>() {
            @Override
            public SolrCore call() throws Exception {
              SolrCore core;
              try {
                if (zkSys.getZkController() != null) {
                  zkSys.getZkController().throwErrorIfReplicaReplaced(cd);
                }
                
                core = create(cd, false);
              } finally {
                if (asyncSolrCoreLoad) {
                  solrCores.markCoreAsNotLoading(cd);
                }
              }
              try {
                zkSys.registerInZk(core, true);
              } catch (Throwable t) {
                SolrException.log(log, "Error registering SolrCore", t);
              }
              return core;
            }
          }));
        }
      }


      // Start the background thread
      backgroundCloser = new CloserThread(this, solrCores, cfg);
      backgroundCloser.start();

    } finally {
      if (asyncSolrCoreLoad && futures != null) {
        Thread shutdownThread = new Thread() {
          public void run() {
            try {
              for (Future<SolrCore> future : futures) {
                try {
                  future.get();
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                  log.error("Error waiting for SolrCore to be created", e);
                }
              }
            } finally {
              ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
            }
          }
        };
        coreContainerWorkExecutor.submit(shutdownThread);
      } else {
        ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
      }
    }
    
    if (isZooKeeperAware()) {
      zkSys.getZkController().checkOverseerDesignate();
    }
  }
復(fù)制代碼

三.ZkContainer調(diào)用配置文件,,初始化zookeeper

復(fù)制代碼
public void initZooKeeper(final CoreContainer cc, String solrHome, CloudConfig config) {

    ZkController zkController = null;

    String zkRun = System.getProperty("zkRun");

    if (zkRun != null && config == null)
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Cannot start Solr in cloud mode - no cloud config provided");
    
    if (config == null)
        return;  // not in zk mode

    String zookeeperHost = config.getZkHost();

    // zookeeper in quorum mode currently causes a failure when trying to
    // register log4j mbeans.  See SOLR-2369
    // TODO: remove after updating to an slf4j based zookeeper
    System.setProperty("zookeeper.jmx.log4j.disable", "true");

    if (zkRun != null) {
      String zkDataHome = System.getProperty("zkServerDataDir", solrHome + "zoo_data");
      String zkConfHome = System.getProperty("zkServerConfDir", solrHome);
      zkServer = new SolrZkServer(stripChroot(zkRun), stripChroot(config.getZkHost()), zkDataHome, zkConfHome, config.getSolrHostPort());
      zkServer.parseConfig();
      zkServer.start();
      
      // set client from server config if not already set
      if (zookeeperHost == null) {
        zookeeperHost = zkServer.getClientString();
      }
    }

    int zkClientConnectTimeout = 30000;

    if (zookeeperHost != null) {

      // we are ZooKeeper enabled
      try {
        // If this is an ensemble, allow for a long connect time for other servers to come up
        if (zkRun != null && zkServer.getServers().size() > 1) {
          zkClientConnectTimeout = 24 * 60 * 60 * 1000;  // 1 day for embedded ensemble
          log.info("Zookeeper client=" + zookeeperHost + "  Waiting for a quorum.");
        } else {
          log.info("Zookeeper client=" + zookeeperHost);          
        }
        String confDir = System.getProperty("bootstrap_confdir");
        boolean boostrapConf = Boolean.getBoolean("bootstrap_conf");  
        
        if(!ZkController.checkChrootPath(zookeeperHost, (confDir!=null) || boostrapConf || zkRunOnly)) {
          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
              "A chroot was specified in ZkHost but the znode doesn't exist. " + zookeeperHost);
        }
        zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config,
            new CurrentCoreDescriptorProvider() {

              @Override
              public List<CoreDescriptor> getCurrentDescriptors() {
                List<CoreDescriptor> descriptors = new ArrayList<>(
                    cc.getCoreNames().size());
                Collection<SolrCore> cores = cc.getCores();
                for (SolrCore core : cores) {
                  descriptors.add(core.getCoreDescriptor());
                }
                return descriptors;
              }
            });


        if (zkRun != null && zkServer.getServers().size() > 1 && confDir == null && boostrapConf == false) {
          // we are part of an ensemble and we are not uploading the config - pause to give the config time
          // to get up
          Thread.sleep(10000);
        }
        
        if(confDir != null) {
          Path configPath = Paths.get(confDir);
          if (!Files.isDirectory(configPath))
            throw new IllegalArgumentException("bootstrap_confdir must be a directory of configuration files");

          String confName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX+ZkController.CONFIGNAME_PROP, "configuration1");
          ZkConfigManager configManager = new ZkConfigManager(zkController.getZkClient());
          configManager.uploadConfigDir(configPath, confName);
        }


        
        if(boostrapConf) {
          ZkController.bootstrapConf(zkController.getZkClient(), cc, solrHome);
        }
        
      } catch (InterruptedException e) {
        // Restore the interrupted status
        Thread.currentThread().interrupt();
        log.error("", e);
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
            "", e);
      } catch (TimeoutException e) {
        log.error("Could not connect to ZooKeeper", e);
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
            "", e);
      } catch (IOException | KeeperException e) {
        log.error("", e);
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
            "", e);
      }


    }
    this.zkController = zkController;
  }
復(fù)制代碼

四.調(diào)用zkController的初始化方法進(jìn)行選舉

復(fù)制代碼
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {

    try {
      boolean createdWatchesAndUpdated = false;
      Stat stat = zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, null, true);
      if (stat!= null && stat.getNumChildren()>0) {
        zkStateReader.createClusterStateWatchersAndUpdate();
        createdWatchesAndUpdated = true;
        publishAndWaitForDownStates();
      }
      
      createClusterZkNodes(zkClient);

      createEphemeralLiveNode();

      ShardHandler shardHandler;
      UpdateShardHandler updateShardHandler;
      shardHandler = cc.getShardHandlerFactory().getShardHandler();
      updateShardHandler = cc.getUpdateShardHandler();
      
      if (!zkRunOnly) {
        overseerElector = new LeaderElector(zkClient);
        this.overseer = new Overseer(shardHandler, updateShardHandler,
            CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
        ElectionContext context = new OverseerElectionContext(zkClient,
            overseer, getNodeName());
        overseerElector.setup(context);
        overseerElector.joinElection(context, false);
      }
      
      if (!createdWatchesAndUpdated) {
        zkStateReader.createClusterStateWatchersAndUpdate();
      }
      
    } catch (IOException e) {
      log.error("", e);
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Can't create ZooKeeperController", e);
    } catch (InterruptedException e) {
      // Restore the interrupted status
      Thread.currentThread().interrupt();
      log.error("", e);
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
          "", e);
    } catch (KeeperException e) {
      log.error("", e);
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
          "", e);
    }

  }
復(fù)制代碼

五.具體實(shí)現(xiàn)為LeaderElector的joinElection()方法

復(fù)制代碼
/**
     * Begin participating in the election process. Gets a new sequential number
     * and begins watching the node with the sequence number before it, unless it
     * is the lowest number, in which case, initiates the leader process. If the
     * node that is watched goes down, check if we are the new lowest node, else
     * watch the next lowest numbered node.
     *
     * @return sequential node number
     */
  public int joinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
    context.joinedElectionFired();
    
    final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
    
    long sessionId = zkClient.getSolrZooKeeper().getSessionId();
    String id = sessionId + "-" + context.id;
    String leaderSeqPath = null;
    boolean cont = true;
    int tries = 0;
    while (cont) {
      try {
        if(joinAtHead){
          log.info("Node {} trying to join election at the head", id);
          List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
          if(nodes.size() <2){
            leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
                CreateMode.EPHEMERAL_SEQUENTIAL, false);
          } else {
            String firstInLine = nodes.get(1);
            log.info("The current head: {}", firstInLine);
            Matcher m = LEADER_SEQ.matcher(firstInLine);
            if (!m.matches()) {
              throw new IllegalStateException("Could not find regex match in:"
                  + firstInLine);
            }
            leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1);
            zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
          }
        } else {
          leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
              CreateMode.EPHEMERAL_SEQUENTIAL, false);
        }

        log.info("Joined leadership election with path: {}", leaderSeqPath);
        context.leaderSeqPath = leaderSeqPath;
        cont = false;
      } catch (ConnectionLossException e) {
        // we don't know if we made our node or not...
        List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);
        
        boolean foundId = false;
        for (String entry : entries) {
          String nodeId = getNodeId(entry);
          if (id.equals(nodeId)) {
            // we did create our node...
            foundId  = true;
            break;
          }
        }
        if (!foundId) {
          cont = true;
          if (tries++ > 20) {
            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                "", e);
          }
          try {
            Thread.sleep(50);
          } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
          }
        }

      } catch (KeeperException.NoNodeException e) {
        // we must have failed in creating the election node - someone else must
        // be working on it, lets try again
        if (tries++ > 20) {
          context = null;
          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
              "", e);
        }
        cont = true;
        try {
          Thread.sleep(50);
        } catch (InterruptedException e2) {
          Thread.currentThread().interrupt();
        }
      }
    }
    checkIfIamLeader(context, replacement);

    return getSeq(context.leaderSeqPath);
  }
復(fù)制代碼

六.OverseerCollectionProcessor

實(shí)現(xiàn)了Runnable接口,,故其核心方法是run()方法:

復(fù)制代碼
 @Override
  public void run() {
    log.info("Process current queue of collection creations");
    LeaderStatus isLeader = amILeader();
    while (isLeader == LeaderStatus.DONT_KNOW) {
      log.debug("am_i_leader unclear {}", isLeader);
      isLeader = amILeader();  // not a no, not a yes, try ask again
    }

    String oldestItemInWorkQueue = null;
    // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
    // This variable is set in case there's any task found on the workQueue when the OCP starts up and
    // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
    // Beyond the marker, all tasks can safely be assumed to have never been executed.
    boolean hasLeftOverItems = true;

    try {
      oldestItemInWorkQueue = workQueue.getTailId();
    } catch (KeeperException e) {
      // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
      // async calls.
      SolrException.log(log, "", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    if (oldestItemInWorkQueue == null)
      hasLeftOverItems = false;
    else
      log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);

    try {
      prioritizeOverseerNodes();
    } catch (Exception e) {
      log.error("Unable to prioritize overseer ", e);
    }

    // TODO: Make maxThreads configurable.

    this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(),
        new DefaultSolrThreadFactory("OverseerThreadFactory"));
    try {
      while (!this.isClosed) {
        try {
          isLeader = amILeader();
          if (LeaderStatus.NO == isLeader) {
            break;
          } else if (LeaderStatus.YES != isLeader) {
            log.debug("am_i_leader unclear {}", isLeader);
            continue; // not a no, not a yes, try asking again
          }

          log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
          cleanUpWorkQueue();

          printTrackingMaps();

          boolean waited = false;

          while (runningTasks.size() > maxParallelThreads) {
            synchronized (waitLock) {
              waitLock.wait(100);//wait for 100 ms or till a task is complete
            }
            waited = true;
          }

          if (waited)
            cleanUpWorkQueue();

          List<QueueEvent> heads = workQueue.peekTopN(maxParallelThreads, runningZKTasks, 2000L);

          if (heads == null)
            continue;

          log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());

          if (isClosed) break;

          for (QueueEvent head : heads) {
            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
            String collectionName = message.containsKey(COLLECTION_PROP) ?
                message.getStr(COLLECTION_PROP) : message.getStr(NAME);
            final String asyncId = message.getStr(ASYNC);
            if (hasLeftOverItems) {
              if (head.getId().equals(oldestItemInWorkQueue))
                hasLeftOverItems = false;
              if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
                log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",asyncId );
                workQueue.remove(head);
                continue;
              }
            }

            if (!checkExclusivity(message, head.getId())) {
              log.debug("Exclusivity check failed for [{}]", message.toString());
              continue;
            }

            try {
              markTaskAsRunning(head, collectionName, asyncId, message);
              log.debug("Marked task [{}] as running", head.getId());
            } catch (KeeperException.NodeExistsException e) {
              // This should never happen
              log.error("Tried to pick up task [{}] when it was already running!", head.getId());
            } catch (InterruptedException e) {
              log.error("Thread interrupted while trying to pick task for execution.", head.getId());
              Thread.currentThread().interrupt();
            }

            log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
            String operation = message.getStr(Overseer.QUEUE_OPERATION);
            Runner runner = new Runner(message,
                operation, head);
            tpe.execute(runner);
          }

        } catch (KeeperException e) {
          if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
            log.warn("Overseer cannot talk to ZK");
            return;
          }
          SolrException.log(log, "", e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        } catch (Exception e) {
          SolrException.log(log, "", e);
        }
      }
    } finally {
      this.close();
    }
  }
復(fù)制代碼

該run()方法由調(diào)用了一個(gè)內(nèi)部類Runner,見紅線所示,,Runner也是一個(gè)線程,,實(shí)現(xiàn)了Runnable接口,,其核心方法同樣為run():

復(fù)制代碼
 @Override
    public void run() {

      final TimerContext timerContext = stats.time("collection_" + operation);

      boolean success = false;
      final String asyncId = message.getStr(ASYNC);
      String collectionName = message.containsKey(COLLECTION_PROP) ?
          message.getStr(COLLECTION_PROP) : message.getStr(NAME);

      try {
        try {
          log.debug("Runner processing {}", head.getId());
          response = processMessage(message, operation);
        } finally {
          timerContext.stop();
          updateStats();
        }

        if(asyncId != null) {
          if (response != null && (response.getResponse().get("failure") != null 
              || response.getResponse().get("exception") != null)) {
            failureMap.put(asyncId, SolrResponse.serializable(response));
            log.debug("Updated failed map for task with zkid:[{}]", head.getId());
          } else {
            completedMap.put(asyncId, SolrResponse.serializable(response));
            log.debug("Updated completed map for task with zkid:[{}]", head.getId());
          }
        } else {
          head.setBytes(SolrResponse.serializable(response));
          log.debug("Completed task:[{}]", head.getId());
        }

        markTaskComplete(head.getId(), asyncId, collectionName);
        log.debug("Marked task [{}] as completed.", head.getId());
        printTrackingMaps();

        log.info("Overseer Collection Processor: Message id:" + head.getId() +
            " complete, response:" + response.getResponse().toString());
        success = true;
      } catch (KeeperException e) {
        SolrException.log(log, "", e);
      } catch (InterruptedException e) {
        // Reset task from tracking data structures so that it can be retried.
        resetTaskWithException(head.getId(), asyncId, collectionName);
        log.warn("Resetting task {} as the thread was interrupted.", head.getId());
        Thread.currentThread().interrupt();
      } finally {
        if(!success) {
          // Reset task from tracking data structures so that it can be retried.
          resetTaskWithException(head.getId(), asyncId, collectionName);
        }
        synchronized (waitLock){
          waitLock.notifyAll();
        }
      }
    }
復(fù)制代碼

上述方法中,,使用紅線標(biāo)明了核心實(shí)現(xiàn)方法processMessage(),該方法具體實(shí)現(xiàn)了Collection的各種操作:

復(fù)制代碼
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
    log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());

    NamedList results = new NamedList();
    try {
      // force update the cluster state
      zkStateReader.updateClusterState();
      CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
      if (action == null) {
        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
      }
      switch (action) {
        case CREATE:
          createCollection(zkStateReader.getClusterState(), message, results);
          break;
        case DELETE:
          deleteCollection(message, results);
          break;
        case RELOAD:
          ModifiableSolrParams params = new ModifiableSolrParams();
          params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
          collectionCmd(zkStateReader.getClusterState(), message, params, results, Replica.State.ACTIVE);
          break;
        case CREATEALIAS:
          createAlias(zkStateReader.getAliases(), message);
          break;
        case DELETEALIAS:
          deleteAlias(zkStateReader.getAliases(), message);
          break;
        case SPLITSHARD:
          splitShard(zkStateReader.getClusterState(), message, results);
          break;
        case DELETESHARD:
          deleteShard(zkStateReader.getClusterState(), message, results);
          break;
        case CREATESHARD:
          createShard(zkStateReader.getClusterState(), message, results);
          break;
        case DELETEREPLICA:
          deleteReplica(zkStateReader.getClusterState(), message, results);
          break;
        case MIGRATE:
          migrate(zkStateReader.getClusterState(), message, results);
          break;
        case ADDROLE:
          processRoleCommand(message, operation);
          break;
        case REMOVEROLE:
          processRoleCommand(message, operation);
          break;
        case ADDREPLICA:
          addReplica(zkStateReader.getClusterState(), message, results);
          break;
        case OVERSEERSTATUS:
          getOverseerStatus(message, results);
          break;
        case CLUSTERSTATUS://TODO . deprecated. OCP does not need to do it .remove in a later release
          new ClusterStatus(zkStateReader, message).getClusterStatus(results);
          break;
        case ADDREPLICAPROP:
          processReplicaAddPropertyCommand(message);
          break;
        case DELETEREPLICAPROP:
          processReplicaDeletePropertyCommand(message);
          break;
        case BALANCESHARDUNIQUE:
          balanceProperty(message);
          break;
        case REBALANCELEADERS:
          processRebalanceLeaders(message);
          break;
        case MODIFYCOLLECTION:
          overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
          break;
        default:
          throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
              + operation);
      }
    } catch (Exception e) {
      String collName = message.getStr("collection");
      if (collName == null) collName = message.getStr(NAME);

      if (collName == null) {
        SolrException.log(log, "Operation " + operation + " failed", e);
      } else  {
        SolrException.log(log, "Collection: " + collName + " operation: " + operation
            + " failed", e);
      }

      results.add("Operation " + operation + " caused exception:", e);
      SimpleOrderedMap nl = new SimpleOrderedMap();
      nl.add("msg", e.getMessage());
      nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1);
      results.add("exception", nl);
    }
    return new OverseerSolrResponse(results);
  }
復(fù)制代碼

我們以SPLITSHARD為例說明:

復(fù)制代碼
private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
    String collectionName = message.getStr("collection");
    String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
    
    log.info("Split shard invoked");
    String splitKey = message.getStr("split.key");
    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
    
    DocCollection collection = clusterState.getCollection(collectionName);
    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
    
    Slice parentSlice = null;
    
    if (slice == null) {
      if (router instanceof CompositeIdRouter) {
        Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
        if (searchSlices.isEmpty()) {
          throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
        }
        if (searchSlices.size() > 1) {
          throw new SolrException(ErrorCode.BAD_REQUEST,
              "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
        }
        parentSlice = searchSlices.iterator().next();
        slice = parentSlice.getName();
        log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
      } else {
        throw new SolrException(ErrorCode.BAD_REQUEST,
            "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
                + router.getClass().getName());
      }
    } else {
      parentSlice = clusterState.getSlice(collectionName, slice);
    }
    
    if (parentSlice == null) {
      if (clusterState.hasCollection(collectionName)) {
        throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
      } else {
        throw new SolrException(ErrorCode.BAD_REQUEST,
            "No collection with the specified name exists: " + collectionName);
      }
    }
    
    // find the leader for the shard
    Replica parentShardLeader = null;
    try {
      parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    
    DocRouter.Range range = parentSlice.getRange();
    if (range == null) {
      range = new PlainIdRouter().fullRange();
    }
    
    List<DocRouter.Range> subRanges = null;
    String rangesStr = message.getStr(CoreAdminParams.RANGES);
    if (rangesStr != null) {
      String[] ranges = rangesStr.split(",");
      if (ranges.length == 0 || ranges.length == 1) {
        throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
      } else {
        subRanges = new ArrayList<>(ranges.length);
        for (int i = 0; i < ranges.length; i++) {
          String r = ranges[i];
          try {
            subRanges.add(DocRouter.DEFAULT.fromString(r));
          } catch (Exception e) {
            throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
          }
          if (!subRanges.get(i).isSubsetOf(range)) {
            throw new SolrException(ErrorCode.BAD_REQUEST,
                "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
          }
        }
        List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
        Collections.sort(temp);
        if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
          throw new SolrException(ErrorCode.BAD_REQUEST,
              "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
        }
        for (int i = 1; i < temp.size(); i++) {
          if (temp.get(i - 1).max + 1 != temp.get(i).min) {
            throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
                + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
          }
        }
      }
    } else if (splitKey != null) {
      if (router instanceof CompositeIdRouter) {
        CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
        subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
        if (subRanges.size() == 1) {
          throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
              + " has a hash range that is exactly equal to hash range of shard: " + slice);
        }
        for (DocRouter.Range subRange : subRanges) {
          if (subRange.min == subRange.max) {
            throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
          }
        }
        log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
        rangesStr = "";
        for (int i = 0; i < subRanges.size(); i++) {
          DocRouter.Range subRange = subRanges.get(i);
          rangesStr += subRange.toString();
          if (i < subRanges.size() - 1) rangesStr += ',';
        }
      }
    } else {
      // todo: fixed to two partitions?
      subRanges = router.partitionRange(2, range);
    }
    
    try {
      List<String> subSlices = new ArrayList<>(subRanges.size());
      List<String> subShardNames = new ArrayList<>(subRanges.size());
      String nodeName = parentShardLeader.getNodeName();
      for (int i = 0; i < subRanges.size(); i++) {
        String subSlice = slice + "_" + i;
        subSlices.add(subSlice);
        String subShardName = collectionName + "_" + subSlice + "_replica1";
        subShardNames.add(subShardName);
        
        Slice oSlice = clusterState.getSlice(collectionName, subSlice);
        if (oSlice != null) {
          final Slice.State state = oSlice.getState();
          if (state == Slice.State.ACTIVE) {
            throw new SolrException(ErrorCode.BAD_REQUEST,
                "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
          } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
            // delete the shards
            for (String sub : subSlices) {
              log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
              Map<String,Object> propMap = new HashMap<>();
              propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
              propMap.put(COLLECTION_PROP, collectionName);
              propMap.put(SHARD_ID_PROP, sub);
              ZkNodeProps m = new ZkNodeProps(propMap);
              try {
                deleteShard(clusterState, m, new NamedList());
              } catch (Exception e) {
                throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub,
                    e);
              }
            }
          }
        }
      }
      
      // do not abort splitshard if the unloading fails
      // this can happen because the replicas created previously may be down
      // the only side effect of this is that the sub shard may end up having more replicas than we want
      collectShardResponses(results, false, null, shardHandler);
      
      final String asyncId = message.getStr(ASYNC);
      HashMap<String,String> requestMap = new HashMap<>();
      
      for (int i = 0; i < subRanges.size(); i++) {
        String subSlice = subSlices.get(i);
        String subShardName = subShardNames.get(i);
        DocRouter.Range subRange = subRanges.get(i);
        
        log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
        
        Map<String,Object> propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
        propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
        propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
        propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
        propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
        DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
        inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
        
        // wait until we are able to see the new shard in cluster state
        waitForNewShard(collectionName, subSlice);
        
        // refresh cluster state
        clusterState = zkStateReader.getClusterState();
        
        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
            + " on " + nodeName);
        propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
        propMap.put(COLLECTION_PROP, collectionName);
        propMap.put(SHARD_ID_PROP, subSlice);
        propMap.put("node", nodeName);
        propMap.put(CoreAdminParams.NAME, subShardName);
        // copy over property params:
        for (String key : message.keySet()) {
          if (key.startsWith(COLL_PROP_PREFIX)) {
            propMap.put(key, message.getStr(key));
          }
        }
        // add async param
        if (asyncId != null) {
          propMap.put(ASYNC, asyncId);
        }
        addReplica(clusterState, new ZkNodeProps(propMap), results);
      }
      
      collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler);
      
      completeAsyncRequest(asyncId, requestMap, results);
      
      for (String subShardName : subShardNames) {
        // wait for parent leader to acknowledge the sub-shard core
        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
        String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
        cmd.setCoreName(subShardName);
        cmd.setNodeName(nodeName);
        cmd.setCoreNodeName(coreNodeName);
        cmd.setState(Replica.State.ACTIVE);
        cmd.setCheckLive(true);
        cmd.setOnlyIfLeader(true);
        
        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
        sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
      }
      
      collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
          shardHandler);
          
      completeAsyncRequest(asyncId, requestMap, results);
      
      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
          + " on: " + parentShardLeader);
          
      log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
          + collectionName + " on " + parentShardLeader);
          
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
      params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
      for (int i = 0; i < subShardNames.size(); i++) {
        String subShardName = subShardNames.get(i);
        params.add(CoreAdminParams.TARGET_CORE, subShardName);
      }
      params.set(CoreAdminParams.RANGES, rangesStr);
      
      sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
      
      collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler);
      completeAsyncRequest(asyncId, requestMap, results);
      
      log.info("Index on shard: " + nodeName + " split into two successfully");
      
      // apply buffered updates on sub-shards
      for (int i = 0; i < subShardNames.size(); i++) {
        String subShardName = subShardNames.get(i);
        
        log.info("Applying buffered updates on : " + subShardName);
        
        params = new ModifiableSolrParams();
        params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
        params.set(CoreAdminParams.NAME, subShardName);
        
        sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
      }
      
      collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
          shardHandler);
          
      completeAsyncRequest(asyncId, requestMap, results);
      
      log.info("Successfully applied buffered updates on : " + subShardNames);
      
      // Replica creation for the new Slices
      
      // look at the replication factor and see if it matches reality
      // if it does not, find best nodes to create more cores
      
      // TODO: Have replication factor decided in some other way instead of numShards for the parent
      
      int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();
      
      // we need to look at every node and see how many cores it serves
      // add our new cores to existing nodes serving the least number of cores
      // but (for now) require that each core goes on a distinct node.
      
      // TODO: add smarter options that look at the current number of cores per
      // node?
      // for now we just go random
      Set<String> nodes = clusterState.getLiveNodes();
      List<String> nodeList = new ArrayList<>(nodes.size());
      nodeList.addAll(nodes);
      
      // TODO: Have maxShardsPerNode param for this operation?
      
      // Remove the node that hosts the parent shard for replica creation.
      nodeList.remove(nodeName);
      
      // TODO: change this to handle sharding a slice into > 2 sub-shards.

      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
      for (int i = 1; i <= subSlices.size(); i++) {
        Collections.shuffle(nodeList, RANDOM);
        String sliceName = subSlices.get(i - 1);
        for (int j = 2; j <= repFactor; j++) {
          String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
          String shardName = collectionName + "_" + sliceName + "_replica" + (j);

          log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
              + collectionName + " on " + subShardNodeName);

          ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
              ZkStateReader.COLLECTION_PROP, collectionName,
              ZkStateReader.SHARD_ID_PROP, sliceName,
              ZkStateReader.CORE_NAME_PROP, shardName,
              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
              ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
              ZkStateReader.NODE_NAME_PROP, subShardNodeName);
          Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));

          HashMap<String,Object> propMap = new HashMap<>();
          propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
          propMap.put(COLLECTION_PROP, collectionName);
          propMap.put(SHARD_ID_PROP, sliceName);
          propMap.put("node", subShardNodeName);
          propMap.put(CoreAdminParams.NAME, shardName);
          // copy over property params:
          for (String key : message.keySet()) {
            if (key.startsWith(COLL_PROP_PREFIX)) {
              propMap.put(key, message.getStr(key));
            }
          }
          // add async param
          if (asyncId != null) {
            propMap.put(ASYNC, asyncId);
          }
          // special flag param to instruct addReplica not to create the replica in cluster state again
          propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");

          replicas.add(propMap);
        }
      }

      // we must set the slice state into recovery before actually creating the replica cores
      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
      // always gets a chance to execute. See SOLR-7673

      if (repFactor == 1) {
        // switch sub shard states to 'active'
        log.info("Replication factor is 1 so switching shard states");
        DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
        Map<String,Object> propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
        propMap.put(slice, Slice.State.INACTIVE.toString());
        for (String subSlice : subSlices) {
          propMap.put(subSlice, Slice.State.ACTIVE.toString());
        }
        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
        ZkNodeProps m = new ZkNodeProps(propMap);
        inQueue.offer(Utils.toJSON(m));
      } else {
        log.info("Requesting shard state be set to 'recovery'");
        DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
        Map<String,Object> propMap = new HashMap<>();
        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
        for (String subSlice : subSlices) {
          propMap.put(subSlice, Slice.State.RECOVERY.toString());
        }
        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
        ZkNodeProps m = new ZkNodeProps(propMap);
        inQueue.offer(Utils.toJSON(m));
      }

      // now actually create replica cores on sub shard nodes
      for (Map<String, Object> replica : replicas) {
        addReplica(clusterState, new ZkNodeProps(replica), results);
      }
      
      collectShardResponses(results, true,
          "SPLITSHARD failed to create subshard replicas", shardHandler);
          
      completeAsyncRequest(asyncId, requestMap, results);
      
      log.info("Successfully created all replica shards for all sub-slices " + subSlices);
      
      commit(results, slice, parentShardLeader);
      
      return true;
    } catch (SolrException e) {
      throw e;
    } catch (Exception e) {
      log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
      throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
    }
復(fù)制代碼

小結(jié):

solrCloud 從zookeeper開始一步步分析到具體的命令執(zhí)行,,完整了走遍了流程,,但因篇幅限制沒有就具體細(xì)節(jié)進(jìn)行講解。后續(xù)會(huì)在后面的文章中分析每個(gè)細(xì)節(jié),。

參考文獻(xiàn):

【1】http:///detail/48735-solrcloud

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多