Concurrency In Go

Concurrency in Go

A brief introduction to Go

// Every Go program is made up of packages.
// Programs start running in package main.
// Main refers to "executable". Others are libraries.
package main

// Declares libraries referenced in this source file
import (
    "fmt"       // The fmt package (shorthand for format)
                // implements formatting for input and output.
)

// main is special: it is the entry point, the function that gets called
// when the program is executed.
func main() {
	greeting := "Bello world!"
	fmt.Println(greeting) // Println writes the value and a newline to stdout.

	// Hand off to the rest of the tour.
	movingforward()
}

// movingforward demonstrates variable declarations, short declarations and
// calling a function that returns two values, then continues the tour.
func movingforward() {
	var x int // Explicit declaration: x starts at its zero value, 0.
	x = 10    // Plain assignment.

	// Short declaration: the compiler infers the type (int) from the
	// literal value assigned to the variable.
	y := 20

	// returnTwoValues hands back both results at once.
	sum, prod := returnTwoValues(x, y)
	fmt.Println("sum:", sum, "\nprod:", prod)

	learnArrays()
}

// returnTwoValues shows that a Go function can take parameters and return
// several values. The result list (sum, prod int) names and documents the
// two results: the sum x+y and the product x*y.
func returnTwoValues(x, y int) (sum, prod int) {
	sum = x + y
	prod = x * y
	return sum, prod
}

// learnArrays tours Go's built-in composite types (arrays, slices and
// maps), then uses helper functions to show pointers and named results.
func learnArrays() {
	// Arrays: the length is part of the type and fixed at compile time.
	var zeros [5]int            // Five ints, each initialized to 0.
	digits := [...]int{6, 1, 9} // [...] lets the compiler count the elements (3).
	fmt.Println(zeros)
	fmt.Println(digits)

	// Slices: like arrays, but with a dynamic size.
	nums := []int{1, 2, 3}  // Compare to digits: no ellipsis, so this is a slice.
	blank := make([]int, 5) // make allocates a slice of 5 ints, all 0.

	// The built-in append returns the updated slice, which must be
	// assigned back.
	nums = append(nums, 4, 5, 6) // Add three elements.
	fmt.Println(nums)            // [1 2 3 4 5 6]
	fmt.Println(blank)

	// A whole slice can be appended by expanding it with "...".
	more := []int{7, 8, 9}
	nums = append(nums, more...)
	fmt.Println(nums) // [1 2 3 4 5 6 7 8 9]

	// learnPointers returns two values of type *int.
	p, q := learnPointers()
	fmt.Println(*p, *q)

	// Maps: dynamically growable associative arrays, similar to Python's
	// dict or Java's HashMap.
	m := map[string]int{"two": 2, "seven": 7}
	m["one"] = 1

	a, b := 10, 20
	fmt.Println(learnNamedReturns(a, b))

	basicFlowControl() // Continue the tour with control flow.
}

// learnNamedReturns demonstrates a named result parameter. z is declared
// in the signature, assigned in the body, and returned by the bare
// "return". Naming results makes it easy to return from several points in
// a function. It returns the product of x and y.
func learnNamedReturns(x, y int) (z int) {
	z = y * x
	return // Returns the current value of z.
}

// learnPointers demonstrates pointers. Go is fully garbage collected and
// has pointers, but no pointer arithmetic.
//
// It returns two pointers (not two ints): p points at the value 24 and q
// points at the value -44.
func learnPointers() (p, q *int) {
	// new allocates a zeroed int and returns its address, so p is no
	// longer nil.
	p = new(int)

	s := make([]int, 10) // make allocates 10 ints as a single block of memory.
	s[5] = 24
	*p = s[5] // Dereference p to store a value in the int it points to.

	// Returning the address of a local variable is safe in Go: the garbage
	// collector keeps r alive for as long as q refers to it.
	r := -44
	q = &r

	return p, q
}

// basicFlowControl tours if, switch, for and range, then continues the
// tour with defer and interfaces.
func basicFlowControl() {
	// if needs no parentheses around the condition, but braces are required.
	if true {
		fmt.Println("Banana.")
	}

	// Formatting is standardized by the command-line tool "go fmt".
	if false {
		// Nothing.
	} else {
		// Nothing.
	}

	// switch is preferred over chained if statements. Cases do not fall
	// through, so an empty case simply does nothing.
	x := 41193.0
	switch {
	case x == 0, x == 1:
		// Empty.
	case x == 41193:
		fmt.Println("Gelato.")
	case x == 41194:
		// Unreached.
	default:
		// Optional.
	}

	// for is the only loop statement in Go, and it has several forms.
	// With no condition it loops forever until something exits it. break
	// leaves the loop; continue would instead skip to the next iteration.
	for {
		break
	}

	// range iterates over an array, a slice, a string, a map or a channel.
	// For a channel it yields one value per iteration; for the others it
	// can yield two. Map iteration order is unspecified, so these lines may
	// print in any order.
	numbers := map[string]int{"one": 1, "two": 2, "three": 3}
	for k, v := range numbers {
		fmt.Printf("key=%s, value=%d\n", k, v)
	}

	useDefer()      // Important.
	useInterfaces() // Good stuff coming up!
}

// useDefer shows that deferred calls run just before the surrounding
// function returns, in last-in-first-out order: the second defer below
// prints first. It always returns true.
func useDefer() (ok bool) {
	defer fmt.Println("(SECOND) Deferred statements are executed in reverse.")
	defer fmt.Println("\n(FIRST) Printed first because (LIFO) or ")
	ok = true
	return
}

// infString is an interface type. Any type with a String() string method
// satisfies it implicitly; no "implements" declaration is needed.
type infString interface {
	String() string
}

// stcInt is a struct type with two int fields.
type stcInt struct {
	x, y int
}

// Compile-time assertion that stcInt satisfies infString.
var _ infString = stcInt{}

// String formats p as "(x, y)". Defining this method is what makes stcInt
// implement infString. p is called the "receiver".
func (p stcInt) String() string {
	return fmt.Sprintf("(%v, %v)", p.x, p.y)
}

// useInterfaces shows a concrete struct value being used directly and
// through an interface, then continues the tour with variadic parameters.
func useInterfaces() {
	p := stcInt{x: 5, y: 6} // Keyed fields make the initial values explicit.
	fmt.Println(p.String()) // Call the String method of p, of type stcInt.

	// stcInt has a String method, so it can be stored in an infString.
	var i infString = p
	fmt.Println(i.String()) // The same method, called through the interface.

	// fmt also uses the String method, so both of these print the same as above.
	fmt.Println(p)
	fmt.Println(i)

	useVariadicParams("Kevin", "KingBob", "Dave")
}

// useVariadicParams prints each of its variadic arguments on its own line,
// then prints all of them together on a single line.
//
// Inside the function, myStrings is a []interface{} holding the arguments.
func useVariadicParams(myStrings ...interface{}) {
	// Iterate over each variadic value. The blank identifier _ discards the
	// index that range yields for a slice.
	for _, param := range myStrings {
		fmt.Println("Minion:", param)
	}

	// Pass the whole slice on to another variadic function by expanding it
	// with "...". Sprintln separates the values with spaces and already ends
	// with a newline. Print is used instead of Println, which would add a
	// second newline and so an extra blank line. Print inserts no space
	// between two string operands, so the literal carries its own space.
	fmt.Print("All Minions: ", fmt.Sprintln(myStrings...))
}

What is concurrency?

Concurrency is a way to structure software so that it interacts well with the real world. Concurrency is the composition of independently executing processes that may interact with each other.

Concurrency is not parallelism

Parallelism is the simultaneous execution of processes, which may or may not be related to each other. It exploits the multi-core architectures we have today: in parallelism we perform multiple actions at once. Parallel programs aim to reduce data dependencies, so that computations can run on independent computation units with minimal communication between them.

On a single-core processor a program can still be concurrent without being parallel. Nonetheless, a concurrent program does benefit from multiple processing units in terms of speed. Concurrency is not parallelism, although a concurrent design is what enables parallelism.

How is concurrency implemented in Go?

Go provides concurrency in the language and runtime, not through a library. Go follows the ideology "Do not communicate by sharing memory; instead, share memory by communicating."

In traditional threading models, threads communicate through shared memory. Shared data structures are protected by locks, and threads contend for those locks to access the data. Go instead encourages passing shared values around on channels, so that they are never actively shared by separate threads of execution.

Concurrency in Go is implemented using goroutines. A goroutine is an independently executing function: it runs concurrently with other goroutines in the same address space. It is lightweight and has its own call stack. The stacks start small, so they are cheap, and they grow (or shrink) by allocating (and freeing) heap storage as required. It is practical to run thousands of goroutines, which the runtime multiplexes onto a small number of operating-system threads.

A goroutine is started by prefixing an ordinary function call with the keyword go.

package main

import (
  "fmt"
  "time"
)

// ready waits for sec seconds and then announces that fodr is ready.
func ready(fodr string, sec int) {
	delay := time.Second * time.Duration(sec)
	time.Sleep(delay)
	fmt.Println(fodr, "is ready!")
}

func main() {
	// Each go statement starts ready in a new goroutine; main carries on
	// immediately without waiting for it.
	go ready("Toast", 2)  // Ready after 2 seconds.
	go ready("Coffee", 1) // Ready after 1 second.

	fmt.Println("Have to wait!")

	// This is needed: returning from main terminates the program and kills
	// any goroutines still running. Sleeping is a crude wait that only
	// works because 5 seconds exceeds both delays.
	time.Sleep(5 * time.Second)
}

Channels

In the last lines of the previous program we had to wait manually (by sleeping) for the goroutines to finish. Otherwise the program would have terminated immediately, and any goroutines still running would have died with it. A better solution is a mechanism that lets goroutines communicate with each other. This mechanism is called a channel.

A channel is similar to a Unix pipe, except that the same channel can be used both to send values and to receive them. Channels are typed: every value sent or received must be of the channel's element type. Sending and receiving also synchronize the execution of goroutines.

package main

import (
    "fmt"
    "time"
)

// c is a package-level channel of ints shared by main and every ready
// goroutine. Its zero value is nil, and sending on a nil channel blocks
// forever, so c must be initialized with make before ready runs.
//
// The type chan int is bidirectional: values can be both sent on it and
// received from it. "<-" is the channel operator (c <- v sends, <-c
// receives). A direction is part of a channel type only when written as
// chan<- int (send-only) or <-chan int (receive-only).
var c chan int

// ready sleeps for sec seconds, announces that fodr is ready, and then
// signals completion by sending 1 on c. If c is unbuffered, the send
// blocks until another goroutine receives the value.
func ready(fodr string, sec int) {
	time.Sleep(time.Duration(sec) * time.Second)
	fmt.Println(fodr, "is ready!")

	// Integer 1 is sent on the channel c.
	c <- 1
}

func main() {
	// Initialize the shared channel. It is unbuffered, so each send in
	// ready blocks until main receives it.
	c = make(chan int)

	go ready("Toast", 2)  // Sends on c after 2 seconds.
	go ready("Coffee", 1) // Sends on c after 1 second.
	fmt.Println("I don't have to wait for a long time now!")

	// Receive one value per goroutine started above. Each receive blocks
	// until a goroutine sends, and the received values are discarded.
	for i := 0; i < 2; i++ {
		<-c
	}
}

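Here the goroutines are running concurrently; whether they also run in parallel depends on GOMAXPROCS. Before Go 1.5, GOMAXPROCS defaulted to 1, so only one goroutine executed at a time unless told otherwise. Since Go 1.5 it defaults to the number of available CPUs.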

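GOMAXPROCS (the environment variable, or runtime.GOMAXPROCS(n) from code) sets the maximum number of operating-system threads that can execute Go code simultaneously, that is, how many goroutines can run in parallel. Calling runtime.GOMAXPROCS with n < 1 reports the current setting without changing it.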

Buffered Channels

In the previous code we initialized channel using:

c = make(chan int)

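This is an unbuffered (synchronous) channel: a send blocks until another goroutine receives the value, and a receive blocks until another goroutine sends one, so both sides wait until the other side is ready. This can be relaxed by making a buffered channel, which is asynchronous up to its capacity: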

c = make(chan int, 5)

With a buffered channel, a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty. Here we have set the capacity of the buffered channel to 5. The first 5 elements can be sent without blocking. When a 6th element is sent, the sender blocks until another goroutine receives from the channel and frees space in the buffer.

Therefore, the general form is:
ch := make(chan type, value)

Any capacity greater than 0 makes the channel buffered. A capacity of 0, or omitting the capacity, makes it unbuffered.

Select

Go has a special statement, select, which works like a switch but for channels: each case is a communication (a send or a receive). It is a control structure unique to concurrency, and it provides a way to wait on multiple channels at once.

The channel expressions of all cases are evaluated, and then:
– If no case can proceed and there is no default clause, select blocks until one of the communications can proceed.
– If multiple cases can proceed, select chooses one of them pseudo-randomly.
– If a default clause is present, it is executed immediately when no case is ready.

// Illustrative fragment: c1 and c2 are assumed to be channels declared
// elsewhere.
// NOTE: because this select has a default clause, it never blocks. If
// neither c1 nor c2 is ready at the moment select runs, default executes
// immediately. A freshly created time.After channel is not ready yet, so
// the timeout case can never be chosen here. Remove the default clause to
// wait up to one second for c1 or c2 before timing out.
select {
case check1 := <- c1:
  fmt.Println("Message 1 goes here", check1)
case check2 := <- c2:
  fmt.Println("Message 2 goes here", check2)
// Fires one second after select starts waiting (only reachable without default).
case <- time.After(time.Second):
  fmt.Println("This timed out.")
// Runs immediately when no other case is ready.
default:
  fmt.Println("No channel was ready. This is default.")
}

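Use a timeout case such as <-time.After(d) to stop a select from blocking forever when no goroutine ever sends. A timeout only takes effect in a select without a default clause: when default is present, select never waits, so the timeout case in the example above can never be chosen.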

Range and Close

range can be used on channels, buffered or unbuffered, just as on slices and maps: it keeps receiving values until the channel is closed and drained.

package main

import (
    "fmt"
)

// fibonacci sends the first n Fibonacci numbers (1, 1, 2, 3, 5, ...) on c
// and then closes c, so a receiver ranging over c knows when to stop.
func fibonacci(n int, c chan int) {
	a, b := 1, 1
	for remaining := n; remaining > 0; remaining-- {
		c <- a
		a, b = b, a+b
	}
	close(c)
}

func main() {
	c := make(chan int, 10)
	// The producer sends cap(c) (10) numbers, which fit in the buffer.
	go fibonacci(cap(c), c)

	// range keeps receiving until fibonacci closes the channel. Outside a
	// range loop, use v, ok := <-c; ok is false once c is closed and drained.
	for v := range c {
		fmt.Println(v)
	}
}

An example to concurrently fetch web resources

// This will fetch concurrently various web resources
// It will wait for all of them to be fetched
// It will process them all at once after it is fetched

package main

import (
    "fmt"
    "time"
    "net/http"
)

// ResponseFromURL pairs a requested URL with the outcome of fetching it.
// When err is nil the fetch succeeded and response holds the result; when
// err is non-nil, response must be ignored.
type ResponseFromURL struct {
	url      string         // The URL that was requested.
	response *http.Response // The HTTP response; only meaningful when err is nil.
	err      error          // The request error, or nil on success.
}

// urls lists the web resources fetched by main.
var urls = []string{
	"https://xebia.com/",
	"http://golang.org/",
	"https://www.facebook.com/XebiaIndia",
	"http://blog.xebia.in/2015/09/22/concurrency-in-go/",
}

// fetchTimeout bounds each HTTP request, so one unresponsive server cannot
// stall asynchronousHttpResponse forever.
const fetchTimeout = 10 * time.Second

// asynchronousHttpResponse fetches every URL in urls concurrently, one
// goroutine per URL, and waits until all of them have finished.
//
// It returns one *ResponseFromURL per input URL, in the order the fetches
// completed (not input order). Failed fetches are reported through the err
// field rather than dropped. The caller owns every successful response and
// must close its Body.
//
// While waiting, "." is printed every 25ms as a progress indicator. An
// empty urls returns an empty slice immediately.
func asynchronousHttpResponse(urls []string) []*ResponseFromURL {
	client := &http.Client{Timeout: fetchTimeout}

	// Capacity len(urls) and exactly one send per goroutine means no send
	// ever blocks.
	ch := make(chan *ResponseFromURL, len(urls))
	responses := make([]*ResponseFromURL, 0, len(urls))

	for _, url := range urls {
		// url is passed as an argument so each goroutine gets its own copy
		// (required before Go 1.22's per-iteration loop variables).
		go func(url string) {
			fmt.Printf("Wait. Currently fetching %s \n", url)
			resp, err := client.Get(url)
			ch <- &ResponseFromURL{url, resp, err}
		}(url)
	}

	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()

	// Block until either a result arrives or the ticker fires, instead of
	// busy-polling with default + Sleep. The loop condition (not a check
	// after each receive) ensures an empty input does not wait forever.
	for len(responses) < len(urls) {
		select {
		case r := <-ch:
			fmt.Printf("%s was fetched\n", r.url)
			responses = append(responses, r)
		case <-ticker.C:
			fmt.Print(".")
		}
	}
	return responses
}

// main fetches every URL in urls concurrently and then reports each
// result: the HTTP status on success, or the error if the request failed.
func main() {
	for _, result := range asynchronousHttpResponse(urls) {
		if result.err != nil {
			// response must not be used when err is set (it is normally
			// nil), so report the error instead of dereferencing it.
			fmt.Printf("\n%s \nerror: %v\n", result.url, result.err)
			continue
		}
		fmt.Printf("\n%s \nstatus: %s\n", result.url, result.response.Status)
		// The body is never read, but it must still be closed so the
		// transport can release the connection. The close error is
		// deliberately ignored; nothing useful can be done with it here.
		result.response.Body.Close()
	}
}

Leave a Reply

Your email address will not be published. Required fields are marked *