Skip to content

Commit

Permalink
Use SSE consumer instead of fedmsg consumer for Libraries.io
Browse files Browse the repository at this point in the history
Because the libraries.io SSE stream is used only by Anitya, it is a
logical move to make it part of Anitya itself.

This PR changes the current libraries.io consumer, which
listened to fedmsg messages emitted by sse2fedmsg, into an SSE client
that listens directly to the libraries.io SSE stream.

This PR also removes the last fedmsg-related code, so we can consider Anitya
to be Fedora Messaging ready. \o/

Signed-off-by: Michal Konečný <[email protected]>
  • Loading branch information
Zlopez authored and Zlopez committed Aug 20, 2019
1 parent a830acc commit 62bb442
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 326 deletions.
1 change: 1 addition & 0 deletions anitya/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
# Token for GitHub API
GITHUB_ACCESS_TOKEN=None,
LEGACY_MESSAGING=False, # If True, publish with fedmsg instead of fedora_messaging
SSE_FEED="http://firehose.libraries.io/events",
)

# Start with a basic logging configuration, which will be replaced by any user-
Expand Down
200 changes: 84 additions & 116 deletions anitya/librariesio_consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# This file is part of the Anitya project.
# Copyright (C) 2017 Red Hat, Inc.
# Copyright (C) 2017-2019 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -17,114 +18,70 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
A `fedmsg consumer`_ that is designed to consume fedmsgs generated using
`sse2fedmsg`_ with the `libraries.io firehose`_ SSE feed.
A `sseclient`_ wrapper that is designed to consume the `libraries.io firehose`_ SSE feed.
Using this, it is possible for Anitya to discover new upstream releases using
`libraries.io`_. At the moment, the SSE feed is served over HTTP, so we confirm
the release is really available upstream before announcing the new version.
When ``message`` is referenced in this module, it means the fedmsg that has been
deserialized to a Python dictionary with the following keys::
{
"body": {
"username": "jcline",
"i": 1,
"timestamp": 1486743185,
"msg_id": "2017-dcfee1a8-16d4-4684-afea-50aa24754677",
"topic": "org.fedoraproject.dev.sse2fedmsg.libraryio",
"msg": {
"retry": 500,
"data": {
"name": "react-sh",
"project": {
"status": null,
"repository_url": "https://github.com/heineiuo/react-sh",
"latest_release_published_at": "2017-02-10 16:11:54 UTC",
"description": "A super easy shell for react app.",
"language": null,
"platform": "NPM",
"rank": 7,
"keywords": [],
"package_manager_url": "https://www.npmjs.com/package/react-sh",
"stars": 0,
"latest_release_number": "0.4.0",
"normalized_licenses": [
"MIT"
],
"forks": 0,
"homepage": "https://github.com/heineiuo/react-sh",
"name": "react-sh"
},
"platform": "NPM",
"published_at": "2017-02-10 16:11:54 UTC",
"version": "0.4.0",
"package_manager_url": "https://www.npmjs.com/package/react-sh"
},
"event": "event",
"id": null
}
}
}
Note that the contents of ``msg`` is the actual server-sent event converted to
JSON.
.. libraries.io: https://libraries.io/
.. libraries.io firehose: https://github.com/librariesio/firehose
.. sse2fedmsg: https://pypi.python.org/pypi/sse2fedmsg
.. fedmsg consumer: http://www.fedmsg.com/en/latest/consuming/#the-hub-consumer-approach
.. _libraries.io: https://libraries.io/
.. _libraries.io firehose: https://github.com/librariesio/firehose
.. _sseclient: https://pypi.python.org/pypi/sseclient/
"""
from __future__ import absolute_import, unicode_literals

import logging

from fedmsg.consumers import FedmsgConsumer
import json

from anitya import config
from anitya.db import models, Session, initialize
from anitya.lib import exceptions, plugins, utilities

from sseclient import SSEClient

_log = logging.getLogger(__name__)


class LibrariesioConsumer(FedmsgConsumer):
class LibrariesioConsumer(object):
"""
A fedmsg consumer.
This class connects to the Server-Sent Events feed provided and creates or
updates the project according to the event.
Attributes:
topic (list): A list of strings that indicates the message topics this
consumer gets. This consumer accepts a single topic,
``org.fedoraproject.prod.sse2fedmsg.libraryio``
config_key (str): A string that must be set to `True` in the fedmsg
configuration to enable this consumer.
feed (str): The Server-Sent events feed URL.
whitelist (list): Whitelisted platforms. See `https://libraries.io/`_
for the list of available platforms.
"""

topic = ["sse2fedmsg.librariesio"]

config_key = "anitya.libraryio.enabled"

def __init__(self, hub):
# If we're in development mode, add the dev versions of the topics so
# local playback with fedmsg-dg-replay works as expected.
prefix, env = hub.config["topic_prefix"], hub.config["environment"]
self.topic = [".".join([prefix, env, topic]) for topic in self.topic]
_log.info("Subscribing to the following fedmsg topics: %r", self.topic)
def __init__(self):
"""
Constructor loads relevant values from configuration.
"""
self.feed = config.config["SSE_FEED"]
self.whitelist = config.config["LIBRARIESIO_PLATFORM_WHITELIST"]
_log.info("Subscribing to the following SSE feed: {}".format(self.feed))

initialize(config.config)

super(LibrariesioConsumer, self).__init__(hub)
hub.config["topic_prefix"] = "org.release-monitoring"

def consume(self, message):
def run(self): # pragma: no cover
"""
Start the SSE client.
This call is blocking and will continue until the underlying TCP
connection is closed.
"""
This method is called when a message with a topic this consumer
subscribes to arrives.
_log.info("Starting Server-Sent Events client for {}".format(self.feed))
sse_stream = SSEClient(self.feed)
for sse_message in sse_stream:
# If the server sends too many newlines the client can generate
# messages that are completely empty, so we filter those here.
if sse_message.data:
self.process_message(sse_message)
_log.debug("Received message from SSE: {}".format(sse_message))

def process_message(self, message):
"""
This method is called when an incoming SSE message is received.
If the project is unknown to Anitya, a new :class:`Project` is created,
but only if the 'create_librariesio_projects' configuration key is set.
but only if the platform is whitelisted. See `self.whitelist`.
If it's an existing project, this will call Anitya's ``check_project_release``
API to update the latest version information. This is because we are
subscribed to libraries.io over HTTP and therefore have no
Expand All @@ -138,7 +95,16 @@ def consume(self, message):
Args:
message: The received SSE message; its ``data`` attribute holds the JSON-encoded libraries.io event.
"""
librariesio_msg = message["body"]["msg"]["data"]
# The SSE spec requires all data to be UTF-8 encoded
try:
librariesio_msg = json.loads(message.data, encoding="utf-8")
except json.JSONDecodeError:
_log.warning(
"Dropping librariesio update message. Invalid json '{}'.".format(
message.data
)
)
return
name = librariesio_msg["name"]
platform = librariesio_msg["platform"].lower()
version = librariesio_msg["version"]
Expand All @@ -148,71 +114,73 @@ def consume(self, message):
break
else:
_log.debug(
"Dropped librariesio update to %s for %s (%s) since it is on the "
"unsupported %s platform",
version,
name,
homepage,
platform,
"Dropped librariesio update to {} for {} ({}) since"
" it is on the unsupported {} platform".format(
version, name, homepage, platform
)
)
return

session = Session()
project = models.Project.by_name_and_ecosystem(session, name, ecosystem.name)
if (
project is None
and platform in config.config["LIBRARIESIO_PLATFORM_WHITELIST"]
):
project = models.Project.query.filter_by(
name=name, ecosystem_name=ecosystem.name
).one_or_none()
if project is None and platform in self.whitelist:
try:
project = utilities.create_project(
session, name, homepage, "anitya", backend=ecosystem.default_backend
)
utilities.check_project_release(project, Session)
_log.info(
"Discovered new project at version %s via libraries.io: %r",
version,
project,
"Discovered new project at version {} via libraries.io: {}".format(
version, project
)
)
except exceptions.AnityaException as e:
_log.error(
"A new project was discovered via libraries.io, %r, "
'but we failed with "%s"',
project,
str(e),
"A new project was discovered via libraries.io, {}, "
'but we failed with "{}"'.format(name, str(e))
)
elif project is None:
_log.info(
"Discovered new project, %s, on the %s platform, but anitya is "
"configured to not create the project",
name,
platform,
"Discovered new project, {}, on the {} platform, but anitya is "
"configured to not create the project".format(name, platform)
)
else:
_log.info(
"libraries.io has found an update (version %s) for project %r",
version,
project,
"libraries.io has found an update (version {}) for project {}".format(
version, project.name
)
)
# This will fetch the version, emit fedmsgs, add log entries, and
# commit the transaction.
try:
utilities.check_project_release(project, session)
except exceptions.AnityaPluginException as e:
_log.warning(
"libraries.io found an update for %r, but we failed with %s",
project,
str(e),
"libraries.io found an update for {}, but we failed with {}".format(
project, str(e)
)
)

# Refresh the project object that was committed by either
# ``create_project`` or ``check_project_release`` above
project = models.Project.by_name_and_ecosystem(session, name, ecosystem.name)
project = models.Project.query.filter_by(
name=name, ecosystem_name=ecosystem.name
).one_or_none()
if project and project.latest_version != version:
_log.info(
"libraries.io found %r had a latest version of %s, but Anitya found %s",
project,
version,
project.latest_version,
"libraries.io found {} had a latest version of {}, but Anitya found {}".format(
project, version, project.latest_version
)
)

Session.remove()


if __name__ == "__main__":
    # Script entry point: build the consumer and hand control to its
    # blocking ``run`` loop.
    LibrariesioConsumer().run()
2 changes: 2 additions & 0 deletions anitya/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
social_auth_login_error_url = "/login-error/"
librariesio_platform_whitelist = ['pypi', 'rubygems']
sse_feed = "http://firehose.libraries.io/events"
default_regex = "a*b*"
github_access_token = "foobar"
Expand Down Expand Up @@ -170,6 +171,7 @@ def test_full_config_file(self, mock_exists, mock_log):
"DEFAULT_REGEX": "a*b*",
"GITHUB_ACCESS_TOKEN": "foobar",
"LEGACY_MESSAGING": True,
"SSE_FEED": "http://firehose.libraries.io/events",
}
config = anitya_config.load()
self.assertEqual(sorted(expected_config.keys()), sorted(config.keys()))
Expand Down
Loading

0 comments on commit 62bb442

Please sign in to comment.