introduce rwmutex locks and semaphore locks to cater mass-file/tcp opening

This commit is contained in:
Wouter Groeneveld 2021-04-16 16:17:09 +02:00
parent 62c55cdbb1
commit c246081729
11 changed files with 186 additions and 31 deletions

View File

@ -44,7 +44,6 @@ func ipFrom(r *http.Request) string {
func Start() { func Start() {
r := mux.NewRouter() r := mux.NewRouter()
config := common.Configure() config := common.Configure()
config.SetupDataDirs()
helmet := helmet.Default() helmet := helmet.Default()
server := &server{router: r, conf: config} server := &server{router: r, conf: config}

View File

@ -19,6 +19,8 @@ var (
func HandleGet(conf *common.Config) http.HandlerFunc { func HandleGet(conf *common.Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
domain := mux.Vars(r)["domain"] domain := mux.Vars(r)["domain"]
conf.Lock(domain)
defer conf.Unlock(domain)
result := load.FromDisk(domain, conf.DataPath) result := load.FromDisk(domain, conf.DataPath)
rest.Json(w, result) rest.Json(w, result)

View File

@ -9,13 +9,16 @@ import (
// FromDisk assumes that params have already been validated. // FromDisk assumes that params have already been validated.
func FromDisk(domain string, dataPath string) mf.IndiewebDataResult { func FromDisk(domain string, dataPath string) mf.IndiewebDataResult {
loadPath := path.Join(dataPath, domain) loadPath := path.Join(dataPath, domain)
info, _ := ioutil.ReadDir(loadPath) info, _ := ioutil.ReadDir(loadPath)
amountOfFiles := len(info) amountOfFiles := len(info)
sema := make(chan struct{}, 20)
results := make(chan *mf.IndiewebData, amountOfFiles) results := make(chan *mf.IndiewebData, amountOfFiles)
for _, file := range info { for _, file := range info {
go func(fileName string) { go func(fileName string) {
sema <- struct{}{}
defer func() { <-sema }()
results <- mf.RequireFromFile(path.Join(loadPath, fileName)) results <- mf.RequireFromFile(path.Join(loadPath, fileName))
}(file.Name()) }(file.Name())
} }

View File

@ -13,7 +13,8 @@ import (
// stress tests to see what concurrent disk access is like. Runs fine, even with 5000 runs and 100 files. // stress tests to see what concurrent disk access is like. Runs fine, even with 5000 runs and 100 files.
// this means worker pools do not have to be implemented in FromDisk(). // this means worker pools do not have to be implemented in FromDisk().
// However, if runs := 10000, some results are empty. At other times, even ioutil.ReadDir() panics... // However, if runs := 10000, some results are empty. At other times, even ioutil.ReadDir() panics...
// The rate limiter should catch this. // The rate limiter should catch this, combined with a domain read lock in the caller.
// Furthermore, a run of 1 and files of 50k breaks the OS without using a semaphore to limit the nr. of open files!
func TestFromDiskStressTest(t *testing.T) { func TestFromDiskStressTest(t *testing.T) {
runs := 100 runs := 100
files := 100 files := 100

View File

@ -63,6 +63,9 @@ func (recv *Receiver) convertBodyToIndiewebData(body string, wm mf.Mention, hEnt
} }
func (recv *Receiver) saveWebmentionToDisk(wm mf.Mention, indieweb *mf.IndiewebData) error { func (recv *Receiver) saveWebmentionToDisk(wm mf.Mention, indieweb *mf.IndiewebData) error {
domain, _ := recv.Conf.FetchDomain(wm.Target)
recv.Conf.Lock(domain)
defer recv.Conf.Unlock(domain)
jsonData, jsonErr := json.Marshal(indieweb) jsonData, jsonErr := json.Marshal(indieweb)
if jsonErr != nil { if jsonErr != nil {
return jsonErr return jsonErr

View File

@ -17,6 +17,7 @@ import (
var conf = &common.Config{ var conf = &common.Config{
AllowedWebmentionSources: []string{ AllowedWebmentionSources: []string{
"jefklakscodex.com", "jefklakscodex.com",
"brainbaking.com",
}, },
DataPath: "testdata", DataPath: "testdata",
} }
@ -107,7 +108,7 @@ func TestReceive(t *testing.T) {
} }
receiver := &Receiver{ receiver := &Receiver{
Conf: conf, Conf: common.NewConfig(conf),
RestClient: &mocks.RestClientMock{ RestClient: &mocks.RestClientMock{
GetBodyFunc: mocks.RelPathGetBodyFunc(t, "../../../mocks/"), GetBodyFunc: mocks.RelPathGetBodyFunc(t, "../../../mocks/"),
}, },
@ -138,7 +139,7 @@ func TestReceiveTargetDoesNotExistAnymoreDeletesPossiblyOlderWebmention(t *testi
}, },
} }
receiver := &Receiver{ receiver := &Receiver{
Conf: conf, Conf: common.NewConfig(conf),
RestClient: client, RestClient: client,
} }
@ -155,7 +156,7 @@ func TestReceiveTargetThatDoesNotPointToTheSourceDoesNothing(t *testing.T) {
writeSomethingTo(filename) writeSomethingTo(filename)
receiver := &Receiver{ receiver := &Receiver{
Conf: conf, Conf: common.NewConfig(conf),
RestClient: &mocks.RestClientMock{ RestClient: &mocks.RestClientMock{
GetBodyFunc: mocks.RelPathGetBodyFunc(t, "../../../mocks/"), GetBodyFunc: mocks.RelPathGetBodyFunc(t, "../../../mocks/"),
}, },
@ -174,7 +175,7 @@ func TestProcessSourceBodyAbortsIfNoMentionOfTargetFoundInSourceHtml(t *testing.
Target: "https://jefklakscodex.com/articles", Target: "https://jefklakscodex.com/articles",
} }
receiver := &Receiver{ receiver := &Receiver{
Conf: conf, Conf: common.NewConfig(conf),
} }
receiver.processSourceBody("<html>my nice body</html>", wm) receiver.processSourceBody("<html>my nice body</html>", wm)

View File

@ -5,7 +5,11 @@ import (
"brainbaking.com/go-jamming/app/pingback/send" "brainbaking.com/go-jamming/app/pingback/send"
"brainbaking.com/go-jamming/common" "brainbaking.com/go-jamming/common"
"brainbaking.com/go-jamming/rest" "brainbaking.com/go-jamming/rest"
"fmt"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"io/fs"
"io/ioutil"
"strings"
"sync" "sync"
"time" "time"
) )
@ -15,8 +19,27 @@ type Sender struct {
Conf *common.Config Conf *common.Config
} }
func (snder *Sender) sinceForDomain(domain string, since string) time.Time {
if since != "" {
return common.IsoToTime(since)
}
sinceConf, err := ioutil.ReadFile(fmt.Sprintf("%s/%s-since.txt", snder.Conf.DataPath, domain))
if err != nil {
log.Warn().Str("since", since).Msg("No query param, and no config found. Reverting to beginning of time...")
return time.Time{}
}
return common.IsoToTime(string(sinceConf))
}
func (snder *Sender) saveSinceForDomain(domain string, since time.Time) {
ioutil.WriteFile(fmt.Sprintf("%s/%s-since.txt", snder.Conf.DataPath, domain), []byte(common.TimeToIso(since)), fs.ModePerm)
}
func (snder *Sender) Send(domain string, since string) { func (snder *Sender) Send(domain string, since string) {
log.Info().Str("domain", domain).Str("since", since).Msg(` OK: someone wants to send mentions`) snder.Conf.Lock(domain)
defer snder.Conf.Unlock(domain)
timeSince := snder.sinceForDomain(domain, since)
log.Info().Str("domain", domain).Time("since", timeSince).Msg(` OK: someone wants to send mentions`)
feedUrl := "https://" + domain + "/index.xml" feedUrl := "https://" + domain + "/index.xml"
_, feed, err := snder.RestClient.GetBody(feedUrl) _, feed, err := snder.RestClient.GetBody(feedUrl)
if err != nil { if err != nil {
@ -24,9 +47,12 @@ func (snder *Sender) Send(domain string, since string) {
return return
} }
if err = snder.parseRssFeed(feed, common.IsoToTime(since)); err != nil { if err = snder.parseRssFeed(feed, timeSince); err != nil {
log.Err(err).Str("url", feedUrl).Msg("Unable to parse RSS feed, send aborted") log.Err(err).Str("url", feedUrl).Msg("Unable to parse RSS feed, send aborted")
return
} }
snder.saveSinceForDomain(domain, timeSince)
} }
func (snder *Sender) parseRssFeed(feed string, since time.Time) error { func (snder *Sender) parseRssFeed(feed string, since time.Time) error {
@ -36,19 +62,25 @@ func (snder *Sender) parseRssFeed(feed string, since time.Time) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
sema := make(chan struct{}, 20)
for _, item := range items { for _, item := range items {
for _, href := range item.hrefs { for _, href := range item.hrefs {
mention := mf.Mention{ if strings.HasPrefix(href, "http") {
// SOURCE is own domain this time, TARGET = outbound mention := mf.Mention{
Source: item.link, // SOURCE is own domain this time, TARGET = outbound
Target: href, Source: item.link,
} Target: href,
}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() sema <- struct{}{}
snder.sendMention(mention) defer func() { <-sema }()
}() defer wg.Done()
snder.sendMention(mention)
}()
}
} }
} }
wg.Wait() wg.Wait()

View File

@ -7,12 +7,68 @@ import (
"brainbaking.com/go-jamming/rest" "brainbaking.com/go-jamming/rest"
"fmt" "fmt"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"os"
"sync" "sync"
"testing" "testing"
"time"
) )
func TestSinceForDomain(t *testing.T) {
cases := []struct {
label string
sinceInParam string
sinceInFile string
expected time.Time
}{
{
"Is since parameter if provided",
"2021-03-09T15:51:43.732Z",
"",
time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC),
},
{
"Is file contents if since parameter is empty and file is not",
"",
"2021-03-09T15:51:43.732Z",
time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC),
},
{
"Is empty time if both parameter and file are not present",
"",
"",
time.Time{},
},
}
snder := Sender{
Conf: &common.Config{
DataPath: "testdata",
},
}
for _, tc := range cases {
t.Run(tc.label, func(t *testing.T) {
os.MkdirAll("testdata", os.ModePerm)
defer os.RemoveAll("testdata")
if tc.sinceInFile != "" {
ioutil.WriteFile("testdata/domain-since.txt", []byte(tc.sinceInFile), os.ModePerm)
}
actual := snder.sinceForDomain("domain", tc.sinceInParam)
assert.Equal(t, tc.expected.Year(), actual.Year())
assert.Equal(t, tc.expected.Month(), actual.Month())
assert.Equal(t, tc.expected.Day(), actual.Day())
assert.Equal(t, tc.expected.Hour(), actual.Hour())
assert.Equal(t, tc.expected.Minute(), actual.Minute())
assert.Equal(t, tc.expected.Second(), actual.Second())
})
}
}
func TestSendMentionAsWebmention(t *testing.T) { func TestSendMentionAsWebmention(t *testing.T) {
passedFormValues := url.Values{} passedFormValues := url.Values{}
snder := Sender{ snder := Sender{
@ -42,6 +98,7 @@ func TestSendMentionIntegrationStressTest(t *testing.T) {
Conf: common.Configure(), Conf: common.Configure(),
RestClient: &rest.HttpClient{}, RestClient: &rest.HttpClient{},
} }
defer os.RemoveAll("data")
runs := 100 runs := 100
responses := make(chan bool, runs) responses := make(chan bool, runs)
@ -85,21 +142,22 @@ func TestSendMentionIntegrationStressTest(t *testing.T) {
func TestSendIntegrationTestCanSendBothWebmentionsAndPingbacks(t *testing.T) { func TestSendIntegrationTestCanSendBothWebmentionsAndPingbacks(t *testing.T) {
posted := map[string]interface{}{} posted := map[string]interface{}{}
var lock = sync.RWMutex{} var lock = sync.Mutex{}
defer os.RemoveAll("data")
snder := Sender{ snder := Sender{
Conf: common.Configure(), Conf: common.Configure(),
RestClient: &mocks.RestClientMock{ RestClient: &mocks.RestClientMock{
GetBodyFunc: mocks.RelPathGetBodyFunc(t, "./../../../mocks/"), GetBodyFunc: mocks.RelPathGetBodyFunc(t, "./../../../mocks/"),
PostFunc: func(url string, contentType string, body string) error { PostFunc: func(url string, contentType string, body string) error {
lock.RLock() lock.Lock()
defer lock.RUnlock() defer lock.Unlock()
posted[url] = body posted[url] = body
return nil return nil
}, },
PostFormFunc: func(endpoint string, formValues url.Values) error { PostFormFunc: func(endpoint string, formValues url.Values) error {
lock.RLock() lock.Lock()
defer lock.RUnlock() defer lock.Unlock()
posted[endpoint] = formValues posted[endpoint] = formValues
return nil return nil
}, },

View File

@ -6,6 +6,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
"sync"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -17,6 +18,21 @@ type Config struct {
DataPath string `json:"dataPath"` DataPath string `json:"dataPath"`
AllowedWebmentionSources []string `json:"allowedWebmentionSources"` AllowedWebmentionSources []string `json:"allowedWebmentionSources"`
DisallowedWebmentionDomains []string `json:"disallowedWebmentionDomains"` DisallowedWebmentionDomains []string `json:"disallowedWebmentionDomains"`
domainLocks map[string]*sync.RWMutex
}
func (c *Config) Lock(domain string) {
c.domainLocks[domain].Lock()
}
func (c *Config) RLock(domain string) {
c.domainLocks[domain].RLock()
}
func (c *Config) RUnLock(domain string) {
c.domainLocks[domain].RUnlock()
}
func (c *Config) Unlock(domain string) {
c.domainLocks[domain].Unlock()
} }
func (c *Config) missingKeys() []string { func (c *Config) missingKeys() []string {
@ -63,14 +79,37 @@ func (c *Config) FetchDomain(url string) (string, error) {
return "", errors.New("no allowed domain found for url " + url) return "", errors.New("no allowed domain found for url " + url)
} }
func (c *Config) SetupDataDirs() { func NewConfig(c *Config) *Config {
for _, domain := range c.AllowedWebmentionSources { conf := &Config{
os.MkdirAll(c.DataPath+"/"+domain, os.ModePerm) Port: c.Port,
Token: c.Token,
UtcOffset: c.UtcOffset,
DataPath: c.DataPath,
AllowedWebmentionSources: c.AllowedWebmentionSources,
DisallowedWebmentionDomains: c.DisallowedWebmentionDomains,
domainLocks: map[string]*sync.RWMutex{},
}
initConfig(conf)
return conf
}
func Configure() *Config {
conf := config()
initConfig(conf)
return conf
}
func initConfig(conf *Config) {
conf.domainLocks = map[string]*sync.RWMutex{}
for _, domain := range conf.AllowedWebmentionSources {
conf.domainLocks[domain] = &sync.RWMutex{}
os.MkdirAll(conf.DataPath+"/"+domain, os.ModePerm)
log.Info().Str("allowedDomain", domain).Msg("Configured") log.Info().Str("allowedDomain", domain).Msg("Configured")
} }
} }
func Configure() (c *Config) { func config() *Config {
confData, err := ioutil.ReadFile("config.json") confData, err := ioutil.ReadFile("config.json")
if err != nil { if err != nil {
log.Warn().Msg("No config.json file found, reverting to defaults...") log.Warn().Msg("No config.json file found, reverting to defaults...")

View File

@ -9,13 +9,22 @@ import (
// None of the above are very appealing. For now, just use the lazy way. // None of the above are very appealing. For now, just use the lazy way.
var Now = time.Now var Now = time.Now
// since should be in ISO String format, as produced by clients using day.js - e.g. 2021-04-09T15:51:43.732Z const (
IsoFormat = "2006-01-02T15:04:05.000Z"
)
// TimeToIso converts time to ISO string format, up to seconds.
func TimeToIso(theTime time.Time) string {
return theTime.Format(IsoFormat)
}
// IsoToTime converts an ISO time string into a time.Time object
// As produced by clients using day.js - e.g. 2021-04-09T15:51:43.732Z
func IsoToTime(since string) time.Time { func IsoToTime(since string) time.Time {
if since == "" { if since == "" {
return time.Time{} return time.Time{}
} }
layout := "2006-01-02T15:04:05.000Z" t, err := time.Parse(IsoFormat, since)
t, err := time.Parse(layout, since)
if err != nil { if err != nil {
log.Warn().Str("time", since).Msg("Invalid ISO date, reverting to now()") log.Warn().Str("time", since).Msg("Invalid ISO date, reverting to now()")
return Now() return Now()

View File

@ -23,6 +23,14 @@ func TestSendSuite(t *testing.T) {
suite.Run(t, new(TimeSuite)) suite.Run(t, new(TimeSuite))
} }
func (s *TimeSuite) TestTimeToIso() {
theTime := time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC)
expected := "2021-03-09T15:51:43.000Z"
actual := TimeToIso(theTime)
assert.Equal(s.T(), expected, actual)
}
func (s *TimeSuite) TestIsoToTimeInISOString() { func (s *TimeSuite) TestIsoToTimeInISOString() {
expectedtime := time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC) expectedtime := time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC)
since := IsoToTime("2021-03-09T15:51:43.732Z") since := IsoToTime("2021-03-09T15:51:43.732Z")