Pipelining with Go

A few days ago, a friend of mine posted a question about concurrency control in Go. The question was, "Can we ensure that doSomething(0) always runs before doSomething(2)?" The answer is "No." As in other languages, we cannot predict the order in which the scheduler runs goroutines, so nothing in the code below forces doSomething(0) to acquire mtx[0] before doSomething(2) does.

var mtx = [2]sync.Mutex{}

func foo() {
 for i := 0; i < 4; i++ {
   go doSomething(i)
 }
}

func doSomething(i int) {
 mtx[i%2].Lock()
 // do something 
 mtx[i%2].Unlock()
}

Though using a mutex for concurrency management is the norm in other languages (e.g., C++), it is not the preferred approach in Go. A mutex solves concurrency issues by synchronizing memory access: it provides mutual exclusion, not ordering. As the Go language FAQ puts it: "Do not communicate by sharing memory. Instead, share memory by communicating."
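
To make the slogan concrete, here is a minimal sketch (not the friend's original code) of an ordering guarantee expressed with channels instead of mutexes. Each "lane" (i % lanes) gets a single worker goroutine that reads from its own channel, so items in the same lane are handled in the order they were sent. The names processInLanes and lanes are made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// processInLanes is a hypothetical helper: it sends item i to lane i%lanes
// and runs exactly one worker goroutine per lane. It returns, for each lane,
// the items in the order that lane's worker handled them.
func processInLanes(n, lanes int) [][]int {
	chans := make([]chan int, lanes)
	handled := make([][]int, lanes)

	var wg sync.WaitGroup
	wg.Add(lanes)
	for l := 0; l < lanes; l++ {
		chans[l] = make(chan int)
		go func(l int, in <-chan int) {
			defer wg.Done()
			// Only this goroutine writes handled[l], so no mutex is needed.
			for v := range in {
				handled[l] = append(handled[l], v)
			}
		}(l, chans[l])
	}

	// A channel delivers values in the order they were sent,
	// so item 0 reaches lane 0's worker before item 2 does.
	for i := 0; i < n; i++ {
		chans[i%lanes] <- i
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait()
	return handled
}

func main() {
	fmt.Println(processInLanes(4, 2)) // prints [[0 2] [1 3]]
}
```

The ordering comes from the channel's first-in, first-out delivery to one reader per lane, not from any lock.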

If the real requirement is to process the data in fixed-size groups, finishing one group before starting the next, we can achieve this with the WaitGroup type from the sync package.

var (
  wg sync.WaitGroup
)

// For simplicity, I haven't handled the corner cases:
// It works only when the number of inputs is a multiple of `phase`.
func foo(input []int, phase int) {
  for i := 0; i < len(input); i += phase {
    wg.Add(phase)
    for j := i; j < i + phase; j++ {
      go doSomething(j)
    }
    wg.Wait()
  }
}

func doSomething(i int) {
  defer wg.Done()
  // do something
  fmt.Printf("I'm %d\n", i)
}
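
The corner case mentioned in the comment can be handled by clamping the end of the last batch. The following is a rough sketch under my own assumptions rather than the original author's code: runInBatches and batchOrder are hypothetical names, the WaitGroup is local instead of global, and the callback receives the input value rather than its index.

```go
package main

import (
	"fmt"
	"sync"
)

// runInBatches is a hypothetical variant of foo. It processes input in
// batches of at most `phase` items and waits for each batch to finish
// before starting the next one. The last batch may be shorter.
func runInBatches(input []int, phase int, fn func(int)) {
	if phase <= 0 {
		return
	}
	var wg sync.WaitGroup
	for start := 0; start < len(input); start += phase {
		end := start + phase
		if end > len(input) {
			end = len(input) // clamp the final, shorter batch
		}
		batch := input[start:end]
		wg.Add(len(batch))
		for _, v := range batch {
			go func(v int) {
				defer wg.Done()
				fn(v)
			}(v)
		}
		wg.Wait()
	}
}

// batchOrder records the order in which items were processed. The mutex
// only protects the recording slice used for this demonstration.
func batchOrder(input []int, phase int) []int {
	var (
		mu    sync.Mutex
		order []int
	)
	runInBatches(input, phase, func(v int) {
		mu.Lock()
		order = append(order, v)
		mu.Unlock()
	})
	return order
}

func main() {
	// Five inputs with phase 2: the batches are {0, 1}, {2, 3}, and {4}.
	fmt.Println(batchOrder([]int{0, 1, 2, 3, 4}, 2))
}
```

Within a batch the order is still up to the scheduler. Only the boundary between batches is guaranteed.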

However, the question seemed to be about handling an infinite input stream while maintaining dependencies between inputs (ordering or anything else). That makes it a data streaming problem, which is functional programming's turf. I agree that Go lacks many functional programming features. However, we can still build such a stream with the core of the Go language: goroutines and channels.

In "Concurrency in Go: Tools and Techniques for Developers" (Katherine Cox-Buday, O'Reilly Media), the author puts it as follows:

Go’s philosophy on concurrency can be summed up like this: aim for simplicity, use channels when possible, and treat goroutines like a free resource.

So, we can treat it as the problem of building a pipeline. Let's divide the problem as follows:

  • A generator to stream the input.
  • Demux the input into multiple streams according to the dependency.
  • Perform doSomething in parallel.
  • Mux the results into a single stream.

Input Stream Generator

If you're used to a language with generators (such as Python), you might miss them in Go (me too!). But you can create a generator with a channel.

func generator(done <-chan interface{}, input []int) <-chan int {
  var (
    stream = make(chan int)
  )
  go func() {
    defer close(stream)
    for _, v := range input {
      select {
      case <-done:
        return
      case stream <- v:
      }
    }
  }()

  return stream
}

This stage might look redundant, but it is the basic pattern for streaming with goroutines and channels.
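
As a small usage sketch, the done channel lets a consumer stop early without leaving the generator goroutine blocked on a send. The helper takeN below is hypothetical and exists only for this illustration. The generator is repeated so that the example compiles on its own.

```go
package main

import "fmt"

// generator is the function from the article, repeated here so the
// sketch is self-contained.
func generator(done <-chan interface{}, input []int) <-chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for _, v := range input {
			select {
			case <-done:
				return
			case stream <- v:
			}
		}
	}()
	return stream
}

// takeN is a hypothetical helper: it reads at most n values from the
// generator. Closing done on return lets the generator goroutine exit
// even though it still has values left to send.
func takeN(input []int, n int) []int {
	done := make(chan interface{})
	defer close(done)

	out := make([]int, 0, n)
	if n <= 0 {
		return out
	}
	for v := range generator(done, input) {
		out = append(out, v)
		if len(out) == n {
			break
		}
	}
	return out
}

func main() {
	fmt.Println(takeN([]int{10, 20, 30, 40}, 2)) // prints [10 20]
}
```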

Demuxing

Then, we split the input stream into multiple streams (channels). Here we simply deal the values out round-robin to the given number of streams, but this is where you can apply your own dependency rule.

func demux(done <-chan interface{}, 
  numOut int, inStream <-chan int) []chan int {
	
  var (
    streams = make([]chan int, numOut)
  )
  // Do not forget to initialize each channel.
  for i := 0; i < numOut; i++ {
    streams[i] = make(chan int)
  }

  go func() {
    defer func() {
      for _, st := range streams {
        close(st)
      }
    }()

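    for i := 0; ; {
      select {
      case <-done:
        return
      case val, ok := <-inStream:
        if !ok {
          return
        }
        // Guard the send as well, so this goroutine can exit
        // if done is closed while no one is reading.
        select {
        case <-done:
          return
        case streams[i] <- val:
        }
        // Advance round-robin to the next output stream.
        i = (i + 1) % numOut
      }
    }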
  }()

  return streams
}
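
To show where a dependency rule could plug in, here is a sketch of a demux variant that picks the output stream with a caller-supplied function instead of round-robin. The names demuxBy, route, and collectByKey are my own assumptions and are not part of the original code. Values that map to the same key land on the same channel and keep their relative order.

```go
package main

import (
	"fmt"
	"sync"
)

// demuxBy is a hypothetical variant of demux: route(v) chooses the output
// stream, so values with the same key stay in order on one channel.
func demuxBy(done <-chan interface{}, numOut int,
	in <-chan int, route func(int) int) []chan int {

	streams := make([]chan int, numOut)
	for i := range streams {
		streams[i] = make(chan int)
	}
	go func() {
		defer func() {
			for _, st := range streams {
				close(st)
			}
		}()
		for {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				idx := route(v) % numOut
				if idx < 0 {
					idx += numOut // keep the index valid for negative keys
				}
				select {
				case <-done:
					return
				case streams[idx] <- v:
				}
			}
		}
	}()
	return streams
}

// collectByKey feeds input through demuxBy and gathers each stream's values.
func collectByKey(input []int, numOut int, route func(int) int) [][]int {
	done := make(chan interface{})
	defer close(done)

	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range input {
			in <- v
		}
	}()

	out := make([][]int, numOut)
	var wg sync.WaitGroup
	for i, st := range demuxBy(done, numOut, in, route) {
		wg.Add(1)
		go func(i int, st <-chan int) {
			defer wg.Done()
			for v := range st {
				out[i] = append(out[i], v) // one writer per index
			}
		}(i, st)
	}
	wg.Wait()
	return out
}

func main() {
	byMod3 := func(v int) int { return v % 3 }
	fmt.Println(collectByKey([]int{3, 1, 6, 4, 9, 7}, 3, byMod3))
	// prints [[3 6 9] [1 4 7] []]
}
```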

Parallel Execution

We're going to process each stream in its own goroutine. Parallel execution does not guarantee the running order across channels (though the order within each demuxed channel is maintained). So, we define a result struct that records which channel and which input each result came from.

type result struct {
  channel int
  input int
  value int
}

func execute(done <-chan interface{}, 
  inputChans []chan int, fn func(int) int) []chan result {
	
  var (
    numInputChans = len(inputChans)
    streams = make([]chan result, numInputChans)
  )
  for i := 0; i < numInputChans; i++ {
    streams[i] = make(chan result)
  }

  var wg sync.WaitGroup
  do := func(chanID int, input <-chan int) {
    defer wg.Done()
    for {
      select {
      case <-done:
        return
      case val, ok := <-input:
        if !ok {
          return
        }
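        // Compute first, then guard the send so the worker can
        // still exit if done is closed while no one is reading.
        res := result{channel: chanID, input: val, value: fn(val)}
        select {
        case <-done:
          return
        case streams[chanID] <- res:
        }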
      }
    }
  }

  wg.Add(numInputChans)
  for i, c := range inputChans {
    go do(i, c)
  }

  go func() {
    wg.Wait()
    for _, s := range streams {
      close(s)
    }
  }()

  return streams
}

Muxing

Finally, we multiplex the multiple result streams into a single stream.

func mux(done <-chan interface{}, 
  inputChans []chan result) <-chan result {

  var (
    numInputChans = len(inputChans)
    stream = make(chan result)
  )

  var wg sync.WaitGroup
  multiplex := func(c <-chan result) {
    defer wg.Done()
    for v := range c {
      select {
      case <-done:
        return
      case stream <- v:
      }
    }
  }

  wg.Add(numInputChans)
  for _, c := range inputChans {
    go multiplex(c)
  }

  go func() {
    wg.Wait()
    close(stream)
  }()

  return stream
}
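
The mux above emits results in whatever order they arrive. If the original input order matters, one possible alternative (my own sketch, not taken from the book or the original code) is to read exactly one result from each channel in turn. This only works under the assumption that the inputs were dealt round-robin, as demux does, and that each channel preserves its own order. The names muxOrdered and orderedInputs are hypothetical.

```go
package main

import "fmt"

type result struct {
	channel int
	input   int
	value   int
}

// muxOrdered is a hypothetical alternative to mux. It assumes the inputs
// were dealt round-robin across inputChans, so taking one result from each
// channel in turn restores the original input order.
func muxOrdered(done <-chan interface{}, inputChans []chan result) <-chan result {
	stream := make(chan result)
	go func() {
		defer close(stream)
		if len(inputChans) == 0 {
			return
		}
		for i := 0; ; i = (i + 1) % len(inputChans) {
			select {
			case <-done:
				return
			case v, ok := <-inputChans[i]:
				if !ok {
					// With round-robin dealing, the first closed channel
					// we reach marks the end of the whole input.
					return
				}
				select {
				case <-done:
					return
				case stream <- v:
				}
			}
		}
	}()
	return stream
}

// orderedInputs simulates `lanes` workers that each emit every lanes-th
// item, then returns the inputs in the order muxOrdered delivers them.
func orderedInputs(n, lanes int) []int {
	done := make(chan interface{})
	defer close(done)

	chans := make([]chan result, lanes)
	for l := range chans {
		chans[l] = make(chan result)
		go func(l int, out chan<- result) {
			defer close(out)
			for i := l; i < n; i += lanes {
				out <- result{channel: l, input: i, value: i * 2}
			}
		}(l, chans[l])
	}

	var inputs []int
	for r := range muxOrdered(done, chans) {
		inputs = append(inputs, r.input)
	}
	return inputs
}

func main() {
	fmt.Println(orderedInputs(5, 2)) // prints [0 1 2 3 4]
}
```

The trade-off is that a slow worker now stalls the whole output, since the merger waits for each channel in turn.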

Final Result

The final code and the output of a test run are as follows.

package main

import (
  "fmt"
  "sync"
)

func generator(done <-chan interface{}, input []int) <-chan int {
  var (
    stream = make(chan int)
  )
  go func() {
    defer close(stream)
    for _, v := range input {
      select {
      case <-done:
        return
      case stream <- v:
      }
    }
  }()

  return stream
}

func demux(done <-chan interface{}, 
  numOut int, inStream <-chan int) []chan int {
	
  var (
    streams = make([]chan int, numOut)
  )
  // Do not forget to initialize each channel.
  for i := 0; i < numOut; i++ {
    streams[i] = make(chan int)
  }

  go func() {
    defer func() {
      for _, st := range streams {
        close(st)
      }
    }()

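    for i := 0; ; {
      select {
      case <-done:
        return
      case val, ok := <-inStream:
        if !ok {
          return
        }
        // Guard the send as well, so this goroutine can exit
        // if done is closed while no one is reading.
        select {
        case <-done:
          return
        case streams[i] <- val:
        }
        // Advance round-robin to the next output stream.
        i = (i + 1) % numOut
      }
    }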
  }()

  return streams
}

type result struct {
  channel int
  input int
  value int
}

func execute(done <-chan interface{}, 
  inputChans []chan int, fn func(int) int) []chan result {
	
  var (
    numInputChans = len(inputChans)
    streams = make([]chan result, numInputChans)
  )
  for i := 0; i < numInputChans; i++ {
    streams[i] = make(chan result)
  }

  var wg sync.WaitGroup
  do := func(chanID int, input <-chan int) {
    defer wg.Done()
    for {
      select {
      case <-done:
        return
      case val, ok := <-input:
        if !ok {
          return
        }
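        // Compute first, then guard the send so the worker can
        // still exit if done is closed while no one is reading.
        res := result{channel: chanID, input: val, value: fn(val)}
        select {
        case <-done:
          return
        case streams[chanID] <- res:
        }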
      }
    }
  }

  wg.Add(numInputChans)
  for i, c := range inputChans {
    go do(i, c)
  }

  go func() {
    wg.Wait()
    for _, s := range streams {
      close(s)
    }
  }()

  return streams
}

func mux(done <-chan interface{}, 
  inputChans []chan result) <-chan result {

  var (
    numInputChans = len(inputChans)
    stream = make(chan result)
  )

  var wg sync.WaitGroup
  multiplex := func(c <-chan result) {
    defer wg.Done()
    for v := range c {
      select {
      case <-done:
        return
      case stream <- v:
      }
    }
  }

  wg.Add(numInputChans)
  for _, c := range inputChans {
    go multiplex(c)
  }

  go func() {
    wg.Wait()
    close(stream)
  }()

  return stream
}

func doSomething(i int) int {
  // do something
  return i*2
}

func main() {
  done := make(chan interface{})
  defer close(done)

  var (
    input = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
  )

  for v := range mux(done, execute(done, demux(done, 2, generator(done, input)), doSomething)) {
    fmt.Printf(">> [%d]: in: %d, out: %d\n", v.channel, v.input, v.value)
  }
  fmt.Printf(">> done\n")
}

Output of one test run (the interleaving across channels can differ between runs):

>> [0]: in: 0, out: 0
>> [0]: in: 2, out: 4
>> [1]: in: 1, out: 2
>> [1]: in: 3, out: 6
>> [0]: in: 4, out: 8
>> [1]: in: 5, out: 10
>> [0]: in: 6, out: 12
>> [1]: in: 7, out: 14
>> [1]: in: 9, out: 18
>> [0]: in: 8, out: 16
>> done

I assumed that you were already familiar with Go, goroutines, and channels. If you're not, I strongly recommend the book "Concurrency in Go." It is a great resource on Go's concurrency features (the first two chapters are rather verbose and pedantic, but I believe the book is worth a read). Frankly, this article's implementation draws heavily on the book's pipeline and fan-out/fan-in patterns.