Blog recommendation | Fast log retrieval for Apache Pulsar based on Log4j2 + Kafka + ELK

This article is reproduced from the official account StreamCloudNative. The author, Xue Song, is a senior software engineer at New World Software.

Editor: chicken chop, StreamNative.

About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation and a next-generation, cloud-native distributed messaging and streaming platform. It integrates messaging, storage, and lightweight functional computing; adopts an architecture that separates compute from storage; supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers; and offers stream-data storage features such as strong consistency, high throughput, low latency, and high scalability.

At present, many large Internet and traditional-industry companies at home and abroad have adopted Apache Pulsar, with use cases spanning artificial intelligence, finance, telecom, live streaming and short video, the Internet of Things, retail and e-commerce, online education, and other industries. Adopters include the American cable TV giant Comcast, Yahoo!, Tencent, China Telecom, China Mobile, BIGO, and VIPKID.

Background introduction

As a cloud-native distributed messaging system, Apache Pulsar comprises multiple components, including ZooKeeper, bookie, broker, functions worker, and proxy, all deployed in a distributed manner across multiple hosts. The log files of each component are therefore scattered across many machines. When a component has a problem, these scattered logs make troubleshooting tedious: to check whether each service has reported errors, you must inspect hosts one by one. Traditionally, we could run grep, awk, and similar commands directly against the log files to extract the information we want, but as the volume of applications and services grows, so does the number of nodes, and the traditional approach exposes many problems: low efficiency, no good way to archive large log volumes, slow text search, and no support for multi-dimensional queries. We therefore want to aggregate and monitor logs so that we can quickly locate error information in the Pulsar services and troubleshoot rapidly, making operations and maintenance more purposeful, targeted, and direct.

To solve the log retrieval problem, our team decided to use a centralized log collection system to uniformly collect, manage, and access the logs on all Pulsar nodes.

A complete centralized log system needs to include the following main features:

  • Collection - collect log data from multiple sources;
  • Transmission - stably transmit log data to the central system;
  • Storage - store log data;
  • Analysis - support analysis through a UI;
  • Alerting - provide error reporting and monitoring mechanisms.

ELK provides a complete, fully open-source solution whose components work seamlessly together and efficiently cover many scenarios; it is a mainstream log system today. Our company has a self-developed big data management platform, through which ELK is deployed and managed, and ELK is already used in our production systems to support multiple business systems. ELK is an abbreviation of three open-source projects: Elasticsearch, Logstash, and Kibana. The latest version has been renamed Elastic Stack and adds the Beats project, which includes Filebeat, a lightweight log collection and processing tool (agent). Filebeat consumes few resources and is well suited to collecting logs on each server and forwarding them to Logstash.

As can be seen from the figure above, if Pulsar adopted this log collection mode, there would be two problems:

  • Every host running a Pulsar service must also deploy a Filebeat service;
  • Pulsar service logs must first be written to disk as files, consuming the host's disk I/O.

For this reason, we decided that Apache Pulsar should implement fast log retrieval based on Log4j2 + Kafka + ELK. Log4j2 supports sending logs to Kafka out of the box: by using Kafka's Log4j2 appender and configuring it in the Log4j2 configuration file, the logs produced by Log4j2 can be sent to Kafka in real time.

As shown in the figure below:

Implementation process

Taking Pulsar 2.6.2 as an example, the following describes the detailed implementation of Apache Pulsar's fast log retrieval solution based on Log4j2 + Kafka + ELK.

1. Preparatory work

First, determine the fields used to retrieve logs in Kibana; these fields can then be aggregated and queried across multiple dimensions. Elasticsearch tokenizes the retrieval fields and builds an index on them.

As shown in the figure above, we create 8 retrieval fields for Pulsar logs: cluster name, host name, host IP, component name, log content, system time, log level, and cluster instance.
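Because the PatternLayout used later in log4j2-kafka.yaml separates the leading retrieval fields with spaces, downstream tooling can split a message positionally. A minimal sketch, using a hypothetical sample line in that format:

```shell
# Sample message in the format produced by the PatternLayout:
# cluster hostname hostip module instanceid date time [thread] [logger] level , message
line='pulsar_cluster host01 192.168.0.1 broker 1 2021-01-01 12:00:00.000 [main] [org.apache.pulsar] INFO , service started'

# Split off the leading retrieval fields; everything after them is log content.
read -r cluster host ip module instance logdate logtime rest <<< "$line"

echo "cluster=$cluster module=$module instance=$instance"
```

This space-separated layout is what lets Elasticsearch tokenize each retrieval field without a custom analyzer.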

2. Implementation process

Note: to keep the structure of Pulsar's native configuration files and script files intact, we implement this scheme by adding new configuration files and script files.

1. Add configuration files

Add the following two configuration files in the {PULSAR_HOME}/conf directory:

1) logenv.sh - this file passes the JVM options required when starting a Pulsar component to the Java process of the Pulsar service in a configurable way. Example contents:

KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1

The meanings of the above fields are:

  • KAFKA_CLUSTER: Kafka broker list;
  • PULSAR_CLUSTER: name of the Pulsar cluster;
  • PULSAR_TOPIC: the Kafka topic that receives the Pulsar service logs;
  • HOST_IP: IP of the Pulsar host;
  • PULSAR_MODULE_INSTANCE_ID: instance ID of the Pulsar service; multiple Pulsar clusters may be deployed on one host, and instances are distinguished by this ID.
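These variables are later mapped one-to-one onto JVM system properties consumed by log4j2-kafka.yaml (see the OPTS assignments in the pulsar-kafka script). A simplified sketch of that mapping, with example values:

```shell
# Values as they would be sourced from conf/logenv.sh (example values).
KAFKA_CLUSTER="192.168.0.1:9092,192.168.0.2:9092"
PULSAR_CLUSTER="pulsar_cluster"
HOST_IP="192.168.0.1"
PULSAR_MODULE_INSTANCE_ID=1

# Each variable becomes a -D system property that log4j2-kafka.yaml
# reads via ${sys:...} lookups.
OPTS="-Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.cluster=${PULSAR_CLUSTER}"
OPTS="$OPTS -Dpulsar.hostip=${HOST_IP} -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID}"

echo "$OPTS"
```

The actual pulsar-kafka script also adds -Dpulsar.module.type (the component command) and -Dpulsar.hostname, which are not set in logenv.sh.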

2) log4j2-kafka.yaml

This configuration file is copied from log4j2.yaml, with the following modifications added. (Note: in the figure below, log4j2.yaml is on the left and log4j2-kafka.yaml is on the right.)

  • Add the Kafka cluster broker list and define the record format of the messages Log4j2 writes to Kafka. The eight retrieval fields in a message are separated by spaces, and Elasticsearch uses the space as the separator to split out the eight fields;

  • add the Kafka appender;

  • add a Failover appender;

  • change the Root logger and the Loggers to asynchronous mode;

  • the complete contents of the log4j2-kafka.yaml configuration file are as follows:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#




Configuration:
  status: INFO
  monitorInterval: 30
  name: pulsar
  packages: io.prometheus.client.log4j2


  Properties:
    Property:
      - name: "pulsar.log.dir"
        value: "logs"
      - name: "pulsar.log.file"
        value: "pulsar.log"
      - name: "pulsar.log.appender"
        value: "RoutingAppender"
      - name: "pulsar.log.root.level"
        value: "info"
      - name: "pulsar.log.level"
        value: "info"
      - name: "pulsar.routing.appender.default"
        value: "Console"
      - name: "kafkaBrokers"
        value: "${sys:kafka.cluster}"
      - name: "pattern"
        value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"


  # Example: logger-filter script
  Scripts:
    ScriptFile:
      name: filter.js
      language: JavaScript
      path: ./conf/log4j2-scripts/filter.js
      charset: UTF-8


  Appenders:


    #Kafka
    Kafka:
      name: "pulsar_kafka"
      topic: "${sys:pulsar.topic}"
      ignoreExceptions: "false"
      PatternLayout:
        pattern: "${pattern}"
      Property:
        - name: "bootstrap.servers"
          value: "${kafkaBrokers}"
        - name: "max.block.ms"
          value: "2000"


    # Console
    Console:
      name: Console
      target: SYSTEM_OUT
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"


    Failover:
      name: "Failover"
      primary: "pulsar_kafka"
      retryIntervalSeconds: "600"
      Failovers:
        AppenderRef:
          ref: "RollingFile"


    # Rolling file appender configuration
    RollingFile:
      name: RollingFile
      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
      immediateFlush: false
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
      Policies:
        TimeBasedTriggeringPolicy:
          interval: 1
          modulate: true
        SizeBasedTriggeringPolicy:
          size: 1 GB
      # Delete file older than 30days
      DefaultRolloverStrategy:
          Delete:
            basePath: ${sys:pulsar.log.dir}
            maxDepth: 2
            IfFileName:
              glob: "*/${sys:pulsar.log.file}*log.gz"
            IfLastModified:
              age: 30d


    Prometheus:
      name: Prometheus


    # Routing
    Routing:
      name: RoutingAppender
      Routes:
        pattern: "$${ctx:function}"
        Route:
          -
            Routing:
              name: InstanceRoutingAppender
              Routes:
                pattern: "$${ctx:instance}"
                Route:
                  -
                    RollingFile:
                      name: "Rolling-${ctx:function}"
                      fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
                      filePattern : "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
                      PatternLayout:
                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
                      Policies:
                        TimeBasedTriggeringPolicy:
                          interval: 1
                          modulate: true
                        SizeBasedTriggeringPolicy:
                          size: "20MB"
                        # Trigger every day at midnight that also scan
                        # roll-over strategy that deletes older file
                        CronTriggeringPolicy:
                          schedule: "0 0 0 * * ?"
                      # Delete file older than 30days
                      DefaultRolloverStrategy:
                          Delete:
                            basePath: ${sys:pulsar.log.dir}
                            maxDepth: 2
                            IfFileName:
                              glob: "*/${sys:pulsar.log.file}*log.gz"
                            IfLastModified:
                              age: 30d
                  - ref: "${sys:pulsar.routing.appender.default}"
                    key: "${ctx:function}"
          - ref: "${sys:pulsar.routing.appender.default}"
            key: "${ctx:function}"


  Loggers:


    # Default root logger configuration
    AsyncRoot:
      level: "${sys:pulsar.log.root.level}"
      additivity: true
      AppenderRef:
        - ref: "Failover"
          level: "${sys:pulsar.log.level}"
        - ref: Prometheus
          level: info


    AsyncLogger:
      - name: org.apache.bookkeeper.bookie.BookieShell
        level: info
        additivity: false
        AppenderRef:
          - ref: Console


      - name: verbose
        level: info
        additivity: false
        AppenderRef:
          - ref: Console


    # Logger to inject filter script
#     - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
#       level: debug
#       additivity: false
#       AppenderRef:
#         ref: "${sys:pulsar.log.appender}"
#         ScriptFilter:
#           onMatch: ACCEPT
#           onMisMatch: DENY
#           ScriptRef:
#             ref: filter.js

Notes:

  • Log delivery must be asynchronous and must not affect service performance;
  • Systems with strict response requirements must decouple from third-party systems they depend on. The Failover appender here decouples the dependence on Kafka: when Kafka is down, logging fails over and writes to the local file;
  • The default retryIntervalSeconds of the Log4j2 Failover appender is 1 minute. Since failover is switched via exceptions, the interval can be increased appropriately, e.g. to the 10 minutes used above;
  • The Kafka appender's ignoreExceptions must be set to false, otherwise Failover cannot be triggered;
  • A big pitfall is the max.block.ms property, whose default in the Kafka client is 60000 ms. When Kafka is down, a write attempt blocks for a full minute before returning an exception and triggering Failover. Under heavy request volume the Log4j2 queue fills up quickly, logging then blocks, and the main service's response is seriously affected. Therefore max.block.ms should be set low enough (2000 ms above) that failover triggers quickly.
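Since the Failover behavior depends on writes to Kafka timing out, it can be useful to sanity-check broker reachability before starting Pulsar. A hedged sketch: the broker list is an example, and the actual probe (commented out) requires the brokers to be up, so only the list parsing is shown as runnable:

```shell
# Parse the comma-separated broker list (as defined in logenv.sh)
# into host/port pairs, then optionally probe each one.
KAFKA_CLUSTER="192.168.0.1:9092,192.168.0.2:9092"

hosts=()
ports=()
for hp in ${KAFKA_CLUSTER//,/ }; do
    hosts+=("${hp%%:*}")   # part before the colon
    ports+=("${hp##*:}")   # part after the colon
done

# Probe each broker with a short timeout (uncomment when brokers are up):
# for i in "${!hosts[@]}"; do
#     timeout 2 bash -c ">/dev/tcp/${hosts[$i]}/${ports[$i]}" \
#         && echo "${hosts[$i]}:${ports[$i]} reachable"
# done

echo "parsed ${#hosts[@]} brokers"
```

A check like this can be wired into a startup wrapper so an unreachable Kafka is noticed before the 60-second (or configured max.block.ms) blocking behavior is hit in production.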

2. Add script files

Add the following two script files in the {PULSAR_HOME}/bin directory:

1) pulsar-kafka

This script file is copied from the pulsar script, with the following modifications added. (Note: in the figure below, pulsar is on the left and pulsar-kafka is on the right.)

  • specify log4j2-kafka.yaml as the log configuration;

  • read the contents of logenv.sh;

  • add the OPTS options that pass the JVM options to the Java process when starting a Pulsar component, in both the pulsar-kafka and pulsar-daemon-kafka scripts;

  • the complete contents of the pulsar-kafka script file are as follows:

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


BINDIR=$(dirname "$0")
export PULSAR_HOME=`cd -P $BINDIR/..;pwd`


DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto


# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto


# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi


# Check pulsar env and load pulsar_env.sh
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi


if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi


# Check for the java to use
if [[ -z $JAVA_HOME ]]; then
    JAVA=$(which java)
    if [ $? != 0 ]; then
        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
        exit 1
    fi
else
    JAVA=$JAVA_HOME/bin/java
fi


# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
    PULSAR_JAR=$RELEASE_JAR
fi


# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
    echo "\nCouldn't find pulsar jar.";
    echo "Make sure you've run 'mvn package'\n";
    exit 1;
elif [ -e "$BUILT_JAR" ]; then
    PULSAR_JAR=$BUILT_JAR
fi


#
# find the instance locations for pulsar-functions
#


# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
    # didn't find a released jar, then search the built jar
    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
    if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
        echo "\nCouldn't find pulsar-functions java instance jar.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
fi


# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
    # didn't find a released python instance, then search the built python instance
    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
    if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
        echo "\nCouldn't find pulsar-functions python instance.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
fi


# find pulsar sql presto distribution location
check_presto_libraries() {
    if [ ! -d "${PRESTO_HOME}" ]; then


        BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
        if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
            echo "\nCouldn't find presto distribution.";
            echo "Make sure you've run 'mvn package'\n";
            exit 1;
        fi
        PRESTO_HOME=${BUILT_PRESTO_HOME}
    fi
}


pulsar_help() {
    cat <<EOF
Usage: pulsar <command>
where command is one of:


    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    proxy               Run a pulsar proxy
    websocket           Run a web socket proxy server
    functions-worker    Run a functions worker server
    sql-worker          Run a sql worker server
    sql                 Run sql CLI
    standalone          Run a broker server with local bookies and local zookeeper


    initialize-cluster-metadata     One-time metadata initialization
    delete-cluster-metadata         Delete a cluster's metadata
    initialize-transaction-coordinator-metadata     One-time transaction coordinator metadata initialization
    initialize-namespace     namespace initialization
    compact-topic       Run compaction against a topic
    zookeeper-shell     Open a ZK shell client
    broker-tool         CLI to operate a specific broker
    tokens              Utility to create authentication tokens


    help                This help message


or command is the full name of a class with a defined main() method.


Environment variables:
   PULSAR_LOG_CONF               Log4j configuration file (default $DEFAULT_LOG_CONF)
   PULSAR_BROKER_CONF            Configuration file for broker (default: $DEFAULT_BROKER_CONF)
   PULSAR_BOOKKEEPER_CONF        Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
   PULSAR_ZK_CONF                Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
   PULSAR_CONFIGURATION_STORE_CONF         Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
   PULSAR_DISCOVERY_CONF         Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
   PULSAR_WEBSOCKET_CONF         Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
   PULSAR_PROXY_CONF             Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
   PULSAR_WORKER_CONF            Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
   PULSAR_STANDALONE_CONF        Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
   PULSAR_PRESTO_CONF            Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
   PULSAR_EXTRA_OPTS             Extra options to be passed to the jvm
   PULSAR_EXTRA_CLASSPATH        Add extra paths to the pulsar classpath
   PULSAR_PID_DIR                Folder where the pulsar server PID file should be stored
   PULSAR_STOP_TIMEOUT           Wait time before forcefully kill the pulsar server instance, if the stop is not successful


These variable can also be set in conf/pulsar_env.sh
EOF
}


add_maven_deps_to_classpath() {
    MVN="mvn"
    if [ "$MAVEN_HOME" != "" ]; then
    MVN=${MAVEN_HOME}/bin/mvn
    fi


    # Need to generate classpath from maven pom. This is costly so generate it
    # and cache it. Save the file into our target dir so a mvn clean will get
    # clean it up and force us create a new one.
    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
    if [ ! -f "${f}" ]
    then
    ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
    fi
    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}


if [ -d "$PULSAR_HOME/lib" ]; then
PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
    ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
else
    add_maven_deps_to_classpath


    ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
    ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
fi


ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"


# if no args specified, show usage
if [ $# = 0 ]; then
    pulsar_help;
    exit 1;
fi


# get arguments
COMMAND=$1
shift


if [ -z "$PULSAR_WORKER_CONF" ]; then
    PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
fi


if [ -z "$PULSAR_BROKER_CONF" ]; then
    PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi


if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
    PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
fi


if [ -z "$PULSAR_ZK_CONF" ]; then
    PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi


if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
    PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi


if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
    PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi


if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
    PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
fi


if [ -z "$PULSAR_PROXY_CONF" ]; then
    PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
fi


if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
    PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
fi


if [ -z "$PULSAR_STANDALONE_CONF" ]; then
    PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
fi


if [ -z "$PULSAR_LOG_CONF" ]; then
    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
fi


if [ -z "$PULSAR_PRESTO_CONF" ]; then
    PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
fi


PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"


# Ensure we can read bigger content from ZK. (It might be
# rarely needed when trying to list many z-nodes under a
# directory)
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"


OPTS="-cp $PULSAR_CLASSPATH $OPTS"


OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"


# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}


#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"


# Functions related logging
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"


ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"


#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
    # Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
    OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
    exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
elif [ $COMMAND == "proxy" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
elif [ $COMMAND == "websocket" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "delete-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
    exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
    exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
elif [ $COMMAND == "compact-topic" ]; then
    exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "sql" ]; then
    check_presto_libraries
    exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
    check_presto_libraries
    exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
elif [ $COMMAND == "tokens" ]; then
      exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
    pulsar_help;
else
    echo ""
    echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
    echo ""
    exit 1
fi
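The command-to-log-file mapping in the case chain above can be condensed into a small helper, which is handy when predicting which file the Failover appender will write to when Kafka is unavailable. A simplified sketch of the script's own logic (the fallback to pulsar.log for unknown commands mirrors the pulsar.log.file default in log4j2-kafka.yaml, not the script, which instead reports an error):

```shell
# Default log file per component, mirroring the case chain in pulsar-kafka.
default_log_file() {
    case "$1" in
        broker)            echo "pulsar-broker.log" ;;
        bookie)            echo "bookkeeper.log" ;;
        zookeeper)         echo "zookeeper.log" ;;
        proxy)             echo "pulsar-proxy.log" ;;
        websocket)         echo "pulsar-websocket.log" ;;
        functions-worker)  echo "pulsar-functions-worker.log" ;;
        standalone)        echo "pulsar-standalone.log" ;;
        *)                 echo "pulsar.log" ;;   # illustrative fallback
    esac
}

default_log_file broker
```

Note that each entry can still be overridden at startup by exporting PULSAR_LOG_FILE, since the script uses ${PULSAR_LOG_FILE:-"..."} defaults.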

2) pulsar-daemon-kafka

This script file is copied from the pulsar-daemon script, with the following modifications added. (Note: in the figure below, pulsar-daemon is on the left and pulsar-daemon-kafka is on the right.)

  • read the contents of logenv.sh;

  • invoke the pulsar-kafka script instead of pulsar;

  • the complete contents of the pulsar-daemon-kafka script file are as follows:

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


usage() {
    cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    websocket           Run a websocket proxy server
    functions-worker    Run a functions worker server
    standalone          Run a standalone Pulsar service
    proxy               Run a Proxy Pulsar service


where argument is one of:
    -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}


BINDIR=$(dirname "$0")
PULSAR_HOME=$(cd -P $BINDIR/..;pwd)


# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi


if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi


if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}


if [ $# = 0 ]; then
    usage
    exit 1
elif [ $# = 1 ]; then
    if [ $1 == "--help" -o $1 == "-h" ]; then
        usage
        exit 1
    else
        echo "Error: no enough arguments provided."
        usage
        exit 1
    fi
fi


startStop=$1
shift
command=$1
shift


case $command in
    (broker)
        echo "doing $startStop $command ..."
        ;;
    (bookie)
        echo "doing $startStop $command ..."
        ;;
    (zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (global-zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (configuration-store)
        echo "doing $startStop $command ..."
        ;;
    (discovery)
        echo "doing $startStop $command ..."
        ;;
    (websocket)
        echo "doing $startStop $command ..."
        ;;
    (functions-worker)
        echo "doing $startStop $command ..."
        ;;
    (standalone)
        echo "doing $startStop $command ..."
        ;;
    (proxy)
        echo "doing $startStop $command ..."
        ;;
    (*)
        echo "Error: unknown service name $command"
        usage
        exit 1
        ;;
esac


export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log


pid=$PULSAR_PID_DIR/pulsar-$command.pid
out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE


rotate_out_log ()
{
    log=$1;
    num=5;
    if [ -n "$2" ]; then
       num=$2
    fi
    if [ -f "$log" ]; then # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}


mkdir -p "$PULSAR_LOG_DIR"


case $startStop in
  (start)
    if [ -f $pid ]; then
      if kill -0 `cat $pid` > /dev/null 2>&1; then
        echo $command running as process `cat $pid`.  Stop it first.
        exit 1
      fi
    fi


    rotate_out_log $out
    echo starting $command, logging to $logfile
    echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
    pulsar=$PULSAR_HOME/bin/pulsar-kafka
    nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
    echo $! > $pid
    sleep 1; head $out
    sleep 2;
    if ! ps -p $! > /dev/null ; then
      exit 1
    fi
    ;;


  (stop)
    if [ -f $pid ]; then
      TARGET_PID=$(cat $pid)
      if kill -0 $TARGET_PID > /dev/null 2>&1; then
        echo "stopping $command"
        kill $TARGET_PID


        count=0
        location=$PULSAR_LOG_DIR
        while ps -p $TARGET_PID > /dev/null;
         do
          echo "Shutdown is in progress... Please wait..."
          sleep 1
          count=`expr $count + 1`


          if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
                break
          fi
         done


        if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
            echo "Shutdown completed."
        fi


        if kill -0 $TARGET_PID > /dev/null 2>&1; then
              fileName=$location/$command.out
              $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
              echo "Thread dumps are taken for analysis at $fileName"
              if [ "$1" == "-force" ]
              then
                 echo "forcefully stopping $command"
                 kill -9 $TARGET_PID >/dev/null 2>&1
                 echo Successfully stopped the process
              else
                 echo "WARNING: $command is not stopped completely."
                 exit 1
              fi
        fi
      else
        echo "no $command to stop"
      fi
      rm $pid
    else
      echo no "$command to stop"
    fi
    ;;


  (*)
    usage
    exit 1
    ;;
esac
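The rotate_out_log function in the script above keeps a bounded history of .out files by shifting every numbered suffix up by one before renaming the current file. A standalone demonstration of that rotation logic, run in a scratch directory with illustrative file names:

```shell
# Standalone copy of the rotation logic from pulsar-daemon-kafka, for illustration.
rotate_out_log () {
    log=$1
    num=5
    if [ -n "$2" ]; then
        num=$2
    fi
    if [ -f "$log" ]; then # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            if [ -f "$log.$prev" ]; then
                mv "$log.$prev" "$log.$num"
            fi
            num=$prev
        done
        mv "$log" "$log.$num"
    fi
}

cd "$(mktemp -d)"
touch pulsar-broker.out pulsar-broker.out.1
rotate_out_log pulsar-broker.out 3
ls   # the old .out becomes .out.1, the old .out.1 becomes .out.2
```

With a limit of 3, at most three historical .out files are kept; anything shifted to a suffix above the limit is simply overwritten on the next rotation.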

3. Add the JARs that Kafka Producer depends on

Add the following three JARs to the ${PULSAR_HOME}/lib directory on all nodes of the Pulsar cluster:

connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar
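The copy step can be scripted for all nodes. A minimal sketch, assuming the three JARs have already been downloaded to a staging directory (both directory paths below are placeholders; the sketch creates empty stand-ins instead of downloading the real artifacts):

```shell
# Placeholder paths; adjust PULSAR_HOME and DEPS_DIR to your environment.
PULSAR_HOME=${PULSAR_HOME:-/tmp/pulsar-demo}
DEPS_DIR=${DEPS_DIR:-/tmp/kafka-deps}
mkdir -p "$PULSAR_HOME/lib" "$DEPS_DIR"
for jar in connect-api-2.0.1.jar disruptor-3.4.2.jar kafka-clients-2.0.1.jar; do
    touch "$DEPS_DIR/$jar"                  # stand-in for the real JAR file
    cp "$DEPS_DIR/$jar" "$PULSAR_HOME/lib/"
done
ls "$PULSAR_HOME/lib"
```

In a real deployment the loop body would copy JARs fetched from Maven Central, and the script would be run (e.g. via ssh) on every node of the cluster.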

4. Start the Pulsar service

  1. To ensure that the Pulsar service log is written to Kafka correctly, first start the service in the foreground with bin/pulsar-kafka; once it runs without exceptions, start it in the background with the bin/pulsar-daemon-kafka command.
  2. Taking the broker as an example, execute the following command:
bin/pulsar-daemon-kafka start broker
  3. View the broker process with the ps command as follows:

As the figure above shows, the OPTS configured in logenv.sh have been passed to the broker process. The sys tags in log4j2-kafka.yaml resolve these system-property values to instantiate a Kafka Producer, and the broker process log is then sent to the Kafka broker through that producer.
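One way to confirm the pass-through is to filter the broker's command line for -D system properties. The property names below are illustrative stand-ins for the ones actually defined in logenv.sh; in practice, replace the captured string with the output of `ps -ef | grep PulsarBrokerStarter`:

```shell
# Hypothetical broker command line, as it would appear in ps -ef output.
cmdline='java -Dlog4j2.kafkaBrokers=192.168.0.1:9092 -Dlog4j2.cluster=pulsar-cluster org.apache.pulsar.PulsarBrokerStarter'
# Split into one token per line and keep only the -D system properties.
echo "$cmdline" | tr ' ' '\n' | grep '^-D'
```

If the properties are missing from the real process listing, the JVM was started without the OPTS from logenv.sh and log4j2-kafka.yaml cannot resolve its sys lookups.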

5. Test whether the Pulsar log is successfully written to the Kafka broker

Start a Kafka Consumer and subscribe to the topic that Log4j2 sends messages to. The message content reads as follows; the search fields are separated by spaces:

pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
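Because the metadata fields are space-separated prefixes of the log line, they split mechanically, which is what lets the downstream Logstash/Elasticsearch pipeline index them as separate fields. A quick illustration with awk (the sample line is abbreviated from the one above):

```shell
# Abbreviated sample message consumed from Kafka.
line='pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 ...'
# The first four space-separated tokens carry cluster, hostname, IP and module.
echo "$line" | awk '{print "cluster=" $1; print "hostname=" $2; print "ip=" $3; print "module=" $4}'
```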

6. Log retrieval

Open the Kibana page and search by the tokenized fields. The search condition is as follows: cluster: "pulsar-cluster" AND hostname: "XXX" AND module: "broker" AND level: "INFO"

The figure above shows the log retrieval results for a given time period; Available fields can be added to the results as needed. In this way, developers and operations staff can quickly and effectively analyze the causes of Pulsar service exceptions from multiple dimensions through Kibana. This completes the Log4j2+Kafka+ELK solution for fast log retrieval in Apache Pulsar.

Summary

Distributed systems and microservices are currently popular technical directions. In production systems, as the business grows and the volume of applications and services expands rapidly, moving from a monolithic/vertical architecture to a distributed/microservice architecture is a natural choice, bringing benefits such as reduced complexity, fault tolerance, independent deployment, and horizontal scalability. However, it also brings new challenges, such as the efficiency of troubleshooting and the convenience of operations monitoring. Taking Apache Pulsar as an example, this article shares how Java processes can use Log4j2+Kafka+ELK to achieve fast retrieval of distributed and microservice logs, thereby supporting service governance.
