본문 바로가기

Cloud/AZURE

Azure Event Hubs - Kafka를 사용한 예제

Today Keys :  event hub, azure, cloud, kafka, producer, consumer, 이벤트 허브, topic


이번 포스팅은 Azure의 빅 데이터 스트리밍 플랫폼 및 이벤트 수집 서비스 인 Event Hubs에 대한 예제입니다.

Apache Kafka 클러스터를 대신해서, Apache Kafka Producer로 보낸 메시지를 Event Hubs를 통해서 

Apache Kafka Consumer에게 전달하는 예를 다룹니다.


 이번 포스팅에서 Azure Evnet Hubs Kafka endpoint를 사용하면, Kafka 프로토콜을 사용하여, Azure Event Hubs에 연결 할 수 있습니다. 따라서, 기존의 Kafka 애플리케이션의 변경을 최소화하면서 Kafka 클러스터를 Event Hubs로 전환하여 구성할 수 있습니다. 

단, Apache Kafka 버전 1.0 이상만 지원하며, Event hubs의 Pricing tier가 Standard에서만 지원합니다.

Kafka 프로토콜을 사용하여, Event hubs를 이용하는 다양한 언어에 대한 예제를 제공하고 있으며 여기에서는 그 중에 Python을 이용해서 테스트를 합니다. 

Github - Azure Event hubs for kafka

테스트를 위해서, 의존성을 갖는 패키지들을 설치해야 하는 데, 

Git, Python(2.7.X 혹은 3.6.X 이상), Pip, OpenSSL(libssl포함), librdkafka가 있습니다. 

이러한 의존성 패키지를 직접 설치할 수도 있지만, Git에서 제공되는 파일을 복제해 오면,

의존성 패키지를 설치하는 스크립트도 포함되어 있기 때문에 개별로 설치하지 않아도 됩니다.

 

그럼 Azure Event hubs for kafka Repo를 먼저 복제합니다.

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git

 

복제가 완료되면, Python을 이용한 quickstart 디렉토리로 이동해서, 어떤 파일이 있는지 살펴봅니다.

초기 환경 구성을 위한 의존성 패키지 설치용 스크립트와 Producer와 Consumer 파일이 있습니다.

 cd azure-event-hubs-for-kafka / quickstart / python

 

 

 

이제 초기 환경 구성을 위해서 setup.sh를 실행합니다.

전체 설치를 위해서 수 분이 소요됩니다.

 source setup.sh

 

참고로 setup.sh 의 내용은 다음과 같습니다. 

 

이제 기본 구성이 끝나면, Producer.py에서 Event Hubs를 연결하기 위해 일부 수정을 해야 합니다. 

수정이 필요한 부분은 23번째 줄의 'bootstrap.servers'와 25번째 줄의 'ssl.ca.location', 28번째 줄의 'sasl.password' 입니다.

 

producer.py

bootstrap.servers는 Event Hubs의 Endpoint를 입력하는 데, 

이 정보는 Event Hubs 네임스페이스의 '공유 액세스 정책'을 확인하면, 연결 문자열이 있습니다. 

여기에서 Endpoint=sb:// 이후부터, /;SharedAccess.. 앞 부분 사이의 주소가 Endpoint가 되는 데

해당 Endpoint를 보면,

'Event Hubs 네임스페이스'.servicebus.windows.net 

임을 알 수 있습니다.

여기에 Kafka 포트인 9093을 붙입니다.  

저는 zigieh라는 이름으로 Event Hubs 네임스페이스를 지정했기 때문에 

zigieh.servicebus.windows.net:9093 

이 bootstrap.servers의 정보가 됩니다.

다음은 ssl.ca.location 인데, 인증서 저장소의 경로로 설정합니다. 

Ubuntu의 경우 일반적으로 기본 값(/usr/lib/ssl/certs/ca-certificates.crt) 사용이 가능하기 때문에 기본 값을 그대로 사용합니다.

마지막으로  'sasl.password' 값인데, 이 값은 앞서 확인했던 Event Hubs 네임스페이스의 '공유 액세스 정책'을 확인하면 알 수 있는 연결 문자열을 전체 복사하여 사용합니다. (기본 키만 복사하시면 됩니다.)

 

 

변경된 값의 예시는 다음과 같습니다.

 

다음은 이제 consumer.py 를 수정합니다. 

consumer의 수정 부분은 다음과 같습니다.

변경해야 하는 값은 producer.py와 동일하며, 각각 43/45/48번째 줄에 위치해 있습니다.

 

수정한 예시는 다음과 같습니다.

 

 

이제 consumer가 메시지를 수신 받기 위한 consumer group을 생성합니다.

Event hubs 인스턴스에서, zigicg라고 이름의 consumer group을 미리 생성해둡니다.

 

이제 모든 준비가 끝났습니다.

producer.py를 이용해서 메시지를 전송하는 데, kafka의 topic은 Event hubs 인스턴스이기 때문에 

Event hubs 네임스페이스에서 Event hubs 인스턴스를 미리 만들어 둡니다.

저는 zigieb라는 이름으로 미리 Event hubs 인스턴스를 만들어 두었습니다.
(앞의 consumer group은 이 Event hubs 인스턴스 내에서 생성합니다.)

producer를 실행하여, 메시지를 Event hubs로 전달합니다.

 python producer.py [topic]
 python producer.py zigieb

consumer를 실행하여 Event hubs에서 메시지를 가져옵니다.

 python consumer.py [consumer group] [topic]
 python consumer.py zigicg zigieg

 

정상적으로 메시지가 event hubs를 통해서 전달되는 것을 확인할 수 있습니다. 

 

만약 producer 실행 시에 정상적으로 성공하지 않은 경우에는 패키지 의존성에 의해서 이슈가 나올 수 있으므로, 

다음과 같은 방법을 수행하도록 권고합니다.

  • $ sudo apt-get purge librdkafka1설정 스크립트를 실행 하고 다시 실행하십시오 .
  • Github에서 Confluent의 python Kafka 라이브러리를 복제하고 공식 pip 패키지 대신 설치합니다.