Skip to content

Commit a0838b3

Browse files
committed
major refactoring: splitted low-level and high-level transport API
1 parent 91db428 commit a0838b3

25 files changed

+828
-632
lines changed

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,15 @@ add_library(Homa
7676
src/CHoma.cc
7777
src/Debug.cc
7878
src/Driver.cc
79-
src/Homa.cc
8079
src/Perf.cc
8180
src/Policy.cc
81+
src/PollModeTransportImpl.cc
8282
src/Receiver.cc
8383
src/Sender.cc
8484
src/Shenango.cc
8585
src/StringUtil.cc
8686
src/ThreadId.cc
8787
src/TransportImpl.cc
88-
src/TransportPoller.cc
8988
src/Util.cc
9089
)
9190
add_library(Homa::Homa ALIAS Homa)
@@ -257,6 +256,7 @@ add_executable(unit_test
257256
src/IntrusiveTest.cc
258257
src/ObjectPoolTest.cc
259258
src/PolicyTest.cc
259+
src/PollModeTransportImplTest.cc
260260
src/ReceiverTest.cc
261261
src/SenderTest.cc
262262
src/SpinLockTest.cc

include/Homa/Bindings/CHoma.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,44 +142,49 @@ extern void homa_outmsg_release(homa_outmsg out_msg);
142142
/* ============================ */
143143

144144
/**
145-
* homa_trans_create - C-binding for Homa::Transport::create
145+
* homa_trans_create - C-binding for Homa::TransportBase::create
146146
*/
147147
extern homa_trans homa_trans_create(homa_driver drv, homa_callbacks cbs,
148148
uint64_t id);
149149

150150
/**
151-
* homa_trans_free - C-binding for Homa::Transport::free
151+
* homa_trans_free - C-binding for Homa::TransportBase::free
152152
*/
153153
extern void homa_trans_free(homa_trans trans);
154154

155155
/**
156-
* homa_trans_alloc - C-binding for Homa::Transport::alloc
156+
* homa_trans_alloc - C-binding for Homa::TransportBase::alloc
157157
*/
158158
extern homa_outmsg homa_trans_alloc(homa_trans trans, uint16_t port);
159159

160160
/**
161-
* homa_trans_check_timeouts - C-binding for Homa::Transport::checkTimeouts
161+
* homa_trans_get_drv - C-binding for Homa::TransportBase::getDriver
162162
*/
163-
extern uint64_t homa_trans_check_timeouts(homa_trans trans);
163+
extern homa_driver homa_trans_get_drv(homa_trans trans);
164164

165165
/**
166-
* homa_trans_id - C-binding for Homa::Transport::getId
166+
* homa_trans_id - C-binding for Homa::TransportBase::getId
167167
*/
168168
extern uint64_t homa_trans_id(homa_trans trans);
169169

170170
/**
171-
* homa_trans_proc - C-binding for Homa::Transport::processPacket
171+
* homa_trans_check_timeouts - C-binding for Core::Transport::checkTimeouts
172+
*/
173+
extern uint64_t homa_trans_check_timeouts(homa_trans trans);
174+
175+
/**
176+
* homa_trans_proc - C-binding for Core::Transport::processPacket
172177
*/
173178
extern void homa_trans_proc(homa_trans trans, uintptr_t desc, void* payload,
174179
int32_t len, uint32_t src_ip);
175180

176181
/**
177-
* homa_trans_try_send - C-binding for Homa::Transport::trySend
182+
* homa_trans_try_send - C-binding for Core::Transport::trySend
178183
*/
179184
extern bool homa_trans_try_send(homa_trans trans, uint64_t* wait_until);
180185

181186
/**
182-
* homa_trans_try_grant - C-binding for Homa::Transport::trySendGrants
187+
* homa_trans_try_grant - C-binding for Core::Transport::trySendGrants
183188
*/
184189
extern bool homa_trans_try_grant(homa_trans trans);
185190

include/Homa/Core/Transport.h

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/* Copyright (c) 2020, Stanford University
2+
*
3+
* Permission to use, copy, modify, and/or distribute this software for any
4+
* purpose with or without fee is hereby granted, provided that the above
5+
* copyright notice and this permission notice appear in all copies.
6+
*
7+
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8+
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9+
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10+
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11+
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12+
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13+
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14+
*/
15+
16+
/**
17+
* @file Homa/Core/Transport.h
18+
*
19+
* Contains the low-level Homa Transport API. Advanced users of the Homa
20+
* Transport library should include this header.
21+
*/
22+
23+
#pragma once
24+
25+
#include <Homa/Homa.h>
26+
27+
namespace Homa::Core {
28+
29+
/**
30+
* Minimal set of low-level API that can be used to create Homa-based transports
31+
* for different runtime environments (e.g. polling, kernel threading,
32+
* green threads, etc).
33+
*
34+
* The execution of a transport is driven through repeated calls to methods
35+
* like checkTimeouts(), processPacket(), trySend(), and trySendGrants(); the
36+
* transport will not make any progress otherwise. Advanced users can compose
37+
* these methods in a way that suits them best.
38+
*
39+
* This class is thread-safe.
40+
*/
41+
class Transport : public TransportBase {
42+
public:
43+
/**
44+
* Collection of user-defined transport callbacks.
45+
*/
46+
class Callbacks {
47+
public:
48+
/**
49+
* Destructor.
50+
*/
51+
virtual ~Callbacks() = default;
52+
53+
/**
54+
* Invoked when an incoming message arrives and needs to dispatched to
55+
* its destination in the user application for processing.
56+
*
57+
* Here are a few example use cases of this callback:
58+
* <ul>
59+
* <li> Interaction with the user's thread scheduler: e.g., an
60+
* application may want to block on receive until a message is
61+
* delivered, so this method can be used to wake up blocking threads.
62+
* <li> High-performance message dispatch: e.g., an application may
63+
* choose to implement the message receive queue with a concurrent MPMC
64+
* queue as opposed to a linked-list protected by a mutex; <li>
65+
* Lightweight synchronization: e.g., the socket table that maps from
66+
* port numbers to sockets is a read-mostly data structure, so lookup
67+
* operations can benefit from synchronization schemes such as RCU.
68+
* </ul>
69+
*
70+
* @param port
71+
* Destination port number of the message.
72+
* @param message
73+
* Incoming message to dispatch.
74+
* @return
75+
* True if the message is delivered successfully; false, otherwise.
76+
*/
77+
virtual bool deliver(uint16_t port, InMessage* message) = 0;
78+
79+
/**
80+
* Invoked when some packets just became ready to be sent (and there was
81+
* none before).
82+
*
83+
* This callback allows the transport library to notify the users that
84+
* trySend() should be invoked again as soon as possible. For example,
85+
* the callback can be used to implement wakeup signals for the thread
86+
* that is responsible for calling trySend(), if this thread decides to
87+
* sleep when there is no packets to send.
88+
*/
89+
virtual void notifySendReady() {}
90+
};
91+
92+
/**
93+
* Return a new instance of a Homa-based transport.
94+
*
95+
* @param driver
96+
* Driver with which this transport should send and receive packets.
97+
* @param callbacks
98+
* Collection of user-defined callbacks to customize the behavior of
99+
* the transport.
100+
* @param transportId
101+
* This transport's unique identifier in the group of transports among
102+
* which this transport will communicate.
103+
* @return
104+
* Pointer to the new transport instance.
105+
*/
106+
static Homa::unique_ptr<Transport> create(Driver* driver,
107+
Callbacks* callbacks,
108+
uint64_t transportId);
109+
110+
/**
111+
* Process any timeouts that have expired.
112+
*
113+
* This method must be called periodically to ensure timely handling of
114+
* expired timeouts.
115+
*
116+
* @return
117+
* The rdtsc cycle time when this method should be called again.
118+
*/
119+
virtual uint64_t checkTimeouts() = 0;
120+
121+
/**
122+
* Handle an ingress packet by running it through the transport protocol
123+
* stack.
124+
*
125+
* @param packet
126+
* The ingress packet.
127+
* @param source
128+
* IpAddress of the socket from which the packet is sent.
129+
*/
130+
virtual void processPacket(Driver::Packet* packet, IpAddress source) = 0;
131+
132+
/**
133+
* Attempt to send out packets for any messages with unscheduled/granted
134+
* bytes in a way that limits queue buildup in the NIC.
135+
*
136+
* This method must be called eagerly to allow the Transport to make
137+
* progress toward sending outgoing messages.
138+
*
139+
* @param[out] waitUntil
140+
* The rdtsc cycle time when this method should be called again
141+
* (this allows the NIC to drain its transmit queue). Only set
142+
* when this method returns true.
143+
* @return
144+
* True if more packets are ready to be transmitted when the method
145+
* returns; false, otherwise.
146+
*/
147+
virtual bool trySend(uint64_t* waitUntil) = 0;
148+
149+
/**
150+
* Attempt to grant to incoming messages according to the Homa protocol.
151+
*
152+
* This method must be called eagerly to allow the Transport to make
153+
* progress toward receiving incoming messages.
154+
*
155+
* @return
156+
* True if the method has found some messages to grant; false,
157+
* otherwise.
158+
*/
159+
virtual bool trySendGrants() = 0;
160+
};
161+
162+
} // namespace Homa::Core

0 commit comments

Comments
 (0)