The task at hand was to create a service that aggregates news from different sources. Rather than making individual HTTP requests for each news provider, we opted for a more scalable and configurable approach. The solution involves using a YAML configuration file to specify different news sources, their endpoints, parameters, and the transformation rules for unifying the data.
The YAML configuration file outlines the news sources, their URLs, HTTP methods, parameters, and the rules for extracting relevant information. Let’s break down the key components:
news:
  sources:
    - url: "https://api.marketaux.com/v1/news/all"
      method: get
      limit: 10
      params:
        api_token:
        countries: in
        filter_entities: true
        limit: 10
        language: en
      iterator: data
      fields:
        - key: author
          value: 'MarketAux'
          static: true
        - key: provider
          value: data.#.source
          iter: true
          modifier: "split(.) |> first |> lower |> title"
          datatype: string
        - key: title
          value: data.#.title
          iter: true
        - key: description
          value: data.#.description
          iter: true
        - key: url
          value: data.#.url
          iter: true
        - key: image_url
          value: data.#.image_url
          iter: true
        - key: content
          value: data.#.description
          iter: true
        - key: published_at
          value: data.#.published_at
          iter: true
    - url: "https://newsapi.org/v2/everything"
      limit: 20
      params:
        apiKey:
        sources: "the-hindu,the-times-of-india"
        q: bank
      iterator: articles
      fields:
        # ... (fields configuration)
The configuration allows for multiple news sources, each with its own set of parameters and transformation rules.
The YAML above is treated as a Go text/template, so the API tokens (the blank api_token and apiKey values) can be injected at runtime from configuration, e.g. with a placeholder such as api_token: "{{ .marketauxToken }}" (the variable name here is illustrative; it just has to match a key in the data map passed at render time).
The Go code for this News Aggregator is designed to be modular and extensible. Let’s look at some key components:
type NewsResponse struct {
	Author      string `json:"author"`
	Provider    string `json:"provider"`
	Title       string `json:"title"`
	Description string `json:"description"`
	URL         string `json:"url"`
	ImageURL    string `json:"image_url"`
	Content     string `json:"content"`
	PublishedAt string `json:"published_at"`
}
This struct represents the unified format for news responses.
type NewsAggregator struct {
	News RootObj `yaml:"news"`
}

type RootObj struct {
	Sources []Source `yaml:"sources"`
}

type Source struct {
	URL      string                 `yaml:"url"`
	Method   string                 `yaml:"method"`
	Limit    int                    `yaml:"limit"`
	Fields   []Fields               `yaml:"fields"`
	Params   map[string]interface{} `yaml:"params,omitempty"`
	Iterator *string                `yaml:"iterator"`
}
The NewsAggregator struct mirrors the structure of the YAML configuration.
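The Fields type referenced above isn't shown in the post; a sketch consistent with the YAML keys and the parser code further below might be:

// DataType names the value type a field declares; only "string" is
// handled by the parser shown later.
type DataType string

const DataTypeString DataType = "string"

// Fields describes one output key: where its value comes from and how
// to transform it. The struct tags are inferred from the YAML keys above.
type Fields struct {
	Key        string    `yaml:"key"`
	Value      string    `yaml:"value"`
	Static     bool      `yaml:"static"`
	ShouldIter bool      `yaml:"iter"`
	Modifier   *string   `yaml:"modifier"`
	DataType   *DataType `yaml:"datatype"`
}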
The NewsFetcher and NewsParser structs handle fetching and parsing news from different sources. The FetchNewsUsingConfig function orchestrates the parallel fetching of news from multiple sources.
type NewsFetcher struct {
	Client *http.Client
}

func (nf *NewsFetcher) Fetch(source Source) (response string, err error) {
	method := source.Method
	if len(method) == 0 {
		method = "get"
	}
	method = strings.ToUpper(method)
	log.Println(method, source.URL)
	req, err := http.NewRequest(method, source.URL, nil)
	if err != nil {
		log.Printf("failed to build request to %s. Reason: %v\n", source.URL, err)
		return response, err
	}
	q := req.URL.Query()
	for key, value := range source.Params {
		q.Add(key, fmt.Sprintf("%v", value))
	}
	req.URL.RawQuery = q.Encode()
	// Dump the outgoing request for debugging.
	if b, err := httputil.DumpRequest(req, true); err == nil {
		log.Println(string(b))
	}
	res, err := nf.Client.Do(req)
	if err != nil {
		log.Printf("failed to make request to %s. Reason: %v\n", source.URL, err)
		return response, err
	}
	defer res.Body.Close()
	body, err := io.ReadAll(res.Body)
	if err != nil {
		log.Printf("failed to read response body from %s. Reason: %v\n", source.URL, err)
		return response, err
	}
	response = string(body)
	if res.StatusCode >= 400 {
		return response, errors.New("http_request_failed")
	}
	log.Println("response length ", len(response))
	return response, nil
}
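The Fetcher type that NewsParser depends on below isn't shown in the post; a minimal interface, which *NewsFetcher already satisfies, would be:

// Fetcher abstracts the HTTP layer so the parser can be tested with a stub.
type Fetcher interface {
	Fetch(source Source) (string, error)
}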
ParseModifier Function
The ParseModifier function in the NewsParser struct is responsible for parsing the modifier provided in the YAML configuration. It splits the modifier into its "|>"-separated pipeline stages and resolves each stage to the function that needs to be applied during the data transformation.
type NewsParser struct {
	fetcher Fetcher
}

func NewNewsParser(fetcher Fetcher) *NewsParser {
	return &NewsParser{
		fetcher: fetcher,
	}
}

var funcCallRe = regexp.MustCompile(`^(\w+)\((.*?)\)`)

func (np *NewsParser) ParseModifier(str string) ([]interface{}, error) {
	parts := strings.Split(str, "|>")
	functions := make([]interface{}, 0, len(parts))
	tFuncs := NewTemplateFuncs().Funcs()
	for _, part := range parts {
		part = strings.TrimSpace(part)
		match := funcCallRe.FindStringSubmatch(part)
		if match == nil {
			// Bare name, e.g. "lower": look the function up directly.
			log.Println("arity-1 function")
			f, ok := tFuncs[part]
			if !ok {
				return nil, fmt.Errorf("unknown function: %s", part)
			}
			functions = append(functions, f)
			continue
		}
		// Call syntax, e.g. "split(.)": curry the argument in.
		log.Println("arity-2 function ", match[1], match[2])
		f := tFuncs[match[1]]
		if curryable, ok := f.(func(string) StrSliceable); ok {
			functions = append(functions, curryable(match[2]))
		} else {
			return nil, fmt.Errorf("function %s is not curryable", match[1])
		}
	}
	return functions, nil
}
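The StrSliceable type and NewTemplateFuncs aren't shown in the post either; one plausible shape, consistent with how ParseModifier and ParseSource use them (the title implementation is a naive ASCII-only sketch), would be:

// StrSliceable is a unary function producing a slice of strings; it is
// what a curried split(sep) evaluates to.
type StrSliceable func(string) []string

type TemplateFuncs struct{}

func NewTemplateFuncs() *TemplateFuncs { return &TemplateFuncs{} }

// Funcs maps modifier names to Go functions.
func (t *TemplateFuncs) Funcs() map[string]interface{} {
	return map[string]interface{}{
		// split(sep) takes the separator first and returns a StrSliceable.
		"split": func(sep string) StrSliceable {
			return func(s string) []string { return strings.Split(s, sep) }
		},
		"first": func(parts []string) string {
			if len(parts) == 0 {
				return ""
			}
			return parts[0]
		},
		"lower": strings.ToLower,
		// Naive title-casing; use golang.org/x/text/cases for real text.
		"title": func(s string) string {
			if s == "" {
				return s
			}
			return strings.ToUpper(s[:1]) + s[1:]
		},
	}
}

With these, the modifier from the config, split(.) |> first |> lower |> title, turns a source value like "REUTERS.com" into "Reuters".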
gjson
The gjson library is employed to parse the JSON responses obtained from the news sources. The ParseSource function in the NewsParser struct uses gjson to count the records under the configured iterator and then iterates over the fields, replacing # with the current index to fetch the proper values.
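For a sense of the gjson paths involved: with iterator data, the path data.# yields the array length, and data.0.title fetches the first title. A tiny standalone illustration:

package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	body := `{"data": [{"title": "Markets rally"}, {"title": "Banks report earnings"}]}`
	count := gjson.Get(body, "data.#").Int()          // 2: length of the array
	first := gjson.Get(body, "data.0.title").String() // "Markets rally"
	fmt.Println(count, first)
}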
func (np *NewsParser) ParseSource(source Source) (ns NewsResponseData, err error) {
	// Fetch the raw JSON body for this source.
	bodyStr, err := np.fetcher.Fetch(source)
	if err != nil {
		return ns, err
	}
	iter := ""
	if source.Iterator != nil {
		iter = *source.Iterator
	}
	// Iterate over the array length reported by gjson.
	// For each field, if iter: true, replace # with the index,
	// call gjson.Get on the item, and build the response map.
	count := gjson.Get(bodyStr, fmt.Sprintf("%s.#", iter)).Int()
	log.Println("count of data", count)
	responses := []map[string]interface{}{}
	for i := int64(0); i < count; i++ {
		nresp := map[string]interface{}{}
		for _, field := range source.Fields {
			if field.Static {
				nresp[field.Key] = field.Value
				continue
			}
			valueTpl := field.Value
			if field.ShouldIter && strings.Contains(valueTpl, "#") {
				gjsonKey := strings.Replace(valueTpl, "#", fmt.Sprintf("%d", i), 1)
				nresp[field.Key] = gjson.Get(bodyStr, gjsonKey).Value()
			} else {
				nresp[field.Key] = gjson.Get(bodyStr, valueTpl).Value()
			}
			if field.Modifier == nil {
				continue
			}
			value := nresp[field.Key]
			dataType := field.DataType
			if dataType == nil {
				strType := DataTypeString
				dataType = &strType
			}
			log.Printf("response value %+v\n", value)
			// Modify the initial value by applying the pipeline left to right.
			result := value
			switch *dataType {
			case DataTypeString:
				funcs, err := np.ParseModifier(*field.Modifier)
				if err != nil {
					log.Printf("invalid modifier. received %+v\n", err)
					return ns, err
				}
				for _, f := range funcs {
					switch f := f.(type) {
					case StrSliceable:
						result = f(result.(string))
					case func([]string) string:
						result = f(result.([]string))
					case func(string) string:
						result = f(result.(string))
					default:
						log.Printf("failed to match modifier func type %#v\n", f)
						return ns, ErrUndefinedModifier
					}
				}
			default:
				return ns, ErrUnsupportedDataType
			}
			nresp[field.Key] = result
		}
		responses = append(responses, nresp)
	}
	ns.Source = source
	ns.Responses = responses
	return ns, nil
}
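Two more declarations ParseSource relies on are not in the post; sketches consistent with their usage:

// NewsResponseData pairs a source with the rows extracted from it.
type NewsResponseData struct {
	Source    Source
	Responses []map[string]interface{}
}

// Sentinel errors returned by ParseSource.
var (
	ErrUndefinedModifier   = errors.New("undefined modifier function")
	ErrUnsupportedDataType = errors.New("unsupported data type")
)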
FetchNewsUsingConfig Function
This function fetches the news from multiple sources in parallel, one goroutine per source. A fixed pool of workers would also work, but with a handful of sources a goroutine each is simpler.
func (np *NewsParser) FetchNewsUsingConfig(
	ctx context.Context,
	fileName string,
	data map[string]interface{},
) ([]map[string]interface{}, error) {
	// Note: the template name must match the base name of fileName,
	// otherwise Execute renders an undefined template.
	tmpl, err := template.New("newsources.yaml").ParseFiles(fileName)
	if err != nil {
		return nil, err
	}
	var buffer bytes.Buffer
	err = tmpl.Execute(&buffer, data)
	if err != nil {
		log.Println("failed to execute template", err)
		return nil, err
	}
	ns := &NewsAggregator{}
	err = yaml.Unmarshal(buffer.Bytes(), ns)
	if err != nil {
		return nil, err
	}
	// One goroutine per news source; the buffered channel holds one
	// result per source so no sender blocks.
	var wg sync.WaitGroup
	wg.Add(len(ns.News.Sources))
	responsesChan := make(chan NewsResponseData, len(ns.News.Sources))
	for _, source := range ns.News.Sources {
		go func(w *sync.WaitGroup, src Source) {
			defer w.Done()
			resp, err := np.ParseSource(src)
			if err != nil {
				log.Println("failed to fetch news from source ", src.URL)
				return
			}
			responsesChan <- resp
		}(&wg, source)
	}
	// Wait for all the responses to complete
	wg.Wait()
	close(responsesChan)
	responses := []map[string]interface{}{}
	// Aggregate all the responses into a single slice
	for resps := range responsesChan {
		log.Println(len(resps.Responses), " news items received from", resps.Source.URL)
		responses = append(responses, resps.Responses...)
	}
	log.Println(len(responses), " total news received")
	return responses, nil
}
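Wiring it all together might look like the following; the template variable names (marketauxToken, newsapiKey) are illustrative and must match whatever placeholders you put in the YAML template, and the usual imports (context, log, net/http, os, time) are assumed:

func main() {
	fetcher := &NewsFetcher{Client: &http.Client{Timeout: 10 * time.Second}}
	parser := NewNewsParser(fetcher)
	news, err := parser.FetchNewsUsingConfig(context.Background(), "newsources.yaml", map[string]interface{}{
		"marketauxToken": os.Getenv("MARKETAUX_TOKEN"),
		"newsapiKey":     os.Getenv("NEWSAPI_KEY"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("fetched %d news items", len(news))
}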
Now you can use a cache to store the response data, setting the cache expiry based on the limit in the config YAML file.
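A minimal in-process sketch of that idea, assuming a single-node deployment and a TTL derived however you like:

// newsCache memoizes the aggregated result until the TTL elapses.
type newsCache struct {
	mu      sync.Mutex
	data    []map[string]interface{}
	expires time.Time
}

func (c *newsCache) Get(ttl time.Duration, refresh func() ([]map[string]interface{}, error)) ([]map[string]interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data != nil && time.Now().Before(c.expires) {
		return c.data, nil
	}
	data, err := refresh()
	if err != nil {
		return nil, err
	}
	c.data = data
	c.expires = time.Now().Add(ttl)
	return data, nil
}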
This News Aggregator provides a flexible and scalable solution for fetching and unifying news from diverse sources. The YAML configuration allows for easy customization, making it adaptable to various use cases. The Go code’s modular design ensures maintainability and extensibility, enabling future enhancements and feature additions.
By centralizing news from different sources into a unified format, users can effortlessly access a curated stream of information. This News Aggregator serves as a foundation for creating more sophisticated news aggregation services tailored to specific needs and preferences.
Thank you