Refactoring our simple Twitter API project to use concurrency and modules

In my last article, I showed you a simple Twitter script to connect to the API and clean some data. That script was horrible, as all the code resided in a single .go file and we didn’t leverage concurrency.

I have now refactored the code and split it across several files, as shown below. I’ve implemented concurrency using channels and goroutines, and I’ve added some functionality (like outputting my data to a dataframe).

My main.go file is now much cleaner than it was in the previous article. Here, I simply define the list of Twitter handles I want to loop through and create a couple of channels. Remember that channels let the goroutines communicate with the main function, keeping it informed about the progress of each routine.

Once we have defined the channels, we loop through each handle, putting the keyword go in front of our function calls so that they launch as goroutines and run concurrently.

Finally, we define two listener loops that wait for a response per handle on each channel before the application exits. Why two responses per channel? Because we have two handles to process, and we don’t want the code to stop until both have finished.

package main

import "fmt"

func main() {
	//the handles we want to query
	handles := []string{"BBCNews", "BBCSport"}

	//2 channels
	user_channel := make(chan string)
	post_channel := make(chan string)

	for _, handle := range handles {
		go searchProfile(handle, user_channel)
		go getPosts(handle, post_channel)
	}

	//listeners for both channels
	for i := 0; i < len(handles); i++ {
		fmt.Println(<-user_channel)
	}

	for i := 0; i < len(handles); i++ {
		fmt.Println(<-post_channel)
	}
}
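As an aside: since we only use the channel messages to count completions, Go’s sync.WaitGroup would be the more idiomatic way to wait for a known set of goroutines. Here is a minimal sketch of that pattern – processAll and its counter are illustrative stand-ins, not part of the project:

```go
package main

import (
	"fmt"
	"sync"
)

// processAll launches one goroutine per handle and blocks until all of
// them have finished, replacing the two counting loops with a WaitGroup.
// The goroutine body is a stand-in for searchProfile/getPosts.
func processAll(handles []string) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0
	for _, handle := range handles {
		wg.Add(1)
		go func(h string) {
			defer wg.Done() //signal completion when the work is done
			mu.Lock()
			processed++
			mu.Unlock()
		}(handle)
	}
	wg.Wait() //blocks until every goroutine has called Done
	return processed
}

func main() {
	fmt.Println(processAll([]string{"BBCNews", "BBCSport"})) //prints 2
}
```

The trade-off: a WaitGroup only tells you that everything finished, whereas the channels above also carry a message back from each routine.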

The first function we call is searchProfile, which lives in search.go. So, let’s look at that.

Here, the first thing we do is define a new struct called user_details, containing all of the fields we’re interested in from the Twitter API response.

Within the function itself, we connect to the API using the connect() function from api.go:

package main

import "github.com/amit-lulla/twitterapi" //https://godoc.org/github.com/amit-lulla/twitterapi

func connect() *twitterapi.TwitterApi {
	//placeholder credentials - swap in your own keys
	twitterapi.SetConsumerKey("w")
	twitterapi.SetConsumerSecret("x")
	api := twitterapi.NewTwitterApi("y", "x")
	return api
}
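One caveat with connect(): hard-coded keys end up in version control. A common alternative is to read them from environment variables. A hedged sketch of that idea – loadKey and the TWITTER_CONSUMER_KEY name are my own invention, not part of the twitterapi package:

```go
package main

import (
	"fmt"
	"os"
)

// loadKey reads a credential from the environment, falling back to a
// default so the example still runs when the variable is unset.
func loadKey(name, fallback string) string {
	if v := os.Getenv(name); v != "" {
		return v
	}
	return fallback
}

func main() {
	//e.g. twitterapi.SetConsumerKey(loadKey("TWITTER_CONSUMER_KEY", "w"))
	fmt.Println("consumer key:", loadKey("TWITTER_CONSUMER_KEY", "w"))
}
```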

We then assign the fields from the GetUsersShow API call response to various variables. We then pass the creation date into the CalcAge function, which returns the age of the account in days (today minus the account opening date).

Finally, we assign those values to the user_details struct type that we defined above and we pass a message back to the user_channel to say that this profile has been processed.

package main

import "fmt"
import "log"

type user_details struct {
	handle          string
	url             string
	timezone        string
	StatusesCount   int64
	location        string
	CreatedAt       string
	AccountAge      float64
	FriendsCount    int
	FollowersCount  int
	FavouritesCount int
}

func searchProfile(profileName string, user_channel chan string) {
	api := connect()
	searchResult, err := api.GetUsersShow(profileName, nil)
	if err != nil {
		log.Fatalln(err)
	}
	FavouritesCount := searchResult.FavouritesCount
	FollowersCount := searchResult.FollowersCount
	FriendsCount := searchResult.FriendsCount
	CreatedAt := searchResult.CreatedAt
	Location := searchResult.Location
	StatusesCount := searchResult.StatusesCount
	TimeZone := searchResult.TimeZone
	URL := searchResult.URL
	handle := searchResult.Name
	
	acctAge := CalcAge(CreatedAt)

	output := user_details{
		handle : handle, 
		url : URL, 
		timezone: TimeZone,
		StatusesCount: StatusesCount, 
		location: Location, 
		CreatedAt: CreatedAt, 
		AccountAge: acctAge,
		FriendsCount: FriendsCount, 
		FollowersCount: FollowersCount, 
		FavouritesCount: FavouritesCount}

	fmt.Println(output)


	user_channel <- "Profile Complete"
}

Next, we have getposts.go. Inside, we define our getPosts function, and this is where things get a bit more involved.

We make our API call and then define a bunch of slices, which we will populate with the data we want to store as we iterate over the posts in the response.

So, for each of the tweets in the response, we store the values in named variables. For example, value.IdStr is stored in a variable called id. During this process, we run some cleanup on our data: extracting the hour from the datetime stamp, cleaning the date format, and so on.

Then we append the cleaned values to the slices we made earlier. By this point, we have a set of populated slices – if we have 10 posts, we will have 10 items in each slice. If you stuck all of those slices side by side, you’d have something that looks a bit like a table.
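To make the “slices as table columns” idea concrete, here is a small standalone sketch with made-up values; row is a hypothetical helper, not part of the project:

```go
package main

import "fmt"

// row glues one index's worth of the parallel slices into a single
// tab-separated line - one "row" of the imaginary table.
func row(id string, fav, rt int) string {
	return fmt.Sprintf("%s\t%d\t%d", id, fav, rt)
}

func main() {
	//three parallel slices, one entry per tweet (sample values)
	id_list := []string{"101", "102", "103"}
	fav_list := []int{4, 9, 1}
	rt_list := []int{2, 7, 0}

	for i := range id_list {
		fmt.Println(row(id_list[i], fav_list[i], rt_list[i]))
	}
}
```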

Then, we calculate the totals for retweets and favorites using functions from aggregate.go. As you can see, these functions take the whole columns as slices and then iterate over those slices to calculate the totals.

//sum the retweets across the fetched tweets
func sumRetweets(rt_list []int) int {
	c := 0
	for _, cnt := range rt_list {
		c = c + cnt
	}
	return c
}

//sum the favorites across the fetched tweets
func sumFavorites(fav_list []int) int {
	c := 0
	for _, cnt := range fav_list {
		c = c + cnt
	}
	return c
}

In the same aggregate.go file, we also have functions to find the maximum number of retweets or favorites. Here, we iterate over the whole column again, looking for the largest value.

//Find the max retweets across the fetched tweets
func maxRetweets(rt_list []int) int {
	c := 0
	for _, cnt := range rt_list {
		if cnt > c {
			c = cnt
		}
	}
	return c
}

//Find the max favourites across the fetched tweets
func maxFav(fav_list []int) int {
	c := 0
	for _, cnt := range fav_list {
		if cnt > c {
			c = cnt
		}
	}
	return c
}
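To see these helpers in action on a made-up column, here’s a self-contained sketch. sumCounts and maxCount mirror the pairs above, collapsed into one function each, since retweets and favorites are handled identically:

```go
package main

import "fmt"

// sumCounts mirrors sumRetweets/sumFavorites: add up every count
func sumCounts(list []int) int {
	c := 0
	for _, cnt := range list {
		c += cnt
	}
	return c
}

// maxCount mirrors maxRetweets/maxFav: keep the largest count seen
func maxCount(list []int) int {
	c := 0
	for _, cnt := range list {
		if cnt > c {
			c = cnt
		}
	}
	return c
}

func main() {
	rt_list := []int{3, 12, 7} //sample retweet counts
	fmt.Println(sumCounts(rt_list)) //prints 22
	fmt.Println(maxCount(rt_list))  //prints 12
}
```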

We then loop through the search results again and say: if the retweet count equals the max retweet count, then this is the most retweeted tweet in the list.

Then, we create a new qframe, which takes all of our populated slices as input and generates a nice output dataframe.

We then run a group by on that dataframe to calculate the sum of retweets, favorites and posts per day.

Finally, we tell the post_channel that we have finished processing this user’s posts.

package main

import "fmt"
import "log"
import "strings"
import "net/url"
import "github.com/tobgu/qframe"
import "math"
import "github.com/tobgu/qframe/config/groupby"

func getPosts(profileName string, post_channel chan string) {
	api := connect()
	v, _ := url.ParseQuery("screen_name=" + profileName + "&count=5&include_rts=False&tweet_mode=extended")
	searchResult, err := api.GetUserTimeline(v)
	if err != nil {
		log.Fatalln(err)
	}

	id_list := []string{}
	date_list := []string{}
	fav_list := []int{}
	rt_list := []int{}
	age_list := []float64{}
	hour_list := []string{}
	day_list := []string{}
	handle_list := []string{}
	interaction_list := []int{}
	is_max := []string{}
	is_max_fav := []string{}
	random_id := []int{}
	media := []int{}

	for _, value := range searchResult {
		id := value.IdStr
		CreatedAt := value.CreatedAt
		FavoriteCount := value.FavoriteCount
		RetweetCount := value.RetweetCount
		interactionCount := FavoriteCount + RetweetCount
		Posted := CalcAge(CreatedAt)
		CreatedDate := CleanDate(CreatedAt)
		hour := extract_hour(CreatedAt)
		day := strings.Split(CreatedAt, " ")[0]
		rounded := math.Floor(Posted*100) / 100 //round to 2 decimal places
		id_list = append(id_list, id)
		date_list = append(date_list, CreatedDate)
		fav_list = append(fav_list, FavoriteCount)
		rt_list = append(rt_list, RetweetCount)
		age_list = append(age_list, rounded)
		hour_list = append(hour_list, hour)
		day_list = append(day_list, day)
		handle_list = append(handle_list, profileName)
		interaction_list = append(interaction_list, interactionCount)
		random_id = append(random_id, 1)
		media = append(media, len(value.ExtendedEntities.Media))
	}

	total_retweets := sumRetweets(rt_list)
	total_favorites := sumFavorites(fav_list)
	max_rt := maxRetweets(rt_list)
	max_fav := maxFav(fav_list)
	fmt.Println(total_retweets)
	fmt.Println(total_favorites)
	fmt.Println(max_rt)

	//loop back through and flag the tweet(s) with the highest retweet count
	for _, value := range searchResult {
		RetweetCount := value.RetweetCount
		if RetweetCount == max_rt {
			is_max = append(is_max, "YES")
		} else {
			is_max = append(is_max, "NO")
		}
	}

	//loop back through and flag the tweet(s) with the highest favorite count
	for _, value := range searchResult {
		FavoriteCount := value.FavoriteCount
		if FavoriteCount == max_fav {
			is_max_fav = append(is_max_fav, "YES")
		} else {
			is_max_fav = append(is_max_fav, "NO")
		}
	}

	//store the columns in a qframe
	f := qframe.New(map[string]interface{}{
		"media_included":   media,
		"TweetID":          id_list,
		"random_id":        random_id,
		"Handle":           handle_list,
		"CreatedAt":        date_list,
		"Age":              age_list,
		"FavoriteCount":    fav_list,
		"FavMax":           is_max_fav,
		"RetweetCount":     rt_list,
		"RetweetMax":       is_max,
		"interactionCount": interaction_list,
		"hour":             hour_list,
		"day":              day_list,
	})
	fmt.Println(f)

	datesum := func(xx []int) int {
		result := 0
		for _, x := range xx {
			result += x
		}
		return result
	}

	//aggregate for each date: sum favorites, retweets and total interactions
	g := f.GroupBy(groupby.Columns("CreatedAt")).Aggregate(
		qframe.Aggregation{Fn: datesum, Column: "interactionCount"},
		qframe.Aggregation{Fn: datesum, Column: "RetweetCount"},
		qframe.Aggregation{Fn: datesum, Column: "FavoriteCount"},
		qframe.Aggregation{Fn: datesum, Column: "random_id"})
	fmt.Println(g)

	post_channel <- "Detailed Profile Complete"
}

Below I have included the cleanup functions, so you can see the actions I was performing.

package main

import "strings"
import "time"
import "log"

//months maps abbreviated month names to their two-digit numbers
var months = map[string]string{
	"Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04",
	"May": "05", "Jun": "06", "Jul": "07", "Aug": "08",
	"Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12",
}

func CleanDate(CreatedAt string) string {
	parts := strings.Split(CreatedAt, " ")
	month := months[parts[1]]
	day := parts[2]
	year := parts[5]

	full_date := year + "-" + month + "-" + day

	return full_date
}

func CalcAge(CreatedAt string) float64 {
	//reuse CleanDate rather than repeating the month conversion
	full_date := CleanDate(CreatedAt)

	//convert our string into a time.Time value
	parsedDate, err := time.Parse("2006-01-02", full_date)
	if err != nil {
		log.Fatalln(err)
	}

	today := time.Now()

	//subtract the created date from today to get the age in days
	age := today.Sub(parsedDate).Hours() / 24
	return age
}

func extract_hour(CreatedAt string) string {
	//named clock rather than time, to avoid shadowing the time package
	clock := strings.Split(CreatedAt, " ")[3]
	hour := strings.Split(clock, ":")[0]
	return hour
}