Skip to content

Question about 06-Coding-with-Streams Multiplexing #48

@luokuning

Description

@luokuning

Hi there, I'm reading multiplexing in Chapter 6, and I ran into a problem that I can't figure out.
First I copied the contents of the client.js and server.js into my local file, but instead use file read/write stream rather than stdout or stderr, and the code runs smooth without any problem. The complete code is shown below:

// client.mjs
import { connect } from 'net'
import { createReadStream } from 'fs'

function multiplexChannels (sources, destination) {
  let openChannels = sources.length
  for (let i = 0; i < sources.length; i++) {
    sources[i]
    .on('readable', function () { // ①
      let chunk
      while ((chunk = this.read()) !== null) {
        const outBuff = Buffer.alloc(1 + 4 + chunk.length) // ②
        outBuff.writeUInt8(i, 0)
        outBuff.writeUInt32BE(chunk.length, 1)
        chunk.copy(outBuff, 5)
        console.log(`Sending packet to channel: ${i}`)
        destination.write(outBuff) // ③
      }
    })
    .on('end', () => { // ④
      if (--openChannels === 0) {
        destination.end()
      }
    })
  }
}

const socket = connect(3000, '127.0.0.1', () => {
+  const sourceFiles = [
+   createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
+    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
+ ]

  multiplexChannels(sourceFiles, socket)

})
// server.mjs
import { createWriteStream } from 'fs'
import { createServer } from 'net'
import { resolve } from 'path'

function demultiplexChannel (source, destinations) {
  let currentChannel = null
  let currentLength = null

  source
    .on('readable', () => { // ①
      let chunk
      if (currentChannel === null) { // ②
        chunk = source.read(1)
        currentChannel = chunk && chunk.readUInt8(0)
      }

      if (currentLength === null) { // ③
        chunk = source.read(4)
        currentLength = chunk && chunk.readUInt32BE(0)
        if (currentLength === null) {
          return null
        }
      }

      chunk = source.read(currentLength) // ④
      if (chunk === null) {
        return null
      }

      console.log(`Received packet from: ${currentChannel}`)
      destinations[currentChannel].write(chunk) // ⑤
      currentChannel = null
      currentLength = null
    })
    .on('end', () => { // ⑥
      destinations.forEach(destination => destination.end())
      console.log('Source channel closed')
    })
}

const server = createServer(socket => {
+  const destFiles = [
+   createWriteStream('/tmp/filetcp/1.txt'),
+   createWriteStream('/tmp/filetcp/2.txt'),
+  ]

  demultiplexChannel(socket, destFiles)
})
server.listen(3000, () => console.log('Server started'))

But when I add highWaterMark option to file read stream of client.mjs like below:

-const sourceFiles = [
-    createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
-    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
-  ]
+ const sourceFiles = [
+    createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs', { highWaterMark: 8 }),
+    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs', { highWaterMark: 8 }),
+  ]

and run client.mjs again, something strange happened: both client and server socket hang forever. And the content of the server-generated file is incomplete, eg. 1.txt has partial content of client.mjs in this case.

I know that hightWaterMark is the maximum number of bytes of the internal buffer, but I can't figure out this has something to do with the problem, is there something that I missing?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions