久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

一站式Kafka平臺解決方案——KafkaCenter

 精品唯居 2022-03-24

KafkaCenter是什么

KafkaCenter是一個針對Kafka的一站式,,解決方案,。用于Kafka集群的維護(hù)與管理,生產(chǎn)者和消費(fèi)者的監(jiān)控,,以及Kafka部分生態(tài)組件的使用,。

對于Kafka的平臺化,一直缺少一個成熟的解決方案,,之前比較流行的kafka監(jiān)控方案,如kafka-manager提供了集群管理與topic管理等等功能,。但是對于生產(chǎn)者,、消費(fèi)者的監(jiān)控,以及Kafka的新生態(tài),,如Connect,,KSQL還缺少響應(yīng)的支持。Confluent Control Center功能要完整一些,,但卻是非開源收費(fèi)的,。

對于Kafka的使用,一直都是一個讓人頭疼的問題,,由于實(shí)時系統(tǒng)的強(qiáng)運(yùn)維特性,,我們不得不投入大量的時間用于集群的維護(hù),kafka的運(yùn)維,,比如:

  • 人工創(chuàng)建topic,,特別費(fèi)力
  • 相關(guān)kafka運(yùn)維,監(jiān)控孤島化
  • 現(xiàn)有消費(fèi)監(jiān)控工具監(jiān)控不準(zhǔn)確
  • 無法拿到Kafka 集群的summay信息
  • 無法快速知曉集群健康狀態(tài)
  • 無法知曉業(yè)務(wù)對team kafka使用情況
  • kafka管理,,監(jiān)控工具稀少,,沒有一個好的工具我們直接可以使用
  • 無法快速查詢topic消息

功能模塊介紹

  • Home-> 查看平臺管理的Kafka Cluster集群信息及監(jiān)控信息
  • Topic-> 用戶可以在此模塊查看自己的Topic,發(fā)起申請新建Topic,,同時可以對Topic進(jìn)行生產(chǎn)消費(fèi)測試,。
  • Monitor-> 用戶可以在此模塊中可以查看Topic的生產(chǎn)以及消費(fèi)情況,同時可以針對消費(fèi)延遲情況設(shè)置預(yù)警信息,。
  • Connect-> 實(shí)現(xiàn)用戶快速創(chuàng)建自己的Connect Job,,并對自己的Connect進(jìn)行維護(hù)。
  • KSQL-> 實(shí)現(xiàn)用戶快速創(chuàng)建自己的KSQL Job,,并對自己的Job進(jìn)行維護(hù),。
  • Approve-> 此模塊主要用于當(dāng)普通用戶申請創(chuàng)建Topic,管理員進(jìn)行審批操作,。
  • Setting-> 此模塊主要功能為管理員維護(hù)User,、Team以及kafka cluster信息
  • Kafka Manager-> 此模塊用于管理員對集群的正常維護(hù)操作,。

系統(tǒng)截圖:

安裝與入門

安裝需要依賴 mysql es email server

組件 是否必須 功能
mysql 必須 配置信息存在mysql
elasticsearch(7.0+) 可選 各種監(jiān)控信息的存儲
email server 可選 Apply, approval, warning e-mail alert

1,、初始化

在MySQL中執(zhí)行sql建表

-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE `kafka_center`;


-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `threshold` int(11) DEFAULT NULL,
  `dispause` int(11) DEFAULT NULL,
  `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_date` datetime DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `disable_alerta` tinyint(1) DEFAULT 0,
  `enable` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL,
  `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `enable` int(11) DEFAULT NULL,
  `broker_size` int(4) DEFAULT 0,
  `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',
  `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) DEFAULT NULL,
  `cluster_name` varchar(255) DEFAULT NULL,
  `ksql_url` varchar(255) DEFAULT NULL,
  `ksql_serverId` varchar(255) DEFAULT NULL,
  `version` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `message_rate` int(50) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  `approved` int(11) DEFAULT NULL,
  `approved_id` int(11) DEFAULT NULL,
  `approved_time` datetime DEFAULT NULL,
  `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `ttl` bigint(11) DEFAULT NULL,
  `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',
  `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',
  `create_time` datetime DEFAULT NULL,
  `password` varchar(255) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

2、配置

相關(guān)配置位于application.properties

可對端口 日志等信息做一些修改

server.port=8080
debug=false
# 設(shè)置session timeout為6小時
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false

public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16

monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否啟用收集線程指定集群收集
monitor.collector.include.enable=false
#收集線程指定location,,必須屬于remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是為了提高lag查詢和收集,,解決跨location網(wǎng)絡(luò)延遲問題
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#發(fā)送consumer group的lag發(fā)送給alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否開啟郵件功能,true:啟用,false:禁用
mail.enable=false
spring.mail.host=
[email protected]
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=

3,、運(yùn)行

推薦使用docker

docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0

不用docker

$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar

4,、查看

訪問http://localhost:8080 管理員用戶與密碼默認(rèn):admin / admin

功能介紹

Topics

用戶可以在此模塊完成Topic查看,,已經(jīng)申請新建Topic,同時可以對Topic進(jìn)行生產(chǎn)消費(fèi)測試,。

Monitor

用戶可以在此模塊中可以查看Topic的生成以及消費(fèi)情況,,同時可以針對消費(fèi)延遲情況設(shè)置預(yù)警信息。

Alerts

此模塊用于維護(hù)預(yù)警信息,。用戶可以看到自己所有預(yù)警信息,,管理員可以看到所有人的預(yù)警信息。

Kafka Connect

實(shí)現(xiàn)用戶快速創(chuàng)建自己的Connect Job,,并對自己的Connect進(jìn)行維護(hù),。

KSQL

實(shí)現(xiàn)用戶快速創(chuàng)建自己的KSQL Job,并對自己的Job進(jìn)行維護(hù),。

Approve

此模塊主要用于當(dāng)普通用戶申請創(chuàng)建Topic 或者Job時,,管理員進(jìn)行審批操作。

Setting

此模塊主要功能為管理員維護(hù)User,、Team以及kafka cluster信息

Cluster Manager

此模塊用于管理員對集群的正常維護(hù)操作,。

Home

這里是一些基本的統(tǒng)計信息

My Favorite

集群與topic列表

Topic

這里是一些topic的管理功能

Topic List

操作范圍:

用戶所屬Team的所有Topic

  • Topic -> Topic List -> Detail 查看Topic的詳細(xì)信息
  • Topic -> Topic List -> Mock 對Topic進(jìn)行生產(chǎn)測試

申請創(chuàng)建topic

Important: admin不能申請task,普通用戶必須先讓管理員新建team后,將用戶加入指定team后,,才可以申請task,。

操作范圍:

用戶所屬Team的所有Task

  • Topic -> My Task -> Detail 查看申請的Task信息

  • Topic -> My Task -> Delete 刪除被拒絕或待審批的Task

  • Topic -> My Task -> Edit 修改被拒絕的Task

  • Topic -> My Task -> Create Topic Task 創(chuàng)建Task

    • 按照表單各字段要求填寫信息
    • 點(diǎn)擊確認(rèn),提交申請

    審批結(jié)果:

    • 審批通過:Topic將會被創(chuàng)建在管理員指定的集群
    • 審批拒絕:用戶收到郵件,,返回到My Task,,點(diǎn)擊對應(yīng)Task后面的Edit,針對審批意見進(jìn)行修改

Topic命名規(guī)則:

只能包含:數(shù)字,、大小寫字母,、下劃線、中劃線,、點(diǎn),;長度大于等于3小于等于100,。

不推薦:下劃線開頭;

可對所有Topic進(jìn)行消費(fèi)測試

Monitor

監(jiān)控模塊

生產(chǎn)者監(jiān)控

消費(fèi)者監(jiān)控

消息積壓

報警功能

Connect

這里是一些Connect的操作

KSQL

可以進(jìn)行KQL的查詢操作

Approve

這里主要是管理員做一些審核操作

  • Approve->check 審批用戶的Task
  • 根據(jù)用戶選擇的location指定cluster
  • 檢查用戶設(shè)置的partition和replication大小是否合理,,如不合理做出調(diào)整
  • 檢查其他字段是否合理,,如需要拒絕該申請,點(diǎn)擊Reject并填寫意見,。

Kafka Manager
Topic管理

Cluster管理

broker管理

group管理

Setting

這些主要是用戶的一些設(shè)置

KafkaCenter還是一個非常不錯的kafka管理工具,,可以滿足大部分需求。
更多實(shí)時數(shù)據(jù)分析相關(guān)博文與科技資訊,,歡迎關(guān)注 “實(shí)時流式計算”

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報,。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約