dev/Cloud & Infra

Kafka 보안 (2) - SASL/PLAIN

lugi 2019. 8. 22. 10:20

이전의 포스트에서 JAVA가 사용하는 JAAS 및 프로토콜-보안메커니즘을 분리하는 SASL 에 대해서 살펴보았다.

kafka는 SASL의 몇 가지 메커니즘을 활용하여 주키퍼-브로커, 브로커-클라이언트 간의 인증을 적용할 수 있는데, 이번 포스트에서는 그 방법 중 가장 간단한 방법이라고 생각되는 SASL/PLAIN 에 대해서 소개하려 한다.

 

SASL/PLAIN은 username/password 기반으로 인증을 수행하는 방식이며 안전한 인증을 위해서는 TLS 와 병행할 것을 권장한다. 이번 포스트에서 소개할 예제는 브로커-클라이언트 간의 SASL/PLAIN이며 TLS 없이 SASL/PLAIN 을 구현한다.

 

일단 이번 포스트에서 설명할 것의 순서는 아래와 같다

1. broker (kafka 서버) 설정

2. client 설정

3. client의 CallbackHandler 구현

 

1. Broker 설정

Broker(서버측) 설정부터 하겠다.

일단, kafka 에 보안을 구성할 때 설정해야 할 사항은 기본적으로 2가지가 있다.

첫째는 JDK 의 JAAS config를 통해 인증 모듈을 활성화시키는 것이며, 두 번째는 kafka에 보안 설정을 적용하는 것이다.

먼저 JAAS 파일 설정을 살펴보자.

 

[kafka_plain_jaas.conf] - 이용자가 신규로 브로커가 위치한 서버에 작성해야 함

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

위의 파일을 각 라인별로 살펴보자면

첫 번째 라인은 KafkaServer 라는 service 에 관련된 설정을 { ~~ } 안에 하겠다는 의미이다. (KafkaServer, KafkaClient, Server, Client 가 Kafka 에서 사용하도록 지정된 service name 이다)

두 번째 라인은 활성화 시킬 로그인 모듈이 PlainLoginModule 이라는 의미이다.

세 번째, 네 번째 라인의 의미는, 이 파일을 사용하여 구동하는 애플리케이션은 본인의 정보로 위에 표시된 username 및 password를 사용한다는 것이다.

다섯 번째, 여섯 번째 라인의 의미는 서버가 가지고 있는 아이디/비밀번호의 리스트이다. 

Kafka는 user_[ID]="[비밀번호]" 의 형식으로 아이디/비밀번호를 보관한다.

그러므로 위의 코드에서 admin 사용자의 비밀번호는 admin-secret, alice 사용자의 비밀번호는 alice-secret이다.

 

[server.properties] - kafka 디렉토리의 /config 에 있음

broker.id=1
listeners=SASL_PLAINTEXT://192.168.0.201:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
num.partitions=3
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3.2181/gnu-kafka
log.dirs=/data/kafka
log.retention.hours=48
allow.auto.create.topics=false
delete.topic.enable=true

이번에 예제를 작성하기 위해 사용했던 server.properties 파일이다.

이 중 보안에 영향을 미치는 부분은 listeners, security.inter.broker.protocol, sasl.mechanism.inter.broker.protocol, sasl.enabled.mechanisms 이다.

- listeners : 일반적으로 PLAINTEXT://IP:PORT 형식을 사용하는데 SASL 기반의 인증을 사용하기 위해 SASL_PLAINTEXT: 로 적용하였다. 만약 TLS가 활성화되어 있다면 SASL_SSL: 을 사용하여야 한다.

- security.inter.broker.protocol : broker 간의 통신에 사용할 protocol을 의미한다. SASL 을 사용하지 않을 경우 inter.broker.listener.name과 역할이 겹치므로 동시에 사용할 수 없다. inter.broker.listener.name과 차이점이 있다면 inter.broker.listener.name은 별도로 지정한 protocol.map 에서 정의된 사용자 지정 프로토콜 접두사를 사용할 수 있지만, security.inter.broker.protocol은 유효값이 PLAINTEXT, SSL, SASL_PLATINTEXT, SASL_SSL 로 제한된다.

- sasl.mechanism.inter.broker.protocol은 브로커 간의 통신에 사용할 암호화 알고리즘을 의미한다. GSSAPI, PLAIN, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512 의 값이 올 수 있다.

- sasl.enabled.mechanisms : 활성화 시킬 암호화 알고리즘이다. 지정하지 않을시 GSSAPI(Kerberos)가 기본적으로 활성화 된다.

단일 리스너 환경에서 브로커 설정은 위와 같다. NAT 기반의 환경에서는 일반적으로 내부/외부에 별도의 다중 리스너를 구성하기 때문에 설정에 약간 차이가 있다. advertised.listeners 와 함께 사용하는 inter.broker.listener.name 을 보안 구성시에는 사용하지 못 하기 때문에 차이가 나는데, 그 부분은 다른 포스트에서 다루는 걸로 해야겠다.

 

위와 같은 구성을 한 후

export KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/kafka_plain_jaas.conf
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

로 JVM 파라미터를 준 후 브로커를 구동하면 보안이 적용된 브로커가 구동된다

 

2. Client 설정

일반적으로 클라이언트는 Producer/Consumer 를 주로 사용하는데 Kafka 에는 AdminClient라는 Client 도 있다. 이것을 사용하여 브로커에 접속해겠다.

일단 브로커를 설정할 때도 JAAS 설정 및 SASL 구성을 해줘야 하는 것은 같다.

JAAS 파일부터 살펴보자

 

[kafka_plain_client_jaas.conf] - 이용자가 신규로 작성해야 함

KafkaClient { 
	org.apache.kafka.common.security.plain.PlainLoginModule required 	
	username="alice"	
	password="alice-secret";
};

1번째 라인에 service name 으로 사용하는 부분이 KafkaClient 이다. 이 부분은 Producer/Consumer/AdminClient 의 JAAS에 적용된다.

2번째 라인의 PlainLoginModule은 브로커 설정과 같다.

3,4번째 라인의 username, password 를 사용하여 broker 에 접속할 것이다. 이 때 이 username, password는 브로커에서 user_[username]="[password]" 의 형식으로 설정되어 있는 값과 동일해야 한다.

 

console에서 위의 JAAS 구성을 사용하여 브로커에서 인증을 받으면 브로커 구동시 사용했던 export KAFKA_OPTS=-Djava.security.auth.login.config= 의 경로에 위의 kafka_plain_client_jass.conf 의 경로를 넣어주고 구동하는 것만으로 충분하다. 이 포스트에서는 JAVA 코드로 Broker에 접속하는 코드를 짜보도록 하겠다.

 

[AdminTutorial.java]

public class AdminTutorial {
    private static final Logger LOG = LoggerFactory.getLogger(AdminTutorial.class);
    public static final String SERVERS = "192.168.0.201:9092,192.168.0.202:9092,192.168.0.203:9092";

    public static AdminClient getAdminClient(String servers) {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        return KafkaAdminClient.create(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        try (AdminClient client = getAdminClient(SERVERS)) {
            Node node = client.describeCluster().controller().get();
            LOG.info("Controller is {}:{}", node.host(), node.port());
            String topic = "gnu-test-topic";
            Map<String, TopicDescription> stringTopicDescriptionMap = client.describeTopics(Arrays.asList(topic)).all().get();
            TopicDescription topicDescription = stringTopicDescriptionMap.get(topic);
            LOG.info("topic {} - partition {}", topicDescription.name(),topicDescription.partitions());
        }
    }
}

kafka 브로커는 192.168.0.201~203의 9092포트에 구성되어 있으며

애플리케이션 구동시 위의 구성 파일을 Djava.security.auth.login.config=kafka_plain_client_jaas.conf 로 적용해서 구동시키는 코드이다.

BOOTSTRAP_SERVERS_CONFIG와 SERCURITY_PROTOCOL_CONFIG 는 AdminClientConfig에 미리 정의된 상수를 이용해 적용해주었다. 다만 특이한 점은, sasl.mechanism을 별도로 지정해주지 않으면 GSSAPI(Kerberos)가 기본값으로 정의되어 있기 대문에 해당 부분의 설정이 없으면 에러가 나는데, 이 때 저 부분을 설정해주기 위한 상수가 kafka-clients 2.x 대 버전 기준으로 정의가 되어 있지 않기 때문에 위와 같이 직접 텍스트를 입력해주어야 한다.

 

위의 코드는 AdminClient를 이용해 컨트롤러 노드를 출력하고, gnu-test-topic 이라는 토픽이 사전에 구성되어 있다고 전제하고 해당 토픽의 정보를 출력하는 역할을 수행한다.

 

실행한 결과

...
09:47:25.228 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_HANDSHAKE_REQUEST
09:47:25.228 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
09:47:25.230 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to INITIAL
09:47:25.230 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to INTERMEDIATE
09:47:25.236 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to COMPLETE
...
09:47:20.714 [main] INFO com.gnu.kafka.admin.AdminTutorial - Controller is 192.168.0.203:9092
...
09:47:25.244 [main] INFO com.gnu.kafka.admin.AdminTutorial - topic gnu-test-topic - partition [(partition=0, leader=192.168.0.203:9092 (id: 3 rack: null), replicas=192.168.0.201:9092 (id: 1 rack: null), 192.168.0.203:9092 (id: 3 rack: null), isr=192.168.0.203:9092 (id: 3 rack: null), 192.168.0.201:9092 (id: 1 rack: null)), (partition=1, leader=192.168.0.203:9092 (id: 3 rack: null), replicas=192.168.0.203:9092 (id: 3 rack: null), 192.168.0.202:9092 (id: 2 rack: null), isr=192.168.0.203:9092 (id: 3 rack: null), 192.168.0.202:9092 (id: 2 rack: null)), (partition=2, leader=192.168.0.201:9092 (id: 1 rack: null), replicas=192.168.0.202:9092 (id: 2 rack: null), 192.168.0.201:9092 (id: 1 rack: null), isr=192.168.0.201:9092 (id: 1 rack: null), 192.168.0.202:9092 (id: 2 rack: null))]

무언가 인증 과정도 수행되었고, 출력하려고 한 정보도 잘 출력되었다.

다만 클라이언트에서 사용할 username, password를 평문으로 파일에 저장해야 한다는 점이 찝찝하다.

이 점을 개선하기 위해 JAAS의 CallbackHandler 를 구현할 것이다.

CallbackHandler는 JAAS의 인증 과정에서 사용자와의 상호 작용을 위해 사용한다.

 

3. Client의 AuthenticateCallbackHandler 구현

 

Kafka는 CallbackHandler를 상속한 AuthenticateCallbackHandler라는 인터페이스가 있다. 이것을 구현해보자

 

[PlainCallbackHandler.java]

package com.gnu.kafka.handler;

import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

public class PlainCallbackHandler implements AuthenticateCallbackHandler {

    private static final Logger LOG = LoggerFactory.getLogger(PlainCallbackHandler.class);
    private String username = "";
    private String secret = "";

    @Override
    public void configure(Map<String, ?> map, String s, List<AppConfigurationEntry> list) {
        LOG.info("configure");
        LOG.info("Kafka Configure : {}", map); // kafka의 설정이 담긴 map
        LOG.info("Mechanism : {}", s); // 사용하는 인증 mechanism
        LOG.info("JAAS Configure : {}", list.get(0).getOptions()); // JAAS 설정이 담긴 list
    }

    @Override
    public void close() {

    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        try(Scanner sc = new Scanner(System.in)){ // 키보드로 입력을 받기 위해 Scanner 사용
            for(Callback callback : callbacks){
                if(callback instanceof NameCallback){ // username 입력
                    LOG.info(((NameCallback) callback).getName());
                    // 입력을 요청하는 Prompt를 띄움
                    System.out.println(((NameCallback) callback).getPrompt());
                    // 응답을 만들고 요청을 하는 과정에서 handle이 2회 실행되기 때문에
                    // 이미 username 을 입력했다면 pass 하도록 하기 위해 아래와 같이 구현
                    username = username.equals("") ? sc.nextLine() : username;
                    ((NameCallback) callback).setName(username);
                } else if(callback instanceof PasswordCallback){ // password 입력
                    System.out.println(((PasswordCallback) callback).getPrompt());
                    secret = secret.equals("") ? sc.nextLine() : secret;
                    ((PasswordCallback) callback).setPassword(secret.toCharArray());
                }
            }
        }
    }
}

 

AuthenticateCallbackHandler 를 구현한 PlainCallbackHandler의 소스코드이다.

Callback 핸들러 최초 구동시 configure 메소드가 호출되며, 이 때 파라미터에는 map, string, list가 전달되는데 첫 번째 인자는 kafka의 설정값이 담긴 map, 두 번째 인자는 사용하는 메커니즘이 담긴 string (이 예제에서는 PLAIN), 세번째는 JAAS의 구성이 담긴 List가 전달된다. (jaas 의 conf는 한 파일에 여러개의 service를 담을 수 있다)

 

이후 handle 메소드를 호출하여 인증과정을 처리하는데 이 때의 인자는 인증과정의 상호작용을 담당하는 callback이 전달된다. SASL/PLAIN 인증과정에서는 username을 처리하는 NameCallback, password를 처리하는 PasswordCallback이 각각 전달된다.

 

위의 예제에서는 NameCallback 과 PasswordCallback을 판별하여 Scanner를 통해 콘솔에서 키보드로 해당 값들을 입력하도록 하였다.

 

이 때 작성한 CallbackHandler를 사용하기 위해 Client 설정에 약간의 변경이 필요하다

    public static AdminClient getAdminClient(String servers) {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.setProperty("sasl.client.callback.handler.class", "com.gnu.kafka.handler.PlainCallbackHandler");
        return KafkaAdminClient.create(props);
    }

props.setProperty("sasl.client.callback.handler.class", "com.gnu.kafka.handler.PlainCallbackHandler")부분이 추가되었다.

sasl.client.callback.handler.class 의 속성에 CallbackHandler 클래스의 전체 경로를 주어, 인증 과정에서 해당 핸들러가 동작하도록 하였다.

 

이렇게 하여 실행한 결과는

...
10:06:41.482 [main] INFO com.gnu.kafka.handler.PlainCallbackHandler - configure
10:06:41.482 [main] INFO com.gnu.kafka.handler.PlainCallbackHandler - Kafka Configure : {metric.reporters=[], metadata.max.age.ms=300000, reconnect.backoff.max.ms=1000, sasl.login.refresh.window.factor=0.8, sasl.login.refresh.min.period.seconds=60, reconnect.backoff.ms=50, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[192.168.0.201:9092, 192.168.0.202:9092, 192.168.0.203:9092], metrics.recording.level=INFO, retry.backoff.ms=100, ssl.secure.random.implementation=null, sasl.kerberos.kinit.cmd=/usr/bin/kinit, sasl.kerberos.service.name=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.keystore.type=JKS, ssl.trustmanager.algorithm=PKIX, sasl.mechanism=PLAIN, ssl.key.password=null, sasl.jaas.config=null, sasl.client.callback.handler.class=class com.gnu.kafka.handler.PlainCallbackHandler, sasl.kerberos.min.time.before.relogin=60000, connections.max.idle.ms=300000, ssl.truststore.password=null, metrics.num.samples=2, client.id=, ssl.endpoint.identification.algorithm=https, ssl.protocol=TLS, request.timeout.ms=120000, sasl.login.refresh.buffer.seconds=300, ssl.provider=null, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], ssl.keystore.location=null, receive.buffer.bytes=65536, sasl.login.class=null, ssl.cipher.suites=null, ssl.truststore.type=JKS, security.protocol=SASL_PLAINTEXT, retries=5, ssl.truststore.location=null, ssl.keystore.password=null, ssl.keymanager.algorithm=SunX509, sasl.login.callback.handler.class=null, metrics.sample.window.ms=30000, send.buffer.bytes=131072, sasl.login.refresh.window.jitter=0.05}
10:06:41.483 [main] INFO com.gnu.kafka.handler.PlainCallbackHandler - Mechanism : PLAIN
10:06:41.484 [main] INFO com.gnu.kafka.handler.PlainCallbackHandler - JAAS Configure : {}
...
PLAIN authentication id: 
alice # 이 부분은 직접 키보드로 입력한 부분이다.
PLAIN password: 
alice-secret # 이 부분도 직접 키보드로 입력한 부분이다.
...
10:06:59.227 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Completed connection to node 1. Fetching API versions.
10:06:59.228 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_HANDSHAKE_REQUEST
10:06:59.228 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
10:06:59.229 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to INITIAL
10:06:59.229 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to INTERMEDIATE
10:06:59.236 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to COMPLETE
...
10:12:19.467 [main] INFO com.gnu.kafka.admin.AdminTutorial - Controller is 192.168.0.203:9092
...
10:12:23.993 [main] INFO com.gnu.kafka.admin.AdminTutorial - topic gnu-test-topic - partition [(partition=0, leader=192.168.0.203:9092 (id: 3 rack: null), replicas=192.168.0.201:9092 (id: 1 rack: null), 192.168.0.203:9092 (id: 3 rack: null), isr=192.168.0.203:9092 (id: 3 rack: null), 192.168.0.201:9092 (id: 1 rack: null)), (partition=1, leader=192.168.0.203:9092 (id: 3 rack: null), replicas=192.168.0.203:9092 (id: 3 rack: null), 192.168.0.202:9092 (id: 2 rack: null), isr=192.168.0.203:9092 (id: 3 rack: null), 192.168.0.202:9092 (id: 2 rack: null)), (partition=2, leader=192.168.0.201:9092 (id: 1 rack: null), replicas=192.168.0.202:9092 (id: 2 rack: null), 192.168.0.201:9092 (id: 1 rack: null), isr=192.168.0.201:9092 (id: 1 rack: null), 192.168.0.202:9092 (id: 2 rack: null))]

configure 메소드에서 인자로 전달된 정보 출력, 키보드를 통한 id, password 직접 입력, 인증 과정 수행, 인증 후 broker에서 정보를 가져오는 부분이 수행된 것을 볼 수 있다.

 

다음 포스트는 아마도... 멀티 리스너를 구현할 때 일반적인 경우와 SASL 의 경우에 차이가 약간 나는 부분을 하지 않을까? 싶은 생각이 든다... kafka 공부를 시작한지 이제 딱 1년 정도 된 것 같은데... 지난 반 년 정도를 실제 시스템에 kafka를 올려보려고 삽질하고나니 정리해야 할 주제가 쌓여 있다. 열심히 써야지...