Skip to content

Commit

Permalink
Patch some race conditions on queue items
Browse files Browse the repository at this point in the history
  • Loading branch information
ejv2 committed Aug 10, 2024
1 parent d5823a0 commit cbd19cd
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 24 deletions.
6 changes: 3 additions & 3 deletions data/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func (c *Cache) loadFile(path string, startup bool) {
c.episodes.Store(path, ep)
}

// Download starts an asynchronous download in a new goroutine.
// Returns the ID in the downloads table, which must be accessed
// using a mutex.
// Download starts an asynchronous download in a new goroutine. Returns the ID
// in the downloads table, which must be accessed using a mutex. Does not
// require a lock on item to be called.
func (c *Cache) Download(item *QueueItem) (id int, err error) {
f, err := os.Create(item.Path)
dl := Download{
Expand Down
23 changes: 16 additions & 7 deletions data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ type Download struct {
//
// Used internally by cache; avoid calling directly.
func (d *Download) DownloadYoutube(hndl ev.Handler) {
d.Elem.RLock()
if !d.Elem.Youtube {
panic("download: downloading non-youtube with youtube-dl")
}
d.Elem.RUnlock()

// Work around "already downloaded" errors from youtube-dl
d.File.Close()
Expand Down Expand Up @@ -104,9 +106,11 @@ func (d *Download) DownloadYoutube(hndl ev.Handler) {
return
}

d.Elem.RLock()
h, _ := os.UserHomeDir()
tmppath := filepath.Join(h, "podbit-ytdl"+strconv.FormatInt(time.Now().UnixMicro(), 10))
flags := append(strings.Split(YoutubeFlags, " "), "-o", tmppath+".%(ext)s", d.Elem.URL)
d.Elem.RUnlock()

proc := exec.Command(loader, flags...)
r, err := proc.StdoutPipe()
Expand Down Expand Up @@ -160,9 +164,10 @@ func (d *Download) DownloadYoutube(hndl ev.Handler) {
default:
}
}
hndl.Post(ev.DownloadChanged)
r.Close()

if err != nil && err.Error() != "EOF" {
if err.Error() != "EOF" {
d.mut.Lock()
d.Completed = true
d.Success = false
Expand All @@ -185,9 +190,8 @@ func (d *Download) DownloadYoutube(hndl ev.Handler) {
os.Rename(tmppath+".mp3", d.Path)

// Final clean up
Q.mutex.Lock()
d.Elem.Lock()
d.Elem.State = StateReady
Q.mutex.Unlock()

d.mut.Lock()
d.Completed = true
Expand All @@ -198,6 +202,7 @@ func (d *Download) DownloadYoutube(hndl ev.Handler) {
Downloads.downloadsMutex.Unlock()

d.mut.Unlock()
d.Elem.Unlock()

hndl.Post(ev.DownloadChanged)
}
Expand All @@ -208,6 +213,7 @@ func (d *Download) DownloadYoutube(hndl ev.Handler) {
//
// Used internally by cache; avoid calling directly.
func (d *Download) DownloadHTTP(hndl ev.Handler) {
d.Elem.RLock()
resp, err := http.Get(d.Elem.URL)
if err != nil || resp.StatusCode != http.StatusOK {
d.mut.Lock()
Expand All @@ -224,15 +230,16 @@ func (d *Download) DownloadHTTP(hndl ev.Handler) {
hndl.Post(ev.DownloadChanged)
return
}
d.Elem.RUnlock()

d.mut.Lock()
size, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
d.Size = size
d.mut.Unlock()

Q.mutex.Lock()
d.Elem.Lock()
d.Elem.State = StatePending
Q.mutex.Unlock()
d.Elem.Unlock()

var count int64
var read int
Expand Down Expand Up @@ -276,15 +283,17 @@ outer:
} else {
d.Success = true

Q.mutex.Lock()
d.Elem.Lock()
d.Elem.State = StateReady
Q.mutex.Unlock()
d.Elem.Unlock()
}
d.mut.Unlock()

Downloads.downloadsMutex.Lock()
d.Elem.RLock()
Downloads.loadFile(d.Elem.Path, false)
Downloads.ongoing--
d.Elem.RUnlock()
Downloads.downloadsMutex.Unlock()

resp.Body.Close()
Expand Down
4 changes: 4 additions & 0 deletions data/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var StateStrings = [4]string{
// QueueItem represents an item in the player queue
// as provided by newsboat.
type QueueItem struct {
*sync.RWMutex

URL string
Path string
State int
Expand All @@ -81,6 +83,8 @@ type Queue struct {
func (q *Queue) parseField(fields []string, num int) (item QueueItem) {
item.URL = fields[0]
item.Path = strings.ReplaceAll(fields[1], "\"", "")
item.RWMutex = new(sync.RWMutex)

if strings.HasPrefix(item.URL, "+") {
item.Youtube = true
item.URL = item.URL[1:]
Expand Down
16 changes: 8 additions & 8 deletions sound/sound.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func endWait(u chan int) {
if item.Path == Plr.Now.Path {
if !Plr.manualStop {
item.State = data.StateFinished

// We just played this fime so I reckon we can ignore
// file not found errors.
data.Stamps.Touch(item.Path)
Expand Down Expand Up @@ -450,22 +451,20 @@ func Mainloop() {

if !Plr.playing && !Plr.waiting && !Plr.exhausted && len(queue) > 0 {
if elem.State != data.StatePending && data.Downloads.EntryExists(elem.Path) {
elem.Lock()

Plr.play(elem)
wait = endWait

// Set status to played
data.Q.Range(func(_ int, item *data.QueueItem) bool {
if item.Path == elem.Path {
item.State = data.StatePlayed
data.Stamps.Touch(item.Path)
return false
}
elem.State = data.StatePlayed
data.Stamps.Touch(elem.Path)

return true
})
elem.Unlock()
} else {
Plr.waiting = true

elem.RLock()
if y, _ := data.Downloads.IsDownloading(elem.Path); y {
Plr.download = elem
} else {
Expand All @@ -478,6 +477,7 @@ func Mainloop() {
Plr.download = elem
}

elem.RUnlock()
wait = downloadWait
}
}
Expand Down
13 changes: 7 additions & 6 deletions ui/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func (l *Library) renderEpisodes(x, y int) {
_, pod := l.men[0].GetSelection()
eps := data.Q.GetPodcastEpisodes(pod)

data.Q.RLock()
defer data.Q.RUnlock()
for i := len(eps) - 1; i >= 0; i-- {
ep := eps[i]
ep.RLock()

text := ""

entry, ok := data.Downloads.Query(ep.Path)
Expand All @@ -81,6 +81,7 @@ func (l *Library) renderEpisodes(x, y int) {
}

l.men[1].Items = append(l.men[1].Items, text)
ep.RUnlock()
}

l.men[1].Selected = (l.menSel == 1)
Expand Down Expand Up @@ -166,8 +167,8 @@ func (l *Library) StartDownload() {
return
}

data.Q.RLock()
defer data.Q.RUnlock()
item.RLock()
defer item.RUnlock()
if y, _ := data.Downloads.IsDownloading(item.Path); y {
go StatusMessage("Episode already downloading")
return
Expand All @@ -186,12 +187,12 @@ func (l *Library) StartDownload() {
continue
}

data.Q.RLock()
item.RLock()
if y, _ := data.Downloads.IsDownloading(item.Path); y {
go StatusMessage("Episode already downloading")
return
}
data.Q.RUnlock()
item.RUnlock()

go data.Downloads.Download(item)
}
Expand Down

0 comments on commit cbd19cd

Please sign in to comment.