diff --git a/app/server.go b/app/server.go index e120e74..537acef 100644 --- a/app/server.go +++ b/app/server.go @@ -44,7 +44,6 @@ func ipFrom(r *http.Request) string { func Start() { r := mux.NewRouter() config := common.Configure() - config.SetupDataDirs() helmet := helmet.Default() server := &server{router: r, conf: config} diff --git a/app/webmention/handler.go b/app/webmention/handler.go index 0f822ec..70b089f 100644 --- a/app/webmention/handler.go +++ b/app/webmention/handler.go @@ -19,6 +19,8 @@ var ( func HandleGet(conf *common.Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { domain := mux.Vars(r)["domain"] + conf.Lock(domain) + defer conf.Unlock(domain) result := load.FromDisk(domain, conf.DataPath) rest.Json(w, result) diff --git a/app/webmention/load/loader.go b/app/webmention/load/loader.go index 58f5e20..b1549f0 100644 --- a/app/webmention/load/loader.go +++ b/app/webmention/load/loader.go @@ -9,13 +9,16 @@ import ( // FromDisk assumes that params have already been validated. func FromDisk(domain string, dataPath string) mf.IndiewebDataResult { loadPath := path.Join(dataPath, domain) - info, _ := ioutil.ReadDir(loadPath) amountOfFiles := len(info) + + sema := make(chan struct{}, 20) results := make(chan *mf.IndiewebData, amountOfFiles) for _, file := range info { go func(fileName string) { + sema <- struct{}{} + defer func() { <-sema }() results <- mf.RequireFromFile(path.Join(loadPath, fileName)) }(file.Name()) } diff --git a/app/webmention/load/loader_test.go b/app/webmention/load/loader_test.go index 69e3ccc..2ffe846 100644 --- a/app/webmention/load/loader_test.go +++ b/app/webmention/load/loader_test.go @@ -13,7 +13,8 @@ import ( // stress tests to see what concurrent disk access is like. Runs fine, even with 5000 runs and 100 files. // this means worker pools do not have to be implemented in FromDisk(). // However, if runs := 10000, some results are empty. At other times, even ioutil.ReadDir() panics... 
-// The rate limiter should catch this. +// The rate limiter should catch this, combined with a per-domain lock in the caller. +// Furthermore, a single run over 50k files breaks the OS unless a semaphore limits the number of open files! func TestFromDiskStressTest(t *testing.T) { runs := 100 files := 100 diff --git a/app/webmention/recv/receive.go b/app/webmention/recv/receive.go index b13cfa7..c8f4a50 100644 --- a/app/webmention/recv/receive.go +++ b/app/webmention/recv/receive.go @@ -63,6 +63,9 @@ func (recv *Receiver) convertBodyToIndiewebData(body string, wm mf.Mention, hEnt } func (recv *Receiver) saveWebmentionToDisk(wm mf.Mention, indieweb *mf.IndiewebData) error { + domain, _ := recv.Conf.FetchDomain(wm.Target) + recv.Conf.Lock(domain) + defer recv.Conf.Unlock(domain) jsonData, jsonErr := json.Marshal(indieweb) if jsonErr != nil { return jsonErr diff --git a/app/webmention/recv/receive_test.go b/app/webmention/recv/receive_test.go index c1c0053..1b2f9a4 100644 --- a/app/webmention/recv/receive_test.go +++ b/app/webmention/recv/receive_test.go @@ -17,6 +17,7 @@ import ( var conf = &common.Config{ AllowedWebmentionSources: []string{ "jefklakscodex.com", + "brainbaking.com", }, DataPath: "testdata", } @@ -107,7 +108,7 @@ func TestReceive(t *testing.T) { } receiver := &Receiver{ - Conf: conf, + Conf: common.NewConfig(conf), RestClient: &mocks.RestClientMock{ GetBodyFunc: mocks.RelPathGetBodyFunc(t, "../../../mocks/"), }, @@ -138,7 +139,7 @@ func TestReceiveTargetDoesNotExistAnymoreDeletesPossiblyOlderWebmention(t *testi }, } receiver := &Receiver{ - Conf: conf, + Conf: common.NewConfig(conf), RestClient: client, } @@ -155,7 +156,7 @@ func TestReceiveTargetThatDoesNotPointToTheSourceDoesNothing(t *testing.T) { writeSomethingTo(filename) receiver := &Receiver{ - Conf: conf, + Conf: common.NewConfig(conf), RestClient: &mocks.RestClientMock{ GetBodyFunc: mocks.RelPathGetBodyFunc(t, "../../../mocks/"), }, @@ -174,7 +175,7 @@ func
TestProcessSourceBodyAbortsIfNoMentionOfTargetFoundInSourceHtml(t *testing. Target: "https://jefklakscodex.com/articles", } receiver := &Receiver{ - Conf: conf, + Conf: common.NewConfig(conf), } receiver.processSourceBody("my nice body", wm) diff --git a/app/webmention/send/send.go b/app/webmention/send/send.go index 6d62bb2..e156379 100644 --- a/app/webmention/send/send.go +++ b/app/webmention/send/send.go @@ -5,7 +5,11 @@ import ( "brainbaking.com/go-jamming/app/pingback/send" "brainbaking.com/go-jamming/common" "brainbaking.com/go-jamming/rest" + "fmt" "github.com/rs/zerolog/log" + "io/fs" + "io/ioutil" + "strings" "sync" "time" ) @@ -15,8 +19,27 @@ type Sender struct { Conf *common.Config } +func (snder *Sender) sinceForDomain(domain string, since string) time.Time { + if since != "" { + return common.IsoToTime(since) + } + sinceConf, err := ioutil.ReadFile(fmt.Sprintf("%s/%s-since.txt", snder.Conf.DataPath, domain)) + if err != nil { + log.Warn().Str("since", since).Msg("No query param, and no config found. 
Reverting to beginning of time...") + return time.Time{} + } + return common.IsoToTime(string(sinceConf)) +} + +func (snder *Sender) saveSinceForDomain(domain string, since time.Time) { + ioutil.WriteFile(fmt.Sprintf("%s/%s-since.txt", snder.Conf.DataPath, domain), []byte(common.TimeToIso(since)), fs.ModePerm) +} + func (snder *Sender) Send(domain string, since string) { - log.Info().Str("domain", domain).Str("since", since).Msg(` OK: someone wants to send mentions`) + snder.Conf.Lock(domain) + defer snder.Conf.Unlock(domain) + timeSince := snder.sinceForDomain(domain, since) + log.Info().Str("domain", domain).Time("since", timeSince).Msg(` OK: someone wants to send mentions`) feedUrl := "https://" + domain + "/index.xml" _, feed, err := snder.RestClient.GetBody(feedUrl) if err != nil { @@ -24,9 +47,12 @@ func (snder *Sender) Send(domain string, since string) { return } - if err = snder.parseRssFeed(feed, common.IsoToTime(since)); err != nil { + if err = snder.parseRssFeed(feed, timeSince); err != nil { log.Err(err).Str("url", feedUrl).Msg("Unable to parse RSS feed, send aborted") + return } + + snder.saveSinceForDomain(domain, timeSince) } func (snder *Sender) parseRssFeed(feed string, since time.Time) error { @@ -36,19 +62,25 @@ func (snder *Sender) parseRssFeed(feed string, since time.Time) error { } var wg sync.WaitGroup + sema := make(chan struct{}, 20) + for _, item := range items { for _, href := range item.hrefs { - mention := mf.Mention{ - // SOURCE is own domain this time, TARGET = outbound - Source: item.link, - Target: href, - } + if strings.HasPrefix(href, "http") { + mention := mf.Mention{ + // SOURCE is own domain this time, TARGET = outbound + Source: item.link, + Target: href, + } - wg.Add(1) - go func() { - defer wg.Done() - snder.sendMention(mention) - }() + wg.Add(1) + go func() { + sema <- struct{}{} + defer func() { <-sema }() + defer wg.Done() + snder.sendMention(mention) + }() + } } } wg.Wait() diff --git a/app/webmention/send/send_test.go 
b/app/webmention/send/send_test.go index 0eb164a..9655613 100644 --- a/app/webmention/send/send_test.go +++ b/app/webmention/send/send_test.go @@ -7,12 +7,68 @@ import ( "brainbaking.com/go-jamming/rest" "fmt" "github.com/stretchr/testify/assert" + "io/ioutil" "net/http" "net/url" + "os" "sync" "testing" + "time" ) +func TestSinceForDomain(t *testing.T) { + cases := []struct { + label string + sinceInParam string + sinceInFile string + expected time.Time + }{ + { + "Is since parameter if provided", + "2021-03-09T15:51:43.732Z", + "", + time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC), + }, + { + "Is file contents if since parameter is empty and file is not", + "", + "2021-03-09T15:51:43.732Z", + time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC), + }, + { + "Is empty time if both parameter and file are not present", + "", + "", + time.Time{}, + }, + } + + snder := Sender{ + Conf: &common.Config{ + DataPath: "testdata", + }, + } + + for _, tc := range cases { + t.Run(tc.label, func(t *testing.T) { + os.MkdirAll("testdata", os.ModePerm) + defer os.RemoveAll("testdata") + + if tc.sinceInFile != "" { + ioutil.WriteFile("testdata/domain-since.txt", []byte(tc.sinceInFile), os.ModePerm) + } + + actual := snder.sinceForDomain("domain", tc.sinceInParam) + assert.Equal(t, tc.expected.Year(), actual.Year()) + assert.Equal(t, tc.expected.Month(), actual.Month()) + assert.Equal(t, tc.expected.Day(), actual.Day()) + assert.Equal(t, tc.expected.Hour(), actual.Hour()) + assert.Equal(t, tc.expected.Minute(), actual.Minute()) + assert.Equal(t, tc.expected.Second(), actual.Second()) + }) + } +} + func TestSendMentionAsWebmention(t *testing.T) { passedFormValues := url.Values{} snder := Sender{ @@ -42,6 +98,7 @@ func TestSendMentionIntegrationStressTest(t *testing.T) { Conf: common.Configure(), RestClient: &rest.HttpClient{}, } + defer os.RemoveAll("data") runs := 100 responses := make(chan bool, runs) @@ -85,21 +142,22 @@ func TestSendMentionIntegrationStressTest(t 
*testing.T) { func TestSendIntegrationTestCanSendBothWebmentionsAndPingbacks(t *testing.T) { posted := map[string]interface{}{} - var lock = sync.RWMutex{} + var lock = sync.Mutex{} + defer os.RemoveAll("data") snder := Sender{ Conf: common.Configure(), RestClient: &mocks.RestClientMock{ GetBodyFunc: mocks.RelPathGetBodyFunc(t, "./../../../mocks/"), PostFunc: func(url string, contentType string, body string) error { - lock.RLock() - defer lock.RUnlock() + lock.Lock() + defer lock.Unlock() posted[url] = body return nil }, PostFormFunc: func(endpoint string, formValues url.Values) error { - lock.RLock() - defer lock.RUnlock() + lock.Lock() + defer lock.Unlock() posted[endpoint] = formValues return nil }, diff --git a/common/config.go b/common/config.go index 91bb649..f01ed88 100644 --- a/common/config.go +++ b/common/config.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "strings" + "sync" "github.com/rs/zerolog/log" ) @@ -17,6 +18,21 @@ type Config struct { DataPath string `json:"dataPath"` AllowedWebmentionSources []string `json:"allowedWebmentionSources"` DisallowedWebmentionDomains []string `json:"disallowedWebmentionDomains"` + + domainLocks map[string]*sync.RWMutex +} + +func (c *Config) Lock(domain string) { + c.domainLocks[domain].Lock() +} +func (c *Config) RLock(domain string) { + c.domainLocks[domain].RLock() +} +func (c *Config) RUnLock(domain string) { + c.domainLocks[domain].RUnlock() +} +func (c *Config) Unlock(domain string) { + c.domainLocks[domain].Unlock() } func (c *Config) missingKeys() []string { @@ -63,14 +79,37 @@ func (c *Config) FetchDomain(url string) (string, error) { return "", errors.New("no allowed domain found for url " + url) } -func (c *Config) SetupDataDirs() { - for _, domain := range c.AllowedWebmentionSources { - os.MkdirAll(c.DataPath+"/"+domain, os.ModePerm) +func NewConfig(c *Config) *Config { + conf := &Config{ + Port: c.Port, + Token: c.Token, + UtcOffset: c.UtcOffset, + DataPath: c.DataPath, + AllowedWebmentionSources: 
c.AllowedWebmentionSources, + DisallowedWebmentionDomains: c.DisallowedWebmentionDomains, + domainLocks: map[string]*sync.RWMutex{}, + } + initConfig(conf) + return conf +} + +func Configure() *Config { + conf := config() + initConfig(conf) + return conf +} + +func initConfig(conf *Config) { + conf.domainLocks = map[string]*sync.RWMutex{} + for _, domain := range conf.AllowedWebmentionSources { + conf.domainLocks[domain] = &sync.RWMutex{} + os.MkdirAll(conf.DataPath+"/"+domain, os.ModePerm) + log.Info().Str("allowedDomain", domain).Msg("Configured") } } -func Configure() (c *Config) { +func config() *Config { confData, err := ioutil.ReadFile("config.json") if err != nil { log.Warn().Msg("No config.json file found, reverting to defaults...") diff --git a/common/time.go b/common/time.go index ed6cf89..7b8d249 100644 --- a/common/time.go +++ b/common/time.go @@ -9,13 +9,22 @@ import ( // None of the above are very appealing. For now, just use the lazy way. var Now = time.Now -// since should be in ISO String format, as produced by clients using day.js - e.g. 2021-04-09T15:51:43.732Z +const ( + IsoFormat = "2006-01-02T15:04:05.000Z" +) + +// TimeToIso converts time to ISO string format with millisecond precision (see IsoFormat). +func TimeToIso(theTime time.Time) string { + return theTime.Format(IsoFormat) +} + +// IsoToTime converts an ISO time string into a time.Time object +// As produced by clients using day.js - e.g. 
2021-04-09T15:51:43.732Z func IsoToTime(since string) time.Time { if since == "" { return time.Time{} } - layout := "2006-01-02T15:04:05.000Z" - t, err := time.Parse(layout, since) + t, err := time.Parse(IsoFormat, since) if err != nil { log.Warn().Str("time", since).Msg("Invalid ISO date, reverting to now()") return Now() diff --git a/common/time_test.go b/common/time_test.go index 3a66573..199ff0a 100644 --- a/common/time_test.go +++ b/common/time_test.go @@ -23,6 +23,14 @@ func TestSendSuite(t *testing.T) { suite.Run(t, new(TimeSuite)) } +func (s *TimeSuite) TestTimeToIso() { + theTime := time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC) + expected := "2021-03-09T15:51:43.000Z" + actual := TimeToIso(theTime) + + assert.Equal(s.T(), expected, actual) +} + func (s *TimeSuite) TestIsoToTimeInISOString() { expectedtime := time.Date(2021, time.March, 9, 15, 51, 43, 732, time.UTC) since := IsoToTime("2021-03-09T15:51:43.732Z")