6 - click stream data analysis project - log collection to HDFS

The basics of web logs were introduced in the previous section and are not repeated here; this section only covers how the logs are generated and how they are collected.

1. Method 1 - shell script log collection

1. Generate logs based on Java:

Create the pom file. Only fragments of it survive here: the commented-out log4j version, and a note that the build can produce both fat and thin jars (i.e. the maven-shade-plugin):


        <!-- <version>1.2.3</version> -->


                <!-- Can build both fat and thin jars -->


Log generation:

package edu.sx.loggen;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Date;

public class GenerateLog {
    public static void main(String[] args) throws InterruptedException {
        Logger logger = LogManager.getLogger("testlog");
        int i = 0;
        while (true) {
            logger.info(new Date().toString() + "----------------------");
//            Thread.sleep(1);
            // The captured source broke off here; stop after ~1,000,000 lines
            if (i > 1000000) {
                break;
            }
            i++;
        }
    }
}

Create log4j.properties. The first line below binds the "testlog" logger to the appender of the same name; without it log4j warns that no appenders could be found (this line was missing from the captured text):

log4j.logger.testlog = INFO,testlog
log4j.appender.testlog = org.apache.log4j.RollingFileAppender
log4j.appender.testlog.layout = org.apache.log4j.PatternLayout
log4j.appender.testlog.layout.ConversionPattern = [%-5p][%-22d{yyyy/MM/dd HH:mm:ssS}][%l]%n%m%n
log4j.appender.testlog.Threshold = INFO
log4j.appender.testlog.ImmediateFlush = TRUE
log4j.appender.testlog.Append = TRUE
#The path the log file is written to; the collection script later reads from this directory
log4j.appender.testlog.File = /sx/logs/log/access.log
log4j.appender.testlog.MaxFileSize = 1000KB
log4j.appender.testlog.MaxBackupIndex = 20
log4j.appender.testlog.Encoding = UTF-8

The program can either be packaged into a jar or run directly from the IDE.
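With a standard Maven layout, the generator can be packaged and launched roughly as follows (a sketch: the jar name is a placeholder that depends on your pom's artifactId and version):

```shell
# Build the project (a fat jar, if the shade plugin is configured in the pom)
mvn clean package

# Run the generator; log4j.properties must be on the classpath
# (the jar name below is a placeholder)
java -cp target/loggen-1.0-SNAPSHOT.jar edu.sx.loggen.GenerateLog
```

The jar then keeps appending to /sx/logs/log/access.log until RollingFileAppender rolls it over.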

2. Collect the logs with a shell script

Create the collection script, e.g. /root/collectdata.sh (the three directory variables and the loop keywords were missing from the captured text; the directory values below are examples and should be adjusted to your environment):

#!/bin/bash

#set java env
export JAVA_HOME=/export/server/jdk/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/export/server/hadoop
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

#What this script does:
#       1. Move the files to be uploaded into the to-upload directory
#       2. While moving, rename each file according to a fixed format
#               /export/software/hadoop.log1   /export/data/click_log/xxxxx_click_log_{date}

#Directory where log files are stored (example value)
log_src_dir=/sx/logs/log/

#Directory for files waiting to be uploaded (example value)
log_toupload_dir=/sx/logs/toupload/

#Root path on HDFS to upload the log files to (example value)
hdfs_root_dir=/data/clickLog/

#Print environment variable information
echo "envs: hadoop_home: $HADOOP_HOME"

#Read the log directory and check whether there are files to upload
echo "log_src_dir: $log_src_dir"
ls $log_src_dir | while read fileName
do
        if [[ "$fileName" == access.log.* ]]; then
        # if [ "access.log" = "$fileName" ];then
                date=`date +%Y_%m_%d_%H_%M_%S`
                #Move the file to the to-upload directory and rename it
                echo "moving $log_src_dir$fileName to $log_toupload_dir"xxxxx_click_log_$fileName"_"$date
                mv $log_src_dir$fileName $log_toupload_dir"xxxxx_click_log_$fileName""_"$date
                #Append the path of the renamed file to a list file willDoing
                echo $log_toupload_dir"xxxxx_click_log_$fileName""_"$date >> $log_toupload_dir"willDoing."$date
        fi
done

#Locate the list files named willDoing
ls $log_toupload_dir | grep will | grep -v "_COPY_" | grep -v "_DONE_" | while read line
do
        #Print information
        echo "toupload is in file: $line"
        #Mark the list file as in progress: willDoing -> willDoing_COPY_
        mv $log_toupload_dir$line $log_toupload_dir$line"_COPY_"
        #Read willDoing_COPY_ line by line; each line is the path of one file to upload
        #(use a different variable name so the outer $line is not shadowed)
        cat $log_toupload_dir$line"_COPY_" | while read uploadFile
        do
                echo "puting...$uploadFile to hdfs path.....$hdfs_root_dir"
                hadoop fs -put $uploadFile $hdfs_root_dir
        done
        #Mark the list file as done: willDoing_COPY_ -> willDoing_DONE_
        mv $log_toupload_dir$line"_COPY_"  $log_toupload_dir$line"_DONE_"
done

Create in advance:
1. The directory where the log files are stored

2. The directory for files waiting to be uploaded

3. The root path on HDFS to upload the log files to
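The move-and-rename step of the script can be exercised locally before touching a cluster. The sketch below uses throwaway mktemp directories instead of the real paths and needs no Hadoop at all:

```shell
#!/bin/bash
# Local dry run of the move-and-rename step, using throwaway directories.
log_src_dir=$(mktemp -d)/
log_toupload_dir=$(mktemp -d)/

# Simulate a rolled-over log file, as RollingFileAppender would produce.
echo "sample line" > ${log_src_dir}access.log.1

ls "$log_src_dir" | while read fileName
do
        if [[ "$fileName" == access.log.* ]]; then
                date=$(date +%Y_%m_%d_%H_%M_%S)
                # Move into the to-upload directory under the renamed form
                mv $log_src_dir$fileName $log_toupload_dir"xxxxx_click_log_$fileName""_"$date
                # Record the renamed path in the willDoing list file
                echo $log_toupload_dir"xxxxx_click_log_$fileName""_"$date >> $log_toupload_dir"willDoing."$date
        fi
done

# Show the renamed file and the willDoing list file
ls "$log_toupload_dir"
```

After the run, the to-upload directory contains one renamed log file plus one willDoing.{date} list file, which is exactly the state the upload loop expects.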

3. Run the collection on a schedule

Collecting logs only once is not enough, so the script needs time-based scheduling, for example running every few minutes.
Regular collection with crontab:
crontab syntax:

crontab [-u username] [-l|-e|-r]

Options and parameters:
-u: root only; lets root create or remove crontab schedules on behalf of other users
-e: edit the user's crontab entries
-l: list the user's crontab entries
-r: remove ALL of the user's crontab entries; to remove a single entry, use -e to edit it

Query the current crontab content of the user:

crontab -l

#Collect every five minutes

*/5 * * * *  /root/collectdata.sh
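If you prefer not to open the editor, the same entry can be installed non-interactively (a common shell idiom, shown as a sketch; it preserves any entries already present):

```shell
# Append the schedule to the current crontab and reinstall it
(crontab -l 2>/dev/null; echo '*/5 * * * *  /root/collectdata.sh') | crontab -

# Confirm it was installed
crontab -l
```
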

Clear the current crontab of the user:

crontab -r
crontab -l
no crontab for blue

If you want to delete a crontab task of the current user, use crontab -e to enter the editor, and then delete the corresponding task.

Reprinted from: https://blog.csdn.net/tianjun2012/article/details/62424486

2. Method 2 - Flume log collection

  • Create a new configuration file TaildirSource-hdfs.conf
[hadoop@hadoop-001 lib]$ cd ../
[hadoop@hadoop-001 flume]$ touch  job/TaildirSource-hdfs.conf

The contents of the TaildirSource-hdfs.conf configuration file are:

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
#Monitor the newly added contents of multiple files in a directory
agent1.sources.source1.type = TAILDIR
#Save each file's consumed offset in JSON format, so that after a restart consumption does not start from scratch
agent1.sources.source1.positionFile = /home/hadoop/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /home/hadoop/nginx/logs/access_*.log

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
#Configure the sink component as hdfs
agent1.sinks.sink1.type = hdfs
#HDFS directory to write to (this required property was missing from the captured text; example value, adjust to your cluster)
agent1.sinks.sink1.hdfs.path = hdfs://hadoop-001:9000/weblog/%y-%m-%d/
#Specify the file name prefix
agent1.sinks.sink1.hdfs.filePrefix = access_log
#Number of events written to HDFS per batch
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
#Roll the file once it reaches 1 MB
agent1.sinks.sink1.hdfs.rollSize = 1048576
#Roll the file after 1,000,000 events
agent1.sinks.sink1.hdfs.rollCount = 1000000
#Roll the file every 30 seconds (rollInterval is in seconds, not minutes)
agent1.sinks.sink1.hdfs.rollInterval = 30
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

#Using memory type channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

Start flume

  • This command must be run from the flume installation directory, because the conf/, bin/ and job/ paths are relative
[hadoop@hadoop-001 /]$ cd /home/hadoop/bigdatasoftware/flume
[hadoop@hadoop-001 flume]$ bin/flume-ng agent --conf conf/ --name agent1 --conf-file job/TaildirSource-hdfs.conf -Dflume.root.logger=INFO,console
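Once the agent is running, you can check that events are reaching HDFS by listing the sink's output directory (the path below is only an example and must match whatever hdfs.path the sink is configured with):

```shell
# List today's output directory (example path; substitute your sink's hdfs.path)
hadoop fs -ls /weblog/$(date +%y-%m-%d)/

# Inspect the beginning of one rolled file
hadoop fs -cat /weblog/$(date +%y-%m-%d)/access_log.* | head
```
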

Based on the Hadoop-Flume-HDFS-Hive master project on GitHub


This article mainly covers log collection. Since the related background has already been introduced in detail, only the implementation methods are listed here, without going into specifics.

Keywords: Hadoop Data Analysis hdfs

Added by ron8000 on Thu, 30 Dec 2021 07:23:26 +0200