Obtaining and displaying kubernetes pod logs in real time with java + vue + websocket

Scenario

The scenario is this: a user creates a task on the platform that runs algorithm training. Once the task starts, the backend creates a container for the training job through kubernetes, and the page has to show the training log in real time. Because log lines are produced continuously and the page must refresh with the latest ones automatically, websocket is needed.

Take the following pod as an example:

As shown in the figure, the goal is to fetch the log of this pod in real time and display it on the page, with the log refreshing automatically.

Job name (full name of the kubernetes job): train-new-test

Pod name (full name of the pod generated by the job): train-new-test-zsn9h

fabric8 kubernetes-client

fabric8 kubernetes-client is a java client for kubernetes. It connects to the kubernetes cluster and reads resources from it.

<dependency>
	<groupId>io.fabric8</groupId>
	<artifactId>kubernetes-client</artifactId>
	<version>5.10.1</version>
</dependency>
<dependency>
	<groupId>io.fabric8</groupId>
	<artifactId>kubernetes-model</artifactId>
	<version>5.10.1</version>
</dependency>

The basic usage of kubernetes-client is not covered here. In short, the backend initializes a client object (DefaultKubernetesClient) at startup, and every later operation against the kubernetes cluster goes through that object. It is created roughly like this:

private DefaultKubernetesClient getClient() {
	Config config = new ConfigBuilder().withMasterUrl("https://xxxxxxxxxxxxxx")
			.withCaCertFile("xxxxxx")
			.withClientCertFile("xxxxxx")
			.withClientKeyFile("xxxxxx")
			.build();
	return new DefaultKubernetesClient(config);
}

Using kubernetes-client, write a method that obtains the container log for a given job name.

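import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import org.apache.commons.lang3.StringUtils; // assumption: commons-lang3; the original does not say which StringUtils is used
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component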
public class K8sClientOperator {

    // K8sClientHolder is your own class. It holds the DefaultKubernetesClient object and deals with the kubernetes certificate files and so on
    @Autowired
    private K8sClientHolder k8sClientHolder;

    // Read the log and write it to the stream, which hands it to websocket
    public void watchLog(String jobName, String namespace, OutputStream outputStream) throws IOException {
        Job job = k8sClientHolder.getClient().batch().v1().jobs().inNamespace(namespace).withName(jobName).get();
        JobStatus jobStatus = job != null ? job.getStatus() : null;
        if (jobStatus == null) {
            outputStream.write("Job non-existent".getBytes(StandardCharsets.UTF_8));
            return;
        }
        Pod pod = getPodByName(jobName, namespace);
        if (pod == null) {
            outputStream.write("Pod non-existent".getBytes(StandardCharsets.UTF_8));
            return;
        }
        String podName = pod.getMetadata().getName(); // pod full name

        // If the job has already succeeded, return the last 500 log lines and stop
        if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) {
            String podLogs = k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                    .tailingLines(500).getLog();
            outputStream.write(podLogs.getBytes(StandardCharsets.UTF_8));
            return;
        }

        // If the job is not completed, handle the log according to the pod's current phase.
        // status.phase is one of Pending, Running, Succeeded, Failed or Unknown;
        // "ContainerCreating" and "Completed" are statuses displayed by kubectl, not phases.
        String phase = pod.getStatus().getPhase();
        switch (phase == null ? "" : phase) {
            case "Running" : // Running, the ideal case: follow the log, starting from the last 100 lines
                k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                        .tailingLines(100).watchLog(outputStream);
                break;
            case "Pending" :  // The containers have not started yet, for example the image may still be pulling
                outputStream.write("Container creation in progress".getBytes(StandardCharsets.UTF_8));
                break;
            case "Succeeded" : // The containers have finished running. Pull the last 500 lines
                k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                        .tailingLines(500).watchLog(outputStream);
                break;
            // TODO: handle the other phases (Failed, Unknown) according to the actual situation
            default :
                k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                        .tailingLines(100).watchLog(outputStream);
                break;
        }
    }
    }

    // Find the Pod that belongs to the given job name.
    // I could not find a more direct way through fabric8, so this lists all pods in the
    // namespace and compares names. Alternatives: run a linux command from java to look the
    // pod up by jobName, or filter on the "job-name" label that kubernetes puts on pods
    // created by a job, e.g. pods().inNamespace(namespace).withLabel("job-name", jobName).
    public Pod getPodByName(String jobName, String namespace) {
        List<Pod> podList = k8sClientHolder.getClient().pods().inNamespace(namespace).list().getItems();
        for (Pod pod : podList) {
            String phase = pod.getStatus().getPhase();
            // Exclude abnormal pods
            if (StringUtils.isBlank(phase)
                    || "Terminating".equals(phase)
                    || ("Failed".equals(phase)
                    && "UnexpectedAdmissionError".equals(pod.getStatus().getReason()))) {
                continue;
            }
            // A pod created by a kubernetes job is named "<job name>-<generated suffix>",
            // so strip the last "-" and the suffix before comparing.
            // Names without "-" are skipped instead of failing in substring().
            String fullName = pod.getMetadata().getName();
            int dash = fullName.lastIndexOf('-');
            if (dash > 0 && jobName.equals(fullName.substring(0, dash))) {
                return pod;
            }
        }
        return null;
    }

}
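The two string-level decisions in K8sClientOperator (recovering the job name from a pod name, and choosing what to do for a given pod phase) can be tried without a cluster. The sketch below is only an illustration: JobPodSupport, LogPlan, jobNameOf and planFor are hypothetical names, not part of the original code or of fabric8. It assumes the standard pod phases (Pending, Running, Succeeded, Failed, Unknown), and treating Failed like Succeeded is my own assumption.

```java
import java.util.Optional;

/** Hypothetical helper: the pure-string decisions from K8sClientOperator, no cluster needed. */
final class JobPodSupport {

    /** What the caller should do with the pod's log. */
    enum LogPlan { FOLLOW, TAIL_ONCE, WAIT }

    private JobPodSupport() { }

    /**
     * Strips the last "-" and the generated suffix after it from a pod name.
     * Returns empty when the name is null or has no usable "-".
     */
    static Optional<String> jobNameOf(String podName) {
        if (podName == null) {
            return Optional.empty();
        }
        int dash = podName.lastIndexOf('-');
        return dash > 0 ? Optional.of(podName.substring(0, dash)) : Optional.empty();
    }

    /** Maps a pod phase (status.phase) to a plan; anything unrecognised falls back to FOLLOW. */
    static LogPlan planFor(String phase) {
        if (phase == null) {
            return LogPlan.WAIT;
        }
        switch (phase) {
            case "Pending":
                return LogPlan.WAIT;       // containers not started yet
            case "Succeeded":
            case "Failed":                 // assumption: a finished pod only needs its tail
                return LogPlan.TAIL_ONCE;
            default:
                return LogPlan.FOLLOW;     // "Running", "Unknown", or anything else
        }
    }
}
```

For the example pod, JobPodSupport.jobNameOf("train-new-test-zsn9h") yields "train-new-test", which is the value compared against jobName.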

java backend websocket

This part is the java backend code that obtains the pod log in real time and returns it to the front end through websocket.

Custom OutputStream

Write a custom OutputStream that forwards the log content to websocket.

You need to override the write(byte b[], int off, int len) method, because the watchLog method in kubernetes-client calls it. Older versions of kubernetes-client call write(byte b[]) instead. Since java.io.OutputStream.write(byte b[]) simply forwards to write(b, 0, b.length), overriding the three-argument method covers both cases, whereas overriding only write(byte b[]) stops working after an upgrade. (I ran into this problem when upgrading the kubernetes-client version.)

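import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

public class WebTermOutputStream extends OutputStream {
    private final WebSocketSession webSocketSession;

    public WebTermOutputStream(WebSocketSession webSocketSession) {
        this.webSocketSession = webSocketSession;
    }

    // OutputStream declares write(int) as abstract, so it has to be implemented as well
    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
        // Ignore empty or out-of-range requests instead of breaking the log stream
        if (b == null || len <= 0 || off < 0 || off > b.length - len) {
            return;
        }
        // Each chunk is decoded on its own and sent as one websocket text message
        String response = new String(b, off, len, StandardCharsets.UTF_8);
        webSocketSession.sendMessage(new TextMessage(response));
    }
}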
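One caveat with decoding every chunk separately: if a chunk boundary falls inside a multi-byte UTF-8 character (common with non-ASCII log text), that character is garbled. A possible variation, shown here as a sketch rather than the original author's code, is to buffer bytes until a newline and emit whole lines. LineBufferingOutputStream and lineSink are hypothetical names; in the real handler the sink would be something that sends a TextMessage.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical stream: collects bytes and passes complete UTF-8 lines to a sink. */
class LineBufferingOutputStream extends OutputStream {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final Consumer<String> lineSink;

    LineBufferingOutputStream(Consumer<String> lineSink) {
        this.lineSink = lineSink;
    }

    @Override
    public synchronized void write(int b) {
        // 0x0A never occurs inside a multi-byte UTF-8 sequence, so splitting here is safe
        if ((byte) b == '\n') {
            emit();
        } else {
            pending.write(b);
        }
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || off > b.length - len) {
            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len);
        }
        for (int i = off; i < off + len; i++) {
            write(b[i]);
        }
    }

    /** Emits a trailing partial line, if any (for example when the log ends without a newline). */
    @Override
    public synchronized void flush() {
        if (pending.size() > 0) {
            emit();
        }
    }

    private void emit() {
        lineSink.accept(new String(pending.toByteArray(), StandardCharsets.UTF_8));
        pending.reset();
    }
}
```

The trade-off is latency: a line is only delivered once its newline arrives, so progress-style output that rewrites a line with '\r' would show up late unless flush() is called periodically.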

websocket handler

The custom WebSocketHandler extends TextWebSocketHandler and overrides its main methods.

@Component
@Slf4j
public class MyWebSocketHandler extends TextWebSocketHandler {

    @Autowired
    private K8sClientOperator k8sClientOperator;

    private Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

    // Connection establishment
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        log.info("afterConnectionEstablished, connect success");
    }

    // Receive - > process - > send
    @Override
    protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception {
        /*JSONObject handleMessage = JSON.parseObject(message.getPayload());
        String taskName = String.valueOf(handleMessage.get("taskName"));
        if (StringUtils.isBlank(taskName)) {
            return;
        }
        String jobName = "train-" + taskName;*/
        String jobName = "train-new-test"; // The jobName is hard-coded here for simplicity
        k8sClientOperator.watchLog(jobName, "training", new WebTermOutputStream(webSocketSession));
        sessionMap.put(webSocketSession.getId(), webSocketSession);
    }

    // Connection exception
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        if (session.isOpen()) {
            log.info("session [" + session.getId() + "] is open, need close");
            try {
                session.close();
            } catch (IOException e) {
                log.error("session [" + session.getId() + "] close failed", e);
            }
        }
        sessionMap.remove(session.getId());
        log.error(String.format("websocket error, session[id:%s], \t %s",
                session.getId(), JSONObject.toJSONString(session.getAttributes())), exception);
    }

    // Connection closed
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        log.warn(String.format("websocket closed, session[id:%s], \t %s",
                session.getId(), JSONObject.toJSONString(session.getAttributes())), status);
        sessionMap.remove(session.getId());
    }
}
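One thing the handler above does not do is stop the log stream when the browser disconnects. In fabric8, watchLog returns a LogWatch, which is a Closeable, so one option is to keep that handle per session and close it on disconnect. The sketch below shows only the bookkeeping, using plain java.io.Closeable; LogStreamRegistry, register and release are hypothetical names. Wiring it in would mean letting K8sClientOperator.watchLog return its LogWatch, calling register from handleTextMessage and release from afterConnectionClosed and handleTransportError, none of which is in the original code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: one closeable log stream per websocket session id. */
final class LogStreamRegistry {
    private final Map<String, Closeable> streams = new ConcurrentHashMap<>();

    /** Remembers the stream for a session and closes any stream it replaces. */
    void register(String sessionId, Closeable stream) {
        closeQuietly(streams.put(sessionId, stream));
    }

    /** Closes and forgets the session's stream; returns false if there was none. */
    boolean release(String sessionId) {
        Closeable stream = streams.remove(sessionId);
        closeQuietly(stream);
        return stream != null;
    }

    /** Number of sessions that currently have an open stream. */
    int size() {
        return streams.size();
    }

    private static void closeQuietly(Closeable stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            // The session is going away anyway; there is nothing useful left to do here
        }
    }
}
```

Closing the replaced stream in register matters when the same session sends a second message, because otherwise two log streams would write to the same socket.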

Back end websocket entry

Register the handler so that it receives "/webterm" requests.

@Configuration
@EnableWebSocket
public class WebTermSocketConfig implements WebSocketConfigurer {
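    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // Receive "/webterm" requests. "*" accepts any origin, so restrict it outside of testing
        registry.addHandler(webSocketHandler(), "/webterm").setAllowedOrigins("*");
    }

    // The return type must match the handler class defined above.
    // MyWebSocketHandler is also annotated with @Component; keep only one of the two
    // registrations if you do not want two handler beans.
    @Bean
    public MyWebSocketHandler webSocketHandler() {
        return new MyWebSocketHandler();
    }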
}

vue front end websocket

The front end opens a websocket connection, keeps receiving the data returned by the back end, and displays it on the page.

Here I add a logCache field that temporarily holds the data returned by websocket. The reason is that my training tasks are somewhat special and produce log lines very quickly. If every message were rendered to the page directly, the browser would crash. So the data is cached first, and a timer flushes it to the page every three seconds.

Only the key code is shown:

data () {
	return {
		log: '', // Log content
		// Log cache. It exists because my task logs arrive too quickly:
		// rendering every message as it arrives crashes the page.
		// Data received from websocket goes into logCache first.
		// A timer then appends logCache to the displayed log every 3 seconds and empties logCache.
		// If your logs do not arrive that fast, you do not need this.
		logCache: '', 
		websocket: null,
		socketTimer: null,
	}
},
mounted () {
	this.createWebsocket()
},
methods: {
  createWebsocket () {
		const url = 'wss://xxx.com:port/webterm'
		this.websocket = new WebSocket(url)
		this.websocket.onopen = () => {
			console.log("websocket.onopen...")
			this.websocket.send(JSON.stringify({ taskName: 'xxxxxx' }))
			this.openSocketTimer()
		}
		this.websocket.onmessage = (res) => {
			console.log("websocket.onmessage...")
			const data = res.data
			if (!data) {
				return
			}
			let lineRes = ''
			if (data.indexOf('\n') !== -1) {
				const arrN = data.split('\n')
				for (let i = 0; i < arrN.length - 1; i++) {
					let lineStr = arrN[i]
					if (lineStr.indexOf('\r') !== -1) {
						// '\r' rewrites the current line (progress bars): keep only the last segment
						const arrR = lineStr.replace(/\r$/, '').split('\r')
						lineStr = arrR[arrR.length - 1]
					}

					lineRes += lineStr + '\n\n'
				}
				// Keep any text after the last '\n' instead of dropping it
				lineRes += arrN[arrN.length - 1]
			} else {
				lineRes = data
			}
			// Show the first data immediately; after that, put everything into logCache
			if (!this.log || this.log.length == 0) {
				this.$nextTick(() => {
					this.log = `${this.log}${lineRes}`
				})
			} else {
				this.logCache = `${this.logCache}${lineRes}`
			}
		}
		this.websocket.onclose = (e) => {
			console.log("websocket.onclose...")
			console.log(`connection dropped...>>>${e.code}`)
			// Show whatever is still cached, then stop the timer
			this.renderPage()
			this.clearSocketTimer()
		}
	},
	// Start the timer and check whether the log needs to be refreshed every 3 seconds
	openSocketTimer () {
		this.clearSocketTimer()
		this.socketTimer = setInterval(this.checkRenderEnable, 3000)
	},
	// Stop the timer
	clearSocketTimer () {
		if (this.socketTimer != null) {
			clearInterval(this.socketTimer)
			this.socketTimer = null
		}
	},
	// Judge whether the page needs to be refreshed, that is, whether the log cache is empty
	checkRenderEnable () {
		if(this.logCache && this.logCache.length > 0) {
			this.renderPage()
		}
	},
	// Refresh page
	renderPage () {
		this.$nextTick(() => {
			this.log = `${this.log}${this.logCache}`
			this.logCache = '' // After the refresh is completed, empty the log cache
		})
	},
}
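The cache-then-flush pattern is not tied to the browser. Purely as an illustration of the same idea in java (for instance if you preferred to throttle on the server before sending), here is a minimal sketch. LogBatcher, append, drain and start are hypothetical names, and nothing like this exists in the original backend code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical batcher: collects log text and hands it over in periodic chunks. */
final class LogBatcher {
    private final StringBuilder cache = new StringBuilder();

    /** Adds text to the cache; cheap enough to call for every incoming message. */
    synchronized void append(String text) {
        cache.append(text);
    }

    /** Returns everything cached so far and empties the cache. */
    synchronized String drain() {
        String batch = cache.toString();
        cache.setLength(0);
        return batch;
    }

    /**
     * Delivers non-empty batches to the sink every periodMillis.
     * Shut the returned scheduler down to stop. If the sink throws,
     * scheduleAtFixedRate cancels the remaining runs.
     */
    ScheduledExecutorService start(long periodMillis, Consumer<String> sink) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            String batch = drain();
            if (!batch.isEmpty()) {
                sink.accept(batch);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

With a period of 3000 ms this mirrors openSocketTimer and renderPage above: append plays the role of writing to logCache, and drain is the "append to the page, then empty the cache" step.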

Keywords: Java Kubernetes websocket

Added by kid85 on Wed, 08 Dec 2021 09:18:17 +0200