dev/Cloud & Infra

kafka 운영 - kafka의 Exception들 - (1)

lugi 2019. 8. 26. 07:45

Kafka를 운영하면서 이런 저런 에러메시지를 보게 됩니다. 그러고 문서를 찾아보거나 검색을 합니다. 그러면서 쌓아가는 지식도 있지만, 얻어 맞기 전에 대비를 하는 것도 괜찮겠다는 생각이 들었습니다.

 

개발자에게 있어 가장 좋은 교보재는 소스코드와 그 주석이라는 생각을 가지고 있기 때문에, Kafka 가 발생시키는 Exception 의 종류를 살펴보고, 이 Exception 은 어떤 상황에 발생하는지, Exception을 회피하기 위한 조건은 무엇이 있는지를 미리 알아두면, 좀 더 운영에 도움이 될 것 같아 Kafka의 소스코드를 참고하여 정리 해 보았습니다.

 

Kafka의 Exception 구조

/**
 * The base class of all other Kafka exceptions
 */
public class KafkaException extends RuntimeException {

    private final static long serialVersionUID = 1L;

    public KafkaException(String message, Throwable cause) {
        super(message, cause);
    }

    public KafkaException(String message) {
        super(message);
    }

    public KafkaException(Throwable cause) {
        super(cause);
    }

    public KafkaException() {
        super();
    }

}

Kafka 의 기본이 되는 Exception 은 RuntimeException을 상속한 KafkaException 입니다.  이 Exception 을 다시 상속 받은 클래스들을 이용해 각 에러에 대한 처리를 하는 구조입니다.

 

KafkaException 을 직접 상속한 클래스의 계층 구조는 다음과 같습니다

 

  • KafkaException : 모든 kafka Exception 들의 Base
    • WakeupException : 블로킹 작업에서 외부 스레드의 선점시 나타나는 Exception 입니다. 예를 들어 consumer.poll 중에 다른 스레드에서 consumer.wakeup()을 호출하면 WakeupException이 발생하고 이를 catch 하여 후속 로직을 작성할 수 있습니다.
    • SerializationException : serialize 작업에서 발생하는 Exception 입니다. 레코드의 엔트리에서 key/value를 파싱하는 중 형식상의 오류 등이 생기면 발생합니다.
    • UngracefulShutdownException : 서비스가 갑자기 종료되었음을 나타내는 Exception입니다. 테스트용으로 의도하고 만든 Exception입니다.
    • QuotaViolationException : Metric이 설정된 한계치를 넘었을 때 발생하는 Exception입니다.
    • InvalidReceiveException : NetworkReceive (size 단위로 구분된 Receive) 에서 수신된 사이즈가 0보다 작거나 (비정상 수신), max size 보다 큰 size가 수신되었을 때 발생하는 Exception, socket.request.max.bytes (기본적으로 100메가 정도로 설정됨)보다 더 큰 데이터를 전송하려 할 때 주로 발생합니다. 개인적인 경험으로는 해당 프로퍼티의 한계치보다도 작은 데이터를 전송했는데도 발생한 적이 있어 살펴본 결과, server와 client의 security protocol 설정이 제대로 되어 있지 않을 경우에도 간혹 발생하는 것을 경험하였습니다.
    • BufferExhaustedException : 논-블로킹 모드의 프로듀서를 사용할 때, 데이터 생성 속도가 일정 수준을 넘어서 서 버퍼가 가득 찼을 경우에 발생합니다. block.on.buffer.full 이 false 일 경우에 발생하지만, 이 옵션 자체가 Kafka 0.9.0 부터는 deprecated 되었으므로 그 이상의 버전에서는 큰 의미가 없는 Exception입니다. 대체 로직으로 max.block.ms 속성을 사용하여 Producer 의 send() 및 partitionsFor() 의 차단시간을 제어하고, 제한 시간을 초과하였을 경우 TimeoutException을 발생시킵니다.
    • StreamsException : Kafka Streams 에서 사용하는 Exception들의 최상위 레벨 Exception 입니다.
    • OAuthBearerConfigException : SASL/OAUTHBEARER 보안에서 인증 관련 설정에 문제가 있을 경우 발생하는 Exception 입니다.
    • ConfigException : 기동이나 동작 중에 잘못된 config property 가 설정되었을 경우 발생하는 Exception 입니다. 에러 메시지를 참고하여 어떤 설정이 잘못되었는지 교정하도록 합니다.
    • SchemaException : 요청이나 응답 과정에서 프로토콜의 스키마가 잘못되었을 때 발생합니다. 프로토콜 버전이 맞지 않거나, 없는 필드이거나, 필드가 중복되었거나, 필수 필드가 누락되었거나, 스키마의 타입이 잘못되었거나 그런 경우입니다. 저는 일반적으로 JSON을 막 던지기 때문에 겪어본 적이 없는 Exception입니다. 아마도 AVRO 를 사용할 경우엔 만날 수 있지 않을까 싶습니다. (글을 적는 사이에 좋은 블로그 포스팅이 올라왔네요 Kafka 스키마 관리, Scheme Registry를 참고해보세요)
    • InterruptException : 무언가의 작업 중 스레드의 간섭이 일어났을 때 생깁니다. Heartbeat, Consume, Producer block 등 다양한 작업 중에 간섭이 생길 수 있습니다.
    • OAuthBearerIllegalTokenException : 토큰 자체의 문제로 검증에 실패했을 때 생기는 Exception 입니다. unsecured JWS의 형식에 맞지 않거나(.으로 연결된 3단어 형식이 아니거나, 시그니처가 올바르지 않거나) 올바른 BASE64 인코딩이 아니거나, claim 의 type이 맞지 않거나, alg 헤더가 none이 아니거나. 입니다.
    • ConnectException : Kafka Connect 나 Kafka의 connector 가 발생시키는 Connection 관련 최상위 Exception입니다.
    • CommitFailedException : KafkaConsumer.commitSync() 를 사용하는 오프셋 커밋이 복구할 수 없는 오류를 냈을 때 발생합니다. 커밋이 적용되기 전 그룹 리밸런싱이 완료되었을 경우 발생할 수 있습니다. 커밋을 재시도할 수 없는 이유는, 일부 파티션이 이미 그룹의 다른 멤버에게 assign 되었을 수도 있기 때문입니다. 위와 같은 문제가 발생하는 것은 폴링 루프가 메시지 처리에 소비하는 시간이 길기 때문일 수 있습니다. max.poll.interval.ms 를 늘려 처리 시간을 좀 더 길게 허용하거나, max.poll.records 를 줄여, 배치 처리 단위를 줄이면 문제를 해결 할 수 있을지도 모릅니다.
    • InvalidOffsetException : missing position을 reset 하는 과정에서, offset이 파티션에 정의되지 않았거나 consumer가 fetchRecords를 수행할 때, 컨슈머가 요청한 offset이 서버에서 관리하는 offset의 범위 밖일 때 발생합니다. 이 때 reset policy(auto.offset.reset) 정책이 지정되어 있을 경우 offset의 reset을 수행하지만, none일 경우 이 Exception이 발생합니다.
    • ApiException : 공개 프로토콜과 관련된 Exception들의 Base가 되는 Exception 입니다. 꽤 많습니다. 한 50여개? 따로 정리해야 할 것 같습니다.

 

RuntimeException을 직접 상속하는 Exception도 몇 가지 있습니다. 주로 Zookeeper의 동작과 연관되어 있습니다.

  • RuntimeException
    • TopicAlreadyMarkedForDeletionException : Topic 삭제는 delete.topic.enable 옵션이 true 일 때만 동작합니다. (이 옵션은 Kafka 1.0.0 부터 기본으로 true입니다) 이 옵션이 false 일 경우에는 Topic delete는 아무런 영향을 미치지 않습니다. 다만 이 옵션을 true로 주었음에도 불구하고, 해당 topic이 바로 삭제되지 않는 경우가 있습니다,
      이는 TopicDeletionManager의 동작 방식과 영향이 있습니다. TopicDeletetionManager는 Topic을 즉시 삭제하지 않고, /admin/delete_topics 에 topic deletion을 발행합니다. 이후 Controller가 변경 이벤트에 반응하여 리플리케이션들의 상태를 TopicDeletionStarted로 변경하고, Topic 삭제를 개시합니다. 이 때 해당 Topic을 Hosting하고 있는 Broker가 다운되었거나, 해당 Topic의 Partition이 재할당 중이면(주키퍼 노드에서 /admin/reassign_partitions 가 존재한다면) Deletion을 마킹하지만, 삭제는 할 수 없습니다.
      이 때 급한 마음에 해당 Topic에 대한 삭제를 재시도하면, NodeExistsException 이 "Topic ~~~ is already marked for deletion" 이라는 메시지와 함께 발생합니다. 
      정상적인 내부 동작 중에서는 TopicDeletionManager가 Deletion이 마킹된 토픽의 삭제 재시도를 하면서 이 Exception이 발생하지만, 이는 정상적인 동작입니다. 중지된 브로커가 재시작되거나 파티션 재할당이 완료되면, 다시 삭제를 개시하여 모든 리플리케이션에서 TopicDeletionSuccessful 상태가 되면 Topic이 삭제됩니다. 그러므로 토픽을 삭제했을 때 바로 삭제되지 않는다면, 일반적으로 기다리면 해결이 됩니다. 성격이 급하다면 브로커를 재시작하거나, 주키퍼에서 직접 삭제할 수 있습니다. (만약 Mark 된 삭제 요청을 중지하려면, 주키퍼 노드의 /admin/delete_topic 하위의 해당 topic명을 제거하세요)
    • CorruptIndexException : Kafka는 메시지의 Topic을 Partition으로 나누고, Partition은 다시 Segment 혹은 Log Segment로 나뉩니다. 이것이 저장되는 경로가 server.properties 의 log.dirs 입니다. Kafka는 Partition의 Offset를 찾기 위해 Index를 관리하는데, Broker의 이상 동작 (급작스러운 Shutdown 이나 File system의 에러 등)으로 이 부분이 손상될 수 있습니다. Kafka는 인덱스의 무결성을 검증하지 않습니다. 만약 이 부분이 손상되었다면, 카프카는 세그먼트를 복구하고 인덱스 파일을 다시 빌드합니다. 만약 이 에러가 자동으로 복구되지 않을 경우는 index 파일을 삭제하여도 무방합니다. Kafka가 다시 생성할 것입니다. (시간은 좀 걸릴 수 있습니다)
    • NoEpochForPartitionException : 컨트롤러 브로커는 파티션의 리더를 선출할 책임을 가지는 브로커입니다. 컨트롤러 브로커가 fail 혹은 zookeeper와의 connection closed로 인해 동작을 중단하고, 다른 브로커들이 그것을 인지했을 때 다른 브로커들이 zookeeper의 /controller 노드 생성을 시도합니다. 이 때 생성에 성공한 node가 다시 controller가 됩니다. controller는 주키퍼로부터 새로운 세대 번호(epoch)를 부여 받고 partition에 대한 메타데이터를 읽어들입니다. (이 과정에서 파티션이 많다면 지연 시간이 발생할 수 있습니다. 파티션 숫자를 적절하게 유지해야 하는 이유입니다) 만약 partition이 존재하지 않는 Epoch로 요청을 날렸다면, 이 Exception이 발생할 것입니다.
    • InconsistentBrokerIdException : broker.id 와 관련된 Exception 입니다. 여러개의 브로커가 log.dirs 를 공유하거나 log.dirs를 수동으로 조작하였을 때 발생할 수 있습니다. 새로운 브로커를 생성하기 위해서는 log.dirs 를 삭제하여야 합니다.
    • StateChangeFailedException : Controller가 STATE 변경에 실패했을 때 나타나는 Exception입니다. Leader 및 ISR과 밀접한 연관이 있습니다. Leader가 선출이 교체된 상태에서 이전 리더와 무언가 충돌이 발생하여 Controller가 정상적으로 상태 변경을 수행할 수 없는 경우입니다.  
    • ThreadShutdownException : Thread가 정상적으로 Shutdown 되고 있음을 나타내는 Exception입니다.
    • UnknownCodecException : 압축 코덱 판별시, 압축이 없거나, Kafka가 지원하는 코덱(GZIP, Snappy, LZ4, ZStd)가 아닌 경우 발생하는 Exception입니다.
    • BrokerEndPointNotAvailableException : Broker의 엔드포인트가 사용 불가할 때 발생합니다. server.properties에 설정된 listener의 protocol과 client의 protocol이 일치하지 않을 때 주로 발생합니다.
    • GenerateBrokerIdException : BrokerId로 사용할 Zookeeper의 Seq Id를 생성하지 못했을 때 발생합니다. BrokerId는 사용자가 지정할 수도 있지만, broker.id 를 지정하지 않았을 때는 reservered.broker.max.id 에서 +1 을 하며 생성됩니다. broker.id 를 지정할 때도 reserved.broker.max.id 보다 큰 값을 지정해서는 안 됩니다.
    • OffsetsOutOfOrderException : Active segment (메시지를 쓰기 위해 사용 중인 세그먼트) 에 메시지를 append할 때 레코드에서 예상 밖의 오프셋이 나타날 경우 발생합니다. 오프셋은 단조증가(Monotonic)하여야 하는데, 이것이 불규칙하다는 것입니다. 이상 레코드가 유입되었거나, 팔로워와의 IO에 이상이 있음을 의심할 수 있습니다.
    • LogCleaningAbortedException : LogCleaning 작업이 중단되었을 때 발생하는 Exception 입니다. offset이 overflow 되었을 때 발생할 수 있습니다. 이 Exception이 발생하면 overflow segment를 split 하는 작업이 .clean파일을 이용하여 수행되며, 이전 segment는 새로운 segment로 대체 됩니다. 
    • ZookeeperClientException : ZookeeperClient 에서 발생하는 Exception입니다.
    • AdminCommandFailedException : Admin 명령어가 실패했을 때 발생하는 Exception 입니다. 상세한 에러 메시지를 제공해주기 때문에, 메시지에 따라 조치하면 되는 경우입니다.
    • UnexpectedAppendOffsetException : 레코드를 팔로워나 Future 리플리카에 추가할 때 발생할 수 있습니다, 레코드의 삭제 요청으로 인해 로그의 시작 오프셋이 배치작업 중간에 fall 된 상태에서 팔로워가 리더에서 첫번째 오프셋을 가져오더려고 시도할 때 발생합니다. 이 Exception이 발생하면 truncateFullyAndStartAt 으로 해당 파티션의 모든 local log 를 삭제한 후, 새로운 오프셋에서 시작합니다.
    • NoRecordsException (in MirrorMaker$) : Mirror Maker에서 Record 탐색시 레코드가 없을 경우 발생합니다. TRACE 레벨로 로그를 출력하고 그대로 진행됩니다.
    • AdminOperationException : 내부 동작 중 이상이 있을 경우 발생합니다. 그룹 메타데이터나 트랜잭션 상태와 같은 내부 토픽을 삭제하려 한 경우, replica를 rack에 할당하려 하였으나 정보가 불완전하여 수행할 수 없는 경우 등입니다.

생각보다 책, 블로그, Kafka 소스 코드 및 주석 등을 이것저것 참고하여 작성하느라 시간이 많이 걸렸습니다.

실제로 위의 Exception 이외에 ApiException을 상속하여 사용하는 클래스도 약 50여개가 됩니다.

계속 정리 해 나가도록 하겠습니다.

 

 

참고자료 :

https://medium.com/@durgaswaroop/a-practical-introduction-to-kafka-storage-internals-d5b544f6925f

https://stackoverflow.com/questions/51105355/warn-found-a-corrupted-index-file-due-to-requirement-failed-corrupt-index-found

https://stackoverflow.com/questions/19394669/why-do-index-files-exist-in-the-kafka-log-directory

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2

https://knight76.tistory.com/entry/kafka-lag-%EC%83%9D%EA%B8%B4%EB%8B%A4%EA%B3%A0-%ED%8C%8C%ED%8B%B0%EC%85%98-%EC%B6%94%EA%B0%80%ED%95%98%EB%8A%94-%EA%B2%83%EC%97%90-%EB%8C%80%ED%95%B4?category=691865