From a2ea583c9d1c8c1badcb7f5de9cc2ba03bd50d12 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Sun, 10 Feb 2019 02:54:43 -0600 Subject: [PATCH] Better signaling with pubsub --- .gitignore | 1 + README.md | 53 +++++++++++++- browser-answer.html | 45 +++++++----- browser-offer.html | 42 +++++++---- browser.js | 82 +++++++++------------- cli_offer.go | 165 ++++++++++++++++++++++++++++++++++++++++++++ local_web_server.go | 6 +- 7 files changed, 304 insertions(+), 90 deletions(-) create mode 100644 .gitignore create mode 100644 cli_offer.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..78c5494 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/*.exe diff --git a/README.md b/README.md index 9ecc144..4d0feb9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,51 @@ -Early sandbox repo playing around w/ getting signaling to work w/ WebRTC via IPFS. +# WebRTC IPFS Signaling -Result: IPFS is close but not quite ready for DHT. It doesn't have any bootstrappable peers that I can see and the built -JS from current master is like 2.5MB minimized. Will be waiting... \ No newline at end of file +This project is a proof of concept to see whether we can use IPFS to do WebRTC signaling obviating the need for a +separate server. + +### Goal 1 - Browser to Browser Signaling + +Status: **Accomplished** + +I currently have debugging turned on so the console logs do show some details. Steps: + +Navigate to https://cretz.github.io/webrtc-ipfs-signaling/browser-offer.html. It will redirect to a version with a +random URL hash. (Of course, you could navigate to a hand-entered URL hash). Take the URL given on that page and, in +theory, open it up with the latest Chrome or FF anywhere in the world (that doesn't require TURN). After a few seconds, +the offer/answer will communicate and WebRTC will be connected. + +Quick notes: + +* It may seem like you need some kind of server to share that URL hash, but that's just an example to make it easier. It + could be any preshared value, though you'll want it unique and may want to do other forms of authentication once + connected. E.g. one person could just go to + https://cretz.github.io/webrtc-ipfs-signaling/browser-offer.html#someknownkey and the other person could just go to + https://cretz.github.io/webrtc-ipfs-signaling/browser-answer.html#someknownkey +* This works with `file:///` on Chrome and FF too, even across each other and even with http domains. +* I have not tested mobile. +* This uses js-ipfs's pubsub support which is in an experimental state. I even hardcode a swarm to + https://ws-star.discovery.libp2p.io/. So it probably won't work if this repo goes stale (which it likely will). +* js-ipfs is pretty big at > 600k right now. +* You might ask, "Why use WebRTC at all if you have a PubSub connection?" The features of WebRTC are many, in fact with + the latest Chrome release, I am making a screen sharing tool requiring no local code. +* This is just a tech demo so I took a lot of shortcuts like not supporting restarts, poor error handling, etc. It would + be quite trivial to have multiple subscribers to a topic and group chat with multiple +* Security...not much. Essentially you need to do some other form of security (WebRTC peerIdentity? app level ID verify + once connected? etc). In practice, this is like an open signaling server. + +**How it Works** + +It's quite simple. Both sides subscribe t a pubsub topic on the preshared identifier. Then I just send JSON'd +[RTCSessionDescription](https://developer.mozilla.org/en-US/docs/Web/API/RTCSessionDescription)s. Specifically, the +offer side sends the offer and waits for an answer whereas the answer side waits for an offer then sends an answer. + +### Goal 2 - Browser to Native App Signaling + +Status: **Failing** + +I was pretty sure I could easily hook up [ipfs/go-ipfs](https://github.com/ipfs/go-ipfs) with +[pions/webrtc](https://github.com/pions/webrtc) and be all good. Although `pions/webrtc` is beautifully built, go-ipfs +is not and very much not developer friendly for reading code or embedding. I was forced to use +[ipsn/go-ipfs](https://github.com/ipsn/go-ipfs) which re-best-practicizes the deps. I have the code at `cli_offer.go` +and while it looks right, the JS side cannot see the PubSub messages. I have a lot of annoying debugging to do in that +unfriendly codebase. \ No newline at end of file diff --git a/browser-answer.html b/browser-answer.html index 578d624..ee29c9a 100644 --- a/browser-answer.html +++ b/browser-answer.html @@ -1,19 +1,19 @@ - WebRTC IPFS Signalling Answerer - + WebRTC IPFS Signaling Answerer + -

WebRTC IPFS Signalling Answerer

-
- URL to offer: - - -
-
+

WebRTC IPFS Signaling Answerer

+
+ URL to offer: + + +
+
@@ -38,8 +38,6 @@

WebRTC IPFS Signalling Answerer

} // Create IPFS and do everything else once ready newIPFS(ipfs => { - console.log('IPFS ready') - // Create new RTC conn const pc = new RTCPeerConnection({ // Could have a TURN server too, not worth it for the demo @@ -55,19 +53,28 @@

WebRTC IPFS Signalling Answerer

// No candidate means we're done if (e.candidate === null) { // Write the answer - writeIPFSFile(ipfs, 'answer', JSON.stringify(pc.localDescription), () => { - console.log('Added answer') + debug('Writing answer') + ipfsPublish(ipfs, JSON.stringify(pc.localDescription), () => { + debug('Added answer') }) } } // Wait for offer - console.log('Waiting for offer') - waitForIPFSFile(ipfs, 'offer', offer => { - console.log('Got offer, adding answer') - pc.setRemoteDescription(new RTCSessionDescription(JSON.parse(offer))).catch(console.error) - pc.createAnswer().then(d => pc.setLocalDescription(d)).catch(console.error) - }) + debug('Waiting for offer') + let gotOffer = false + ipfsSubscribe( + ipfs, + data => { + const obj = JSON.parse(data) + debug('Received data', obj) + if (obj.type == 'offer') { + gotOffer = true + pc.setRemoteDescription(new RTCSessionDescription(obj)).then(() => { + pc.createAnswer().then(d => pc.setLocalDescription(d)).catch(console.error) + }).catch(console.error) + } + }, () => { }) }) diff --git a/browser-offer.html b/browser-offer.html index dbbb9de..3484bb3 100644 --- a/browser-offer.html +++ b/browser-offer.html @@ -1,13 +1,13 @@ - WebRTC IPFS Signalling Offerer - + WebRTC IPFS Signaling Offerer + -

WebRTC IPFS Signalling Offerer

+

WebRTC IPFS Signaling Offerer

URL to answer: @@ -38,8 +38,6 @@

WebRTC IPFS Signalling Offerer

} // Create IPFS and do everything else once ready newIPFS(ipfs => { - console.log('IPFS ready') - // Create new RTC conn const pc = new RTCPeerConnection({ // Could have a TURN server too, not worth it for the demo @@ -50,19 +48,35 @@

WebRTC IPFS Signalling Offerer

setupChatChannel(pc.createDataChannel('chat')) // Add the handlers - pc.oniceconnectionstatechange = e => console.log('RTC connection state change', pc.iceConnectionState) + pc.oniceconnectionstatechange = e => debug('RTC connection state change', pc.iceConnectionState) pc.onicecandidate = e => { // No candidate means we're done if (e.candidate === null) { - // Write the offer - writeIPFSFile(ipfs, 'offer', JSON.stringify(pc.localDescription), () => { - console.log('Added offer, waiting for answer') - // Wait for answer - waitForIPFSFile(ipfs, 'answer', answer => { - console.log('Got answer') - pc.setRemoteDescription(new RTCSessionDescription(JSON.parse(answer))).catch(console.error) + // Keep publishing the offer until we receive the answer + let gotAnswer = false + ipfsSubscribe( + ipfs, + data => { + if (gotAnswer) return + const obj = JSON.parse(data) + debug('Received data', obj) + if (obj.type == 'answer') { + gotAnswer = true + pc.setRemoteDescription(new RTCSessionDescription(obj)) + } + }, + () => { + // This is the callback meaning subscribe completed so we can start sending offers + const sendOffer = () => { + if (gotAnswer) return + debug('Sending offer') + ipfsPublish(ipfs, JSON.stringify(pc.localDescription), () => { + // Try again in a couple of seconds + setTimeout(() => sendOffer(), 2000) + }) + } + sendOffer() }) - }) } } pc.onnegotiationneeded = e => diff --git a/browser.js b/browser.js index 11f210b..e10a965 100644 --- a/browser.js +++ b/browser.js @@ -1,10 +1,11 @@ const debug = console.log +// const debug = () => { } function createWindowHashIfNotPresent() { if (window.location.hash) return const base58Chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' - let array = new Uint8Array(40) + let array = new Uint8Array(20) window.crypto.getRandomValues(array); array = array.map(x => base58Chars.charCodeAt(x % base58Chars.length)); window.history.replaceState(null, null, '#' + String.fromCharCode.apply(null, array)) @@ -12,14 +13,20 @@ function createWindowHashIfNotPresent() { function newIPFS(cb) { const ipfs = new Ipfs({ - // repo: String(Math.random() + Date.now()), + repo: String(Math.random() + Date.now()), + EXPERIMENTAL: { pubsub: true }, + config: { + Addresses: { + Swarm: ['/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star'] + } + } // libp2p: { // config: { // dht: { // enabled: true // } // } - // }, + // } // relay: { // enabled: true, // hop: {enabled: true} @@ -40,56 +47,33 @@ function newIPFS(cb) { } function ipfsDirBase() { - return window.location.hash.substring(1) -} - -function writeIPFSFile(ipfs, file, content, cb) { - const path = '/' + ipfsDirBase() + '/' + file - const buf = ipfs.types.Buffer.from(content) - debug('Writing to ' + path) - writeIPFSFileMFS(ipfs, path, buf, cb) - // writeIPFSFileDHT(ipfs, path, buf, cb) + return 'wis-poc-' + window.location.hash.substring(1) } -function writeIPFSFileMFS(ipfs, path, buf, cb) { - ipfs.files.write(path, buf, { create: true, truncate: true, parents: true}).then(() => { - console.log('Wrote to ' + path) - cb() - }).catch(console.error) -} - -function writeIPFSFileDHT(ipfs, path, buf, cb) { - ipfs.dht.put(ipfs.types.Buffer.from(path), buf, (err) => { - if (err) throw err - console.log('Wrote to ' + path) - cb() - }) +function ipfsSubscribe(ipfs, handle, cb) { + ipfs.pubsub.subscribe( + ipfsDirBase(), + msg => handle(msg.data.toString('utf8')), + err => { + if (err) console.error('Failed subscribe', err) + else { + debug('Subscribe to ' + ipfsDirBase() + ' complete') + cb() + } + }) } -function waitForIPFSFile(ipfs, file, cb) { - const path = '/' + ipfsDirBase() + '/' + file - debug('Attempting to read from ' + path) - waitForIPFSFileMFS(ipfs, path, cb) - // waitForIPFSFileDHT(ipfs, path, cb) -} - -function waitForIPFSFileMFS(ipfs, path, cb) { - ipfs.files.read(path, (err, buf) => { - if (err) { - debug('Failed reading', err.message) - if (!err.message || !err.message.endsWith(path + ' does not exist')) throw err - setTimeout(() => waitForIPFSFileMFS(ipfs, path, cb), 2000) - } else cb(buf.toString('utf8')) - }) -} - -function waitForIPFSFileDHT(ipfs, path, cb) { - ipfs.dht.get(ipfs.types.Buffer.from(path), (err, value) => { - if (err) { - debug('Failed reading', err.message) - setTimeout(() => waitForIPFSFileDHT(ipfs, path, cb), 2000) - } else cb(value.toString('utf8')) - }) +function ipfsPublish(ipfs, data, cb) { + ipfs.pubsub.publish( + ipfsDirBase(), + ipfs.types.Buffer.from(data), + err => { + if (err) console.error('Failed publish', err) + else { + debug('Publish complete') + cb() + } + }) } function setupChatChannel(channel) { diff --git a/cli_offer.go b/cli_offer.go new file mode 100644 index 0000000..206f86d --- /dev/null +++ b/cli_offer.go @@ -0,0 +1,165 @@ +package main + +import ( + "bufio" + "context" + "crypto/rand" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/ipsn/go-ipfs/core" + "github.com/ipsn/go-ipfs/core/coreapi" + coreiface "github.com/ipsn/go-ipfs/core/coreapi/interface" + "github.com/ipsn/go-ipfs/core/coreapi/interface/options" + "github.com/pions/webrtc" + "github.com/pions/webrtc/pkg/datachannel" + "github.com/pions/webrtc/pkg/ice" +) + +var debugEnabled = true + +func main() { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + // Get ID + id, err := parseFlags() + assertNoErr(err) + fmt.Printf("Visit browser-answer.html#%v or go run cli_answer.go %v\n", id, id) + // Create IPFS + pubsub, err := newIPFSPubSub(ctx) + assertNoErr(err) + // Create peer conn + pc, err := webrtc.New(webrtc.RTCConfiguration{ + IceServers: []webrtc.RTCIceServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}, + }) + assertNoErr(err) + // Create the chat channel + channel, err := pc.CreateDataChannel("chat", nil) + assertNoErr(err) + setupChatChannel(channel) + // Debug state change info + pc.OnICEConnectionStateChange(func(state ice.ConnectionState) { + debugf("RTC connection state change: %v", state) + }) + // Subscribe to offer/answer topic + topic := "wis-poc-" + id + sub, err := pubsub.Subscribe(ctx, topic, options.PubSub.Discover(true)) + assertNoErr(err) + debugf("Subscribe to %v complete", topic) + // Listen for first answer + answerCh := make(chan *webrtc.RTCSessionDescription, 1) + go func() { + var desc webrtc.RTCSessionDescription + for { + msg, err := sub.Next(ctx) + assertNoErr(err) + assertNoErr(json.Unmarshal(msg.Data(), &desc)) + debugf("Received data: %v", desc.Type) + if desc.Type == webrtc.RTCSdpTypeAnswer { + answerCh <- &desc + break + } + } + }() + // Create the offer and keep sending until received + offer, err := pc.CreateOffer(nil) + assertNoErr(err) + offerBytes, err := json.Marshal(offer) + assertNoErr(err) + var answer *webrtc.RTCSessionDescription +WaitForAnswer: + for { + debugf("Sending offer") + assertNoErr(pubsub.Publish(ctx, topic, offerBytes)) + select { + case answer = <-answerCh: + fmt.Printf("Got answer") + break WaitForAnswer + case <-time.After(2 * time.Second): + } + } + // Now that we have an answer, set it and block forever (the chat channel does the work now) + assertNoErr(pc.SetRemoteDescription(*answer)) + select {} +} + +func newIPFSPubSub(ctx context.Context) (coreiface.PubSubAPI, error) { + cfg := &core.BuildCfg{ + Online: true, + ExtraOpts: map[string]bool{"pubsub": true}, + } + if node, err := core.NewNode(ctx, cfg); err != nil { + return nil, err + } else if api, err := coreapi.NewCoreAPI(node); err != nil { + return nil, err + } else { + return api.PubSub(), nil + } +} + +func setupChatChannel(channel *webrtc.RTCDataChannel) { + channel.OnClose(func() { printChatLn("**system** chat closed") }) + channel.OnOpen(func() { + printChatLn("**system** chat started") + // Just read stdin forever and try to send it over or panic + r := bufio.NewReader(os.Stdin) + for { + text, _ := r.ReadString('\n') + printChatLn("**me** " + text) + err := channel.Send(datachannel.PayloadString{Data: []byte(text)}) + assertNoErr(err) + } + }) + channel.OnMessage(func(p datachannel.Payload) { + if p.PayloadType() != datachannel.PayloadTypeString { + panic("Expected string payload") + } + printChatLn("**them** " + string(p.(datachannel.PayloadString).Data)) + }) +} + +var chatLnMutex sync.Mutex + +func printChatLn(line string) { + chatLnMutex.Lock() + defer chatLnMutex.Unlock() + fmt.Println(line) +} + +func parseFlags() (id string, err error) { + fs := flag.NewFlagSet("", flag.ExitOnError) + fs.BoolVar(&debugEnabled, "v", false, "Show debug information") + fs.StringVar(&id, "id", "", "The identifier to put offer for (optional, generated when not present)") + assertNoErr(fs.Parse(os.Args[1:])) + if fs.NArg() > 0 { + return "", fmt.Errorf("Unrecognized args: %v", fs.Args()) + } + // Generate ID if not there + if id == "" { + const base58Chars = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" + var idBytes [20]byte + if _, err := rand.Read(idBytes[:]); err == nil { + for _, idByte := range idBytes { + id += string(base58Chars[int(idByte)%len(base58Chars)]) + } + } + } + return +} + +func debugf(format string, v ...interface{}) { + if debugEnabled { + log.Printf(format, v...) + } +} + +func assertNoErr(err error) { + if err != nil { + panic(err) + } +} diff --git a/local_web_server.go b/local_web_server.go index 35f805e..f2fb023 100644 --- a/local_web_server.go +++ b/local_web_server.go @@ -6,10 +6,6 @@ import ( ) func main() { - fs := http.FileServer(http.Dir(".")) - err := http.ListenAndServe(":8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Access-Control-Allow-Origin", "*") - fs.ServeHTTP(w, r) - })) + err := http.ListenAndServe(":8080", http.FileServer(http.Dir("."))) log.Fatal(err) }