Golang Pipelines

Awful ways to create Pipelines with Go

This is a common pattern you’ll see in Go codebases.

// audio management utils

func OverlayMusic(speech io.Reader, music io.ReadSeeker, out io.Writer) (time.Duration, error) { ... }

func Concat(in []io.Reader, out io.Writer) (time.Duration, error) { ... }

func Fade(in io.Reader, dur time.Duration, out io.Writer) (time.Duration, error) { ... }

func Silence(dur time.Duration, out io.Writer) (time.Duration, error) { ... }

At first glance, everything seems alright. Like a normal pipeline stage, each function writes its output to an io.Writer, usually after reading from an io.Reader and transforming the data.

However, this style doesn’t scale well outside of niche cases like `io.Copy` and the like.

These functions don’t return an io.Reader themselves, so they rely on the consumer to construct (and wire up) an io.Pipe for every step.

For example, let’s say we wanted to pad the beginning and end of a voice recording with silence, and overlay the entire thing with a music track.

func CreateSoundtrack(speech io.Reader, music io.ReadSeeker) io.Reader {
	// 2 second silence for the beginning
	padStartR, padStartW := io.Pipe()
	go func() {
		Silence(2*time.Second, padStartW)
	}()

	// 2 second silence for the end
	padEndR, padEndW := io.Pipe()
	go func() {
		Silence(2*time.Second, padEndW)
	}()

	// concat {silence, speech, silence}
	concattedR, concattedW := io.Pipe()
	go func() {
		Concat([]io.Reader{padStartR, speech, padEndR}, concattedW)
	}()

	// overlay concatted speech with music
	overlayR, overlayW := io.Pipe()
	go func() {
		OverlayMusic(concattedR, music, overlayW)
	}()

	// fade the overlay
	fadedR, fadedW := io.Pipe()
	go func() {
		Fade(overlayR, 2*time.Second, fadedW)
	}()

	return fadedR
}

Worse yet, a failure can occur anywhere along this pipeline, so we must handle it and close the associated pipe writer with the error.

func CreateSoundtrack(speech io.Reader, music io.ReadSeeker) io.Reader {
	// 2 second silence for the beginning
	padStartR, padStartW := io.Pipe()
	go func() {
		if _, err := Silence(2*time.Second, padStartW); err != nil {
			padStartW.CloseWithError(err)
		}
	}()

	// 2 second silence for the end
	padEndR, padEndW := io.Pipe()
	go func() {
		if _, err := Silence(2*time.Second, padEndW); err != nil {
			padEndW.CloseWithError(err)
		}
	}()

	// concat {silence, speech, silence}
	concattedR, concattedW := io.Pipe()
	go func() {
		if _, err := Concat([]io.Reader{padStartR, speech, padEndR}, concattedW); err != nil {
			concattedW.CloseWithError(err)
		}
	}()

	// overlay concatted speech with music
	overlayR, overlayW := io.Pipe()
	go func() {
		if _, err := OverlayMusic(concattedR, music, overlayW); err != nil {
			overlayW.CloseWithError(err)
		}
	}()

	// fade the overlay
	fadedR, fadedW := io.Pipe()
	go func() {
		if _, err := Fade(overlayR, 2*time.Second, fadedW); err != nil {
			fadedW.CloseWithError(err)
		}
	}()

	return fadedR
}

On top of that, every pipe writer must be closed even when its step succeeds; otherwise the downstream reader never receives io.EOF and everything blocks, forever. Of course, we could decide that Silence, Concat, OverlayMusic, and Fade must accept an io.WriteCloser rather than an io.Writer, and thus hand the responsibility of closing to them.

However, this muddles responsibilities. It isn’t these functions’ job to close a writer they were handed, especially when returning a duration and/or an error already signals that they have finished.

Thus we finally end up with:

func CreateSoundtrack(speech io.Reader, music io.ReadSeeker) io.Reader {
	// 2 second silence for the beginning
	padStartR, padStartW := io.Pipe()
	go func() {
		if _, err := Silence(2*time.Second, padStartW); err != nil {
			padStartW.CloseWithError(err)
		} else {
			padStartW.Close()
		}
	}()

	// 2 second silence for the end
	padEndR, padEndW := io.Pipe()
	go func() {
		if _, err := Silence(2*time.Second, padEndW); err != nil {
			padEndW.CloseWithError(err)
		} else {
			padEndW.Close()
		}
	}()

	// concat {silence, speech, silence}
	concattedR, concattedW := io.Pipe()
	go func() {
		if _, err := Concat([]io.Reader{padStartR, speech, padEndR}, concattedW); err != nil {
			concattedW.CloseWithError(err)
		} else {
			concattedW.Close()
		}
	}()

	// overlay concatted speech with music
	overlayR, overlayW := io.Pipe()
	go func() {
		if _, err := OverlayMusic(concattedR, music, overlayW); err != nil {
			overlayW.CloseWithError(err)
		} else {
			overlayW.Close()
		}
	}()

	// fade the overlay
	fadedR, fadedW := io.Pipe()
	go func() {
		if _, err := Fade(overlayR, 2*time.Second, fadedW); err != nil {
			fadedW.CloseWithError(err)
		} else {
			fadedW.Close()
		}
	}()

	return fadedR
}

Why is this so common?

Reason #1: Misunderstanding Go

Code like this often comes from a misunderstanding of the Go way of handling pipelines.

For example, we reach for io.Pipe because it seems like the natural way to keep reader and writer goroutines synchronized.

However, Go already handles that for us. When we call Read on an io.Reader, that call can block the calling goroutine until data is available.

In addition, we close these pipes manually, even though an io.Reader can already signal the end of a stream by returning io.EOF. Likewise, Go already has an idiom for errors in readers: Read simply returns a non-nil error.

Reason #2: Stateful Read operations

Often, a Read(p []byte) implementation must keep mutable state that changes over the lifetime of the reader. This is especially true for tasks like compression and decompression.

type CustomReader struct {
	flag1      bool
	flag2      bool
	flag2Value byte
	flag3      bool
}

func (c *CustomReader) Read(p []byte) (int, error) {
	// A zero-length buffer must not be written to.
	if len(p) == 0 {
		return 0, nil
	}

	if c.flag1 {
		// mutate state
	} else if c.flag3 {
		// mutate other state
	}

	if c.flag3 {
		p[0] = c.flag2Value
		return 1, nil
	}

	return 0, io.EOF
}

Because Read doesn’t run inside a long-lived goroutine that can keep state in local variables, it must store flags and their associated values in the struct. In addition, every call starts fresh, so Read has to work out from those fields where in its lifecycle it is.

This is made even harder by the fact that you must respect the length of the buffer provided by the caller: if you have more output ready than fits in p, you must hold on to the rest until the next call. This is one of the biggest reasons people opt for io.Pipe.

What can we do about it?

These are the kinds of issues I deal with all the time, so if you need a little assistance, don’t be afraid to reach out to me at support@poxate.com. You can also contact me through Bluesky or Twitter.

The key to solving these issues lies in embracing Go’s idioms for stream processing and properly designing abstractions around io.Reader and io.Writer.

Instead of relying on the user to wire up io.Pipe correctly, we return an io.Reader, bundled with any extra information the caller needs.

In addition (as far as your codebase allows), wrap the io.Reader in a standardized struct that all related functions share.

In our case, the extra information is the Duration.

type AudioReader struct {
	// The final value of Duration is determined when producer finishes without error.
	Duration time.Duration
	io.Reader
}

Now we change each function’s signature to return an *AudioReader.

// func OverlayMusic(speech io.Reader, music io.ReadSeeker, out io.Writer) (time.Duration, error) { ... }
func OverlayMusic(speech io.Reader, music io.ReadSeeker) *AudioReader { ... }

// func Fade(in io.Reader, dur time.Duration, out io.Writer) (time.Duration, error) { ... }
func Fade(in io.Reader, dur time.Duration) *AudioReader { ... }

// func Concat(in []io.Reader, out io.Writer) (time.Duration, error) { ... }
func Concat(in ...io.Reader) *AudioReader { ... }

// func Silence(dur time.Duration, out io.Writer) (time.Duration, error) { ... }
func Silence(dur time.Duration) *AudioReader { ... }

Here’s the function that generates an AudioReader:

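func NewAudioReader(producer func(w io.Writer) (time.Duration, error)) *AudioReader {
	pipeR, pipeW := io.Pipe()

	r := &AudioReader{
		Reader: pipeR,
	}

	go func() {
		dur, err := producer(pipeW)
		if err != nil {
			// The reader receives err instead of io.EOF.
			pipeW.CloseWithError(err)
		} else {
			// Set Duration before Close: Close is what lets the reader see
			// io.EOF, so this order makes the final value visible to it.
			r.Duration = dur
			pipeW.Close()
		}
	}()

	return r
}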

Let’s break this down step by step:

Step #1: Connect a Pipe

This may seem a little counterintuitive at first. Aren’t we trying to get rid of io.Pipe?

Yes, but we’re only removing it from user code. It’s still a powerful abstraction that has its place.

func NewAudioReader() *AudioReader {
	pipeR, pipeW := io.Pipe()

	r := &AudioReader{
		Reader: pipeR,
	}

	...

	return r
}

Step #2: Create a factory that accepts the PipeWriter

We want a simple way to create stateful writer goroutines, so let’s accept a function that accepts an io.Writer.

func NewAudioReader(producer func(w io.Writer)) *AudioReader {
	pipeR, pipeW := io.Pipe()

	r := &AudioReader{
		Reader: pipeR,
	}

	go producer(pipeW)

	return r
}

Here’s what a function taking advantage of this abstraction looks like:

func Silence(dur time.Duration) *AudioReader {
	return NewAudioReader(func(w io.Writer) {
		for range int(dur.Seconds()) {
			// warning, this is a fake implementation of silence
			p := make([]byte, 1024)
			
			// w is a *io.PipeWriter, so Write blocks
			_, err := w.Write(p)
			if err != nil {
				return
			}
		}
	})
}

Another example using Concat:

func Concat(in ...io.Reader) *AudioReader {
	return NewAudioReader(func(w io.Writer) {
		for _, r := range in {
			_, err := io.Copy(w, r)
			if err != nil {
				return
			}
		}
	})
}

Warning: at this point, we are not handling Duration or errors, nor are we closing the internal io.Pipe created in NewAudioReader. The code that follows only previews the final API we’re heading for.

Preview: How we compose this code

Looking back on our CreateSoundtrack function, our code now composes intuitively.

func CreateSoundtrack(speech io.Reader, music io.ReadSeeker) io.Reader {
	padStart := Silence(2 * time.Second)
	padEnd := Silence(2 * time.Second)

	concatted := Concat(padStart, speech, padEnd)

	overlay := OverlayMusic(concatted, music)

	faded := Fade(overlay, 2*time.Second)

	return faded
}

That’s a reduction from 53 lines to just 12!

And if, say, we need to know the duration of the final result, we can read faded until io.EOF, then pull the final duration out.

func CreateSoundtrack(speech io.Reader, music io.ReadSeeker, out *os.File) time.Duration {
	padStart := Silence(2 * time.Second)
	padEnd := Silence(2 * time.Second)

	concatted := Concat(padStart, speech, padEnd)

	overlay := OverlayMusic(concatted, music)

	faded := Fade(overlay, 2*time.Second)

	if _, err := io.Copy(out, faded); err != nil {
		// error handling
	}

	return faded.Duration
}

Step #3: Complete the abstraction

You may have wondered where the duration even comes from, given that Silence and Concat made no mention of it.

The truth is, we simply haven’t implemented it yet. While we’re at it, let’s also give AudioReader producers a way to return an error that terminates the reader.

// func NewAudioReader(producer func(w io.Writer)) *AudioReader {
func NewAudioReader(producer func(w io.Writer) (time.Duration, error)) *AudioReader {
	pipeR, pipeW := io.Pipe()

	r := &AudioReader{
		Reader: pipeR,
	}
	
	// go producer()
	go func() {
		dur, err := producer(pipeW)
		if err != nil {
			pipeW.CloseWithError(err)
		} else {
			// Set Duration before Close, so a reader that has seen io.EOF
			// also sees the final duration.
			r.Duration = dur
			pipeW.Close()
		}
	}()

	return r
}

Now our Silence generator simply needs to return a duration, or an error if it fails along the way.

func Silence(dur time.Duration) *AudioReader {
	return NewAudioReader(func(w io.Writer) (time.Duration, error) {
		for range int(dur.Seconds()) {
			// warning, this is a fake implementation of silence
			p := make([]byte, 1024)
			_, err := w.Write(p)
			if err != nil {
				return 0, err
			}
		}

		return dur, nil
	})
}

Concat needs a slight change to its signature: it accepts *AudioReader values rather than io.Reader, which lets it add up their durations.

func Concat(in ...*AudioReader) *AudioReader {
	return NewAudioReader(func(w io.Writer) (time.Duration, error) {
		finalDuration := time.Duration(0)

		for _, audio := range in {
			if _, err := io.Copy(w, audio); err != nil {
				return 0, err
			}

			finalDuration += audio.Duration
		}

		return finalDuration, nil
	})
}

Key takeaways

This may have been a bit overwhelming. Fortunately, I’ve used this pattern many times, so if you’d like help applying it, contact me at support@poxate.com or @ me on Twitter or Bluesky.

  • Create a function in the style of func(in io.Reader) io.Reader, not func(in io.Reader, out io.Writer).
  • If a generator needs to return extra data, create a custom struct.
  • If a generator handles complex but serial state, create a goroutine connected to an io.Pipe.
  • If you’re developing a package of generators (say, an audio library with Silence, Overlay, Fade, etc.), create a consistent interface among them.

This is the final result:

func CreateSoundtrack(speech *AudioReader, music io.ReadSeeker) *AudioReader {
	padStart := Silence(2 * time.Second)
	padEnd := Silence(2 * time.Second)

	// Concat now takes *AudioReader values, so speech must be one too.
	concatted := Concat(padStart, speech, padEnd)

	overlay := OverlayMusic(concatted, music)

	faded := Fade(overlay, 2*time.Second)

	return faded
}
