Node.js Basics: the stream Module

Streams

In the static file server from the HTTP module chapter, we have already met writable streams: the response object that the server writes to is one, and so is the request object that a client writes to (the one returned by http.request).
Writable streams are used throughout Node's interfaces. Every writable stream has a write method, which accepts a string or a Buffer object, and an end method, which closes the stream. If end is given a chunk of data, it writes that chunk before closing. Both methods also accept a callback function as an optional last argument.
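
As a minimal sketch of write and end, the snippet below uses a small in-memory writable stream. The names sink and chunks are made up for illustration; any writable stream, such as a file stream or an HTTP response, is used the same way.

```javascript
const { Writable } = require('stream')

// Hypothetical in-memory destination: collects everything written to it
const chunks = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    chunks.push(chunk.toString())
    callback() // signal that this chunk has been handled
  }
})

sink.write('hello, ')           // a string chunk
sink.write(Buffer.from('stre')) // a Buffer chunk
sink.end('am', () => {
  // runs once all data, including the final 'am', has been flushed
  console.log(chunks.join('')) // prints: hello, stream
})
```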

But the static file server we built has a problem: it reads each file in one go with the readFile method, whatever its size. That is fine for small files. If the file is a movie of several gigabytes, however, the whole file occupies several gigabytes of memory, and that memory cannot be released until the last byte has been written to the other end of the network, which is more than our computer can bear.

In fact, a file does not have to be transferred in one piece: we can read a little and write a little, completing the transfer chunk by chunk. The two sides rarely run at the same speed, though. If reading is fast and writing is slow, for example, the data that has been read but not yet written has to wait in memory, and the reader should pause once enough has accumulated. This calls for a buffer between the two sides.

What streams are for:

  • Controlling memory usage (keeping it below a high-water mark)
  • Splitting a large file into small chunks and transferring them one at a time to reduce memory pressure
  • Evening out the different processing speeds of different stages

Types of streams:

  • Readable stream
  • Writable stream
  • Duplex stream
  • Transform stream
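
A rough sketch of how these types relate, using only the built-in stream module. The names upperCase, source, sink and received are invented for this example.

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream')

// Transform: a duplex stream whose output is computed from its input
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// Readable: produces data
const source = Readable.from(['node ', 'streams'])

// Writable: consumes data
const received = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString())
    callback()
  }
})

source.pipe(upperCase).pipe(sink)
sink.on('finish', () => console.log(received.join(''))) // prints: NODE STREAMS

// A Transform is a special kind of Duplex: readable and writable at once
console.log(upperCase instanceof Duplex) // prints: true
```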

For example, say we want to copy a movie file.

Previously, we might have written it like this:

const fs = require('fs')

var file = 'movie.mp4'

fs.readFile(file, (err, data) => {
  if (err) {
    console.log(err)
  } else {
    fs.writeFile('movie-1', data, err => {
      console.log(err || 'done') // report a failed write instead of always printing 'done'
    })
  }
})

This is very simple to write: read, then write. But the whole thing happens in one go, which means that if the movie is 1 GB, at least 1 GB of memory is occupied.

What if we write it with streams?

const fs = require('fs')

var file = 'D:/Users/movie.mp4'

var rs = fs.createReadStream(file)
var ws = fs.createWriteStream('D:/Users/movie-1.mp4', {highWaterMark: 65536})
// highWaterMark sets the size of the internal buffer in bytes (65536 bytes = 64 KiB here).
// If it is omitted, the default depends on the stream type and the Node.js version.

rs.on('data', data => {
  // write returns false once the buffered data has reached highWaterMark
  if (ws.write(data) === false) {
    rs.pause() // the buffer is full: stop reading for now
  }
})

ws.on('drain', () => { // the buffer of ws has been flushed
  rs.resume() // continue reading
})

rs.on('end', () => {
  ws.end()
})

If reading is faster than writing, rs stops reading once the buffer is full and continues after the buffered data has been written out. This is a simple stream pipeline.
Note that a return value of false from write is only advisory. If we keep writing anyway, the data is still accepted and buffered rather than thrown away, which of course lets memory usage grow past the limit.
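
The effect of ignoring the return value can be observed directly. This is a small sketch with a made-up slow destination (slow) and a deliberately tiny highWaterMark of 4 bytes; writableLength reports how many bytes are currently queued.

```javascript
const { Writable } = require('stream')

// Hypothetical slow destination with a tiny 4-byte buffer limit
const slow = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10) // pretend each chunk takes 10 ms to flush
  }
})

const results = []
for (let i = 0; i < 3; i++) {
  results.push(slow.write('abcd')) // keep writing and ignore the return value
}
const queued = slow.writableLength

console.log(results) // prints: [ false, false, false ]
console.log(queued)  // prints: 12, three times the 4-byte limit
slow.end()
```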

There is a simpler and more common way to write this: pipe.

const fs = require('fs')

var file = 'D:/Users/movie.mp4'

var rs = fs.createReadStream(file)
var ws = fs.createWriteStream('D:/Users/movie-1.mp4', {highWaterMark: 65536})

rs.pipe(ws)
// pipe returns its destination, so stages can be chained as long as every
// intermediate stage is also readable (a duplex or transform stream), for example:
// rs.pipe(gzip).pipe(ws)

Of course, the internal implementation of pipe is much more complicated than the code above, but the core idea is the same: when the stages run at different speeds, memory usage is kept under control by throttling the faster one.
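
For a chain with several stages, the built-in stream.pipeline function does the same wiring as chained pipe calls and also reports an error from any stage through one callback. The sketch below assumes a throwaway file in the system temp directory; gzipFile is a hypothetical helper name.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')
const zlib = require('zlib')
const { pipeline } = require('stream')

// Hypothetical helper: gzip the file at src into dest, chunk by chunk
function gzipFile(src, dest, done) {
  pipeline(
    fs.createReadStream(src),
    zlib.createGzip(),          // a transform stream sitting in the middle
    fs.createWriteStream(dest),
    done                        // called once, with an error if any stage failed
  )
}

// Usage with a throwaway file in the temp directory
const src = path.join(os.tmpdir(), 'pipeline-demo.txt')
fs.writeFileSync(src, 'streams '.repeat(1000))
gzipFile(src, src + '.gz', err => {
  console.log(err ? 'failed: ' + err.message : 'compressed')
})
```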

Practice

Reading the file at a given path with a custom readable stream

const { Readable } = require('stream')
const fs = require('fs')

exports.createReadStream = function createReadStream(path) {
  var fd = fs.openSync(path, 'r') // open the file at path for reading
  var fileSize = fs.statSync(path).size
  var position = 0

  return new Readable({
    read(size) {
      if (position >= fileSize) {
        this.push(null) // no more data: end the stream
        fs.close(fd, (err) => {
          if (err) console.log(err)
        })
        return
      }
      var buf = Buffer.alloc(1024) // a fresh 1024-byte buffer for this chunk
      // read up to 1024 bytes from the file, starting at position, into buf at offset 0
      fs.read(fd, buf, 0, 1024, position, (err, bytesRead) => {
        if (err) {
          this.destroy(err) // report the failure as an 'error' event
          return
        }
        // the last chunk may be shorter than the buffer
        this.push(bytesRead < 1024 ? buf.slice(0, bytesRead) : buf)
      })
      position += 1024
    }
  })
}
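
The contract between read and push used in the file stream can also be seen in isolation. This is a minimal sketch with no file system involved; createCounter is a hypothetical helper that emits the numbers 1 to limit.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: a readable stream that emits the numbers 1..limit
function createCounter(limit) {
  let current = 0
  return new Readable({
    read() {
      current += 1
      if (current > limit) {
        this.push(null)            // null marks the end of the stream
      } else {
        this.push(String(current)) // hand one chunk to the consumer
      }
    }
  })
}

const seen = []
createCounter(3)
  .on('data', chunk => seen.push(chunk.toString()))
  .on('end', () => console.log(seen)) // prints: [ '1', '2', '3' ]
```

The stream calls read whenever the consumer wants more data, and each call answers with exactly one push, which is the same rhythm the file version follows.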

Save the module as file-read-stream.js and try it in the node REPL:

$ node
> cfrs = require('./file-read-stream.js')
> rs = cfrs.createReadStream('./http-server.js')
> rs.on('data', d => console.log(d.toString()))  

In this way we can read the source code of the HTTP module example from the previous section. If we add a writable counterpart, we can copy files.

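const { Writable } = require('stream') // put this next to the Readable import at the top of the file

exports.createWriteStream = function createWriteStream(path) {
  // 'w' creates or truncates the file; in append mode Linux ignores the position argument
  var fd = fs.openSync(path, 'w')
  var position = 0

  return new Writable({
    write(chunk, encoding, done) {
      // write the whole chunk at the current position, then report the result
      fs.write(fd, chunk, 0, chunk.length, position, (err) => {
        done(err)
      })
      position += chunk.length
    },
    final(done) {
      fs.close(fd, done) // close the descriptor once everything has been written
    }
  })
}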

$ node
> const {createReadStream, createWriteStream} = require('./file-read-stream.js')
> createReadStream('./http-server.js').pipe(createWriteStream('./http-server222.js'))

After running this code, a copy of http-server.js named http-server222.js appears in the folder.

Back to yesterday's static file server: we need to replace readFile with streaming.

const http = require('http')
const fs = require('fs')
const fsp = fs.promises
const path = require('path')
const mime = require('mime')

const port = 8090
const baseDir = __dirname

const server = http.createServer(async (req, res) => {
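  // decode the URL first, then resolve it, and refuse anything that escapes baseDir
  var targetPath = path.join(baseDir, decodeURIComponent(req.url))
  if (targetPath !== baseDir && !targetPath.startsWith(baseDir + path.sep)) {
    res.writeHead(403)
    res.end('403 Forbidden')
    return
  }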
  console.log(req.method, req.url, baseDir, targetPath)
  try {
    var stat = await fsp.stat(targetPath)
    if (stat.isFile()) {
      try {
        var type = mime.getType(targetPath)
        if (type) { // mime recognizes the extension: send that type
          res.writeHead(200, {'Content-Type': `${type}; charset=UTF-8`})
        } else { // unknown type: send it as a generic binary stream
          res.writeHead(200, {'Content-Type': `application/octet-stream`})
        }
        fs.createReadStream(targetPath).pipe(res)
      } catch(e) {
        res.writeHead(500)
        res.end('500 Internal Server Error')
      }
    } else if (stat.isDirectory()) {
      var indexPath = path.join(targetPath, 'index.html')
      try {
        await fsp.stat(indexPath)
        var type = mime.getType(indexPath)
        if (type) {
          res.writeHead(200, {'Content-Type': `${type}; charset=UTF-8`})
        } else {
          res.writeHead(200, {'Content-Type': `application/octet-stream`})
        }
        fs.createReadStream(indexPath).pipe(res)
      } catch(e) {
        if (!req.url.endsWith('/')) { 
          res.writeHead(301, {
            'Location': req.url + '/'
          })
          res.end()
          return
        }
        var entries = await fsp.readdir(targetPath, {withFileTypes: true})
        res.writeHead(200, {
          'Content-Type': 'text/html; charset=UTF-8'
        })
        res.end(`
          ${
            entries.map(entry => {
              var slash = entry.isDirectory() ? '/' : ''
                return `
                  <div>
                    <a href='${entry.name}${slash}'>${entry.name}${slash}</a>
                  </div>
                `
            }).join('') 
          }
        `)
      }
    }
  } catch(e) {
    res.writeHead(404)
    res.end('404 Not Found')
  }
})

server.listen(port, () => {
  console.log(port)
})
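
One gap worth noting in the server above: pipe does not forward a read error to the response, so a file that fails while being read can leave the request hanging. A possible way to handle this, sketched here with a hypothetical helper named streamFile, is to use stream.pipeline, which destroys both streams on failure and reports the error once.

```javascript
const fs = require('fs')
const { pipeline, Writable } = require('stream')

// Hypothetical helper: stream a file into any writable destination (such as res)
// and report failures instead of leaving the destination hanging
function streamFile(filePath, dest, onDone) {
  pipeline(fs.createReadStream(filePath), dest, err => {
    // on failure, pipeline has already destroyed both streams
    onDone(err || null)
  })
}

// Usage with a stand-in destination; in the server this would be res.
// The path is assumed not to exist, to show the error branch.
const sink = new Writable({ write(chunk, encoding, callback) { callback() } })
streamFile('./no-such-file.txt', sink, err => {
  console.log(err ? 'read failed: ' + err.code : 'sent')
})
```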

Added by Htmlwiz on Mon, 07 Oct 2019 05:25:17 +0300