1.linux指令

查看主题的详细信息指令

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic 主题名称

修改主题的分区消息指令

./bin/kafka-topics.sh --alter  --zookeeper localhost:2181  --topic 主题名称  --partitions 分区数量

2.java代码

我是使用springboot整合的kafka,所以pom文件这里我springboot版本是2.0.6

<!--        kafka依赖包-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- kafka辅助工具类-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.2.0</version>
        </dependency>

KafkaInitialConfiguration配置类

package com.rbs.kafka.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName KafkaInitialConfiguration
 * @Author ywj
 * @Describe
 * @Date 2019/5/31 0031 9:53
 */
@Configuration
public class KafkaInitialConfiguration {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapservers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        //配置Kafka实例的连接地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapservers);
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }
}

KafkaConsole

package com.rbs.kafka.util;


import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;


/**
 * @ClassName ConsoleApi
 * @Author ywj
 * @Describe 主题操作控制类
 * @Date 2019/5/31 0031 9:32
 */
@Service
public class KafkaConsole {
    @Autowired
    private AdminClient adminClient;

    /**
     * 返回主题的信息
     * @param topicName 主题名称
     * @return
     */
    public KafkaFuture<Map<String, TopicDescription>> SelectTopicInfo(String topicName) {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
        KafkaFuture<Map<String, TopicDescription>> all = result.all();
        return all;
    }


    /**
     * 增加某个主题的分区(注意分区只能增加不能减少)
     * @param topicName  主题名称
     * @param number  修改数量
     */
    public void edit(String topicName,Integer number){
        Map<String, NewPartitions> newPartitions=new HashMap<String, NewPartitions>();
        //创建新的分区的结果
        newPartitions.put(topicName,NewPartitions.increaseTo(number));
        adminClient.createPartitions(newPartitions);
    }






}

调用这个方法即可

最后介绍一下kafka这个管理工具AdminClient,官网上关于AdminClient的介绍是:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.,详情可参见官网文档

我们可以阅读一下这个类的源码

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
 *
 * The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker
 * version required.
 *
 * This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible
 * manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the
 * {@code InterfaceStability} annotation and this notice once the API is considered stable.
 */
@InterfaceStability.Evolving
public abstract class AdminClient implements AutoCloseable {

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param props The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Properties props) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(props), null);
    }

    /**
     * Create a new AdminClient with the given configuration.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Map<String, Object> conf) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(conf), null);
    }

    /**
     * Close the AdminClient and release all associated resources.
     *
     * See {@link AdminClient#close(long, TimeUnit)}
     */
    @Override
    public void close() {
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * Close the AdminClient and release all associated resources.
     *
     * The close operation has a grace period during which current operations will be allowed to
     * complete, specified by the given duration and time unit.
     * New operations will not be accepted during the grace period.  Once the grace period is over,
     * all operations that have not yet been completed will be aborted with a TimeoutException.
     *
     * @param duration  The duration to use for the wait time.
     * @param unit      The time unit to use for the wait time.
     */
    public abstract void close(long duration, TimeUnit unit);

    /**
     * Create a batch of new topics with the default options.
     *
     * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher.
     *
     * @param newTopics         The new topics to create.
     * @return                  The CreateTopicsResult.
     */
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
        return createTopics(newTopics, new CreateTopicsOptions());
    }

    /**
     * Create a batch of new topics.
     *
     * This operation is not transactional so it may succeed for some topics while fail for others.
     *
     * It may take several seconds after {@code CreateTopicsResult} returns
     * success for all the brokers to become aware that the topics have been created.
     * During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
     * may not return information about the new topics.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
     * from version 0.10.2.0.
     *
     * @param newTopics         The new topics to create.
     * @param options           The options to use when creating the new topics.
     * @return                  The CreateTopicsResult.
     */
    public abstract CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
                                                    CreateTopicsOptions options);

    /**
     * This is a convenience method for #{@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)}
     * with default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher.
     *
     * @param topics            The topic names to delete.
     * @return                  The DeleteTopicsResult.
     */
    public DeleteTopicsResult deleteTopics(Collection<String> topics) {
        return deleteTopics(topics, new DeleteTopicsOptions());
    }

    /**
     * Delete a batch of topics.
     *
     * This operation is not transactional so it may succeed for some topics while fail for others.
     *
     * It may take several seconds after the {@code DeleteTopicsResult} returns
     * success for all the brokers to become aware that the topics are gone.
     * During this time, AdminClient#listTopics and AdminClient#describeTopics
     * may continue to return information about the deleted topics.
     *
     * If delete.topic.enable is false on the brokers, deleteTopics will mark
     * the topics for deletion, but not actually delete them.  The futures will
     * return successfully in this case.
     *
     * This operation is supported by brokers with version 0.10.1.0 or higher.
     *
     * @param topics            The topic names to delete.
     * @param options           The options to use when deleting the topics.
     * @return                  The DeleteTopicsResult.
     */
    public abstract DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);

    /**
     * List the topics available in the cluster with the default options.
     *
     * This is a convenience method for #{@link AdminClient#listTopics(ListTopicsOptions)} with default options.
     * See the overload for more details.
     *
     * @return                  The ListTopicsResult.
     */
    public ListTopicsResult listTopics() {
        return listTopics(new ListTopicsOptions());
    }

    /**
     * List the topics available in the cluster.
     *
     * @param options           The options to use when listing the topics.
     * @return                  The ListTopicsResult.
     */
    public abstract ListTopicsResult listTopics(ListTopicsOptions options);

    /**
     * Describe some topics in the cluster, with the default options.
     *
     * This is a convenience method for #{@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} with
     * default options. See the overload for more details.
     *
     * @param topicNames        The names of the topics to describe.
     *
     * @return                  The DescribeTopicsResult.
     */
    public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
        return describeTopics(topicNames, new DescribeTopicsOptions());
    }

    /**
     * Describe some topics in the cluster.
     *
     * @param topicNames        The names of the topics to describe.
     * @param options           The options to use when describing the topic.
     *
     * @return                  The DescribeTopicsResult.
     */
    public abstract DescribeTopicsResult describeTopics(Collection<String> topicNames,
                                                         DescribeTopicsOptions options);

    /**
     * Get information about the nodes in the cluster, using the default options.
     *
     * This is a convenience method for #{@link AdminClient#describeCluster(DescribeClusterOptions)} with default options.
     * See the overload for more details.
     *
     * @return                  The DescribeClusterResult.
     */
    public DescribeClusterResult describeCluster() {
        return describeCluster(new DescribeClusterOptions());
    }

    /**
     * Get information about the nodes in the cluster.
     *
     * @param options           The options to use when getting information about the cluster.
     * @return                  The DescribeClusterResult.
     */
    public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options);

    /**
     * This is a convenience method for #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions)} with
     * default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filter            The filter to use.
     * @return                  The DeleteAclsResult.
     */
    public DescribeAclsResult describeAcls(AclBindingFilter filter) {
        return describeAcls(filter, new DescribeAclsOptions());
    }

    /**
     * Lists access control lists (ACLs) according to the supplied filter.
     *
     * Note: it may take some time for changes made by createAcls or deleteAcls to be reflected
     * in the output of describeAcls.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filter            The filter to use.
     * @param options           The options to use when listing the ACLs.
     * @return                  The DeleteAclsResult.
     */
    public abstract DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);

    /**
     * This is a convenience method for #{@link AdminClient#createAcls(Collection, CreateAclsOptions)} with
     * default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param acls              The ACLs to create
     * @return                  The CreateAclsResult.
     */
    public CreateAclsResult createAcls(Collection<AclBinding> acls) {
        return createAcls(acls, new CreateAclsOptions());
    }

    /**
     * Creates access control lists (ACLs) which are bound to specific resources.
     *
     * This operation is not transactional so it may succeed for some ACLs while fail for others.
     *
     * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
     * no changes will be made.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param acls              The ACLs to create
     * @param options           The options to use when creating the ACLs.
     * @return                  The CreateAclsResult.
     */
    public abstract CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);

    /**
     * This is a convenience method for #{@link AdminClient#deleteAcls(Collection, DeleteAclsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filters           The filters to use.
     * @return                  The DeleteAclsResult.
     */
    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
        return deleteAcls(filters, new DeleteAclsOptions());
    }

    /**
     * Deletes access control lists (ACLs) according to the supplied filters.
     *
     * This operation is not transactional so it may succeed for some ACLs while fail for others.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param filters           The filters to use.
     * @param options           The options to use when deleting the ACLs.
     * @return                  The DeleteAclsResult.
     */
    public abstract DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);


    /**
     * Get the configuration for the specified resources with the default options.
     *
     * This is a convenience method for #{@link AdminClient#describeConfigs(Collection, DescribeConfigsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param resources         The resources (topic and broker resource types are currently supported)
     * @return                  The DescribeConfigsResult
     */
    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
        return describeConfigs(resources, new DescribeConfigsOptions());
    }

    /**
     * Get the configuration for the specified resources.
     *
     * The returned configuration includes default values and the isDefault() method can be used to distinguish them
     * from user supplied values.
     *
     * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
     * is not disclosed.
     *
     * Config entries where isReadOnly() is true cannot be updated.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param resources         The resources (topic and broker resource types are currently supported)
     * @param options           The options to use when describing configs
     * @return                  The DescribeConfigsResult
     */
    public abstract DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
                                                           DescribeConfigsOptions options);

    /**
     * Update the configuration for the specified resources with the default options.
     *
     * This is a convenience method for #{@link AdminClient#alterConfigs(Map, AlterConfigsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param configs         The resources with their configs (topic is the only resource type with configs that can
     *                        be updated currently)
     * @return                The AlterConfigsResult
     */
    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
        return alterConfigs(configs, new AlterConfigsOptions());
    }

    /**
     * Update the configuration for the specified resources with the default options.
     *
     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
     * a particular resource are updated atomically.
     *
     * This operation is supported by brokers with version 0.11.0.0 or higher.
     *
     * @param configs         The resources with their configs (topic is the only resource type with configs that can
     *                        be updated currently)
     * @param options         The options to use when describing configs
     * @return                The AlterConfigsResult
     */
    public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);

    /**
     * Change the log directory for the specified replicas. This API is currently only useful if it is used
     * before the replica has been created on the broker. It will support moving replicas that have already been created after
     * KIP-113 is fully implemented.
     *
     * This is a convenience method for #{@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @return                   The AlterReplicaLogDirsResult
     */
    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
        return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
    }

    /**
     * Change the log directory for the specified replicas. This API is currently only useful if it is used
     * before the replica has been created on the broker. It will support moving replicas that have already been created after
     * KIP-113 is fully implemented.
     *
     * This operation is not transactional so it may succeed for some replicas while fail for others.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @param options            The options to use when changing replica dir
     * @return                   The AlterReplicaLogDirsResult
     */
    public abstract AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options);

    /**
     * Query the information of all log directories on the given set of brokers
     *
     * This is a convenience method for #{@link AdminClient#describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
     * See the overload for more details.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param brokers     A list of brokers
     * @return            The DescribeLogDirsResult
     */
    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
        return describeLogDirs(brokers, new DescribeLogDirsOptions());
    }

    /**
     * Query the information of all log directories on the given set of brokers
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param brokers     A list of brokers
     * @param options     The options to use when querying log dir info
     * @return            The DescribeLogDirsResult
     */
    public abstract DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);

    /**
     * Query the replica log directory information for the specified replicas.
     *
     * This is a convenience method for #{@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
     * with default options. See the overload for more details.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param replicas      The replicas to query
     * @return              The DescribeReplicaLogDirsResult
     */
    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
        return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
    }

    /**
     * Query the replica log directory information for the specified replicas.
     *
     * This operation is supported by brokers with version 1.0.0 or higher.
     *
     * @param replicas      The replicas to query
     * @param options       The options to use when querying replica log dir info
     * @return              The DescribeReplicaLogDirsResult
     */
    public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);

    /**
     * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
     * the partition logic or ordering of the messages will be affected.</strong></p>
     *
     * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
     * See the overload for more details.</p>
     *
     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
     *                      for the created partitions.
     * @return              The CreatePartitionsResult.
     */
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
        return createPartitions(newPartitions, new CreatePartitionsOptions());
    }

    /**
     * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
     * the partition logic or ordering of the messages will be affected.</strong></p>
     *
     * <p>This operation is not transactional so it may succeed for some topics while fail for others.</p>
     *
     * <p>It may take several seconds after this method returns
     * success for all the brokers to become aware that the partitions have been created.
     * During this time, {@link AdminClient#describeTopics(Collection)}
     * may not return information about the new partitions.</p>
     *
     * <p>This operation is supported by brokers with version 1.0.0 or higher.</p>
     *
     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
     * {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p>
     * <ul>
     *     <li>{@link org.apache.kafka.common.errors.AuthorizationException}
     *     if the authenticated user is not authorized to alter the topic</li>
     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *     if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
     *     <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
     *     if a partition reassignment is currently in progress</li>
     *     <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
     *     if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
     *     <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
     *     if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
     *     replicas with the topics replication factor.</li>
     *     <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
     *     if the request is invalid in some way.</li>
     * </ul>
     *
     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
     *                      for the created partitions.
     * @param options       The options to use when creating the new paritions.
     * @return              The CreatePartitionsResult.
     */
    public abstract CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
                                                            CreatePartitionsOptions options);

}

 

可以得出结论主要功能包括:

  1. 创建Topic:createTopics(Collection<NewTopic> newTopics)
  2. 删除Topic:deleteTopics(Collection<String> topics)
  3. 显示所有Topic:listTopics()
  4. 查询Topic:describeTopics(Collection<String> topicNames)
  5. 查询集群信息:describeCluster()
  6. 查询ACL信息:describeAcls(AclBindingFilter filter)
  7. 创建ACL信息:createAcls(Collection<AclBinding> acls)
  8. 删除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  9. 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
  10. 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  11. 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  12. 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
  13. 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  14. 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)

具体使用可以看源码上的注释

 

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐