Skip to content

Commit b8298be

Browse files
committed
ipa: rpi: sync: Add an implementation of the camera sync algorithm
In this implementation, the server sends data packets out onto the network every 30 frames or so. Clients listening for this packet will send frame length deltas back to the pipeline handler to match the synchronisation of the server. We use wallclock timestamps, passed to us from the pipeline handler, that have been de-jittered appropriately, meaning that the synchronisation will actually work across networked devices. When the server's advertised "ready time" is reached, both client and server will signal this through metadata back to their respective controlling applications. Signed-off-by: David Plowman <[email protected]> Signed-off-by: Arsen Mikovic <[email protected]> Signed-off-by: Naushir Patuck <[email protected]>
1 parent b26502f commit b8298be

File tree

3 files changed

+398
-0
lines changed

3 files changed

+398
-0
lines changed

src/ipa/rpi/controller/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ rpi_ipa_controller_sources = files([
2323
'rpi/saturation.cpp',
2424
'rpi/sdn.cpp',
2525
'rpi/sharpen.cpp',
26+
'rpi/sync.cpp',
2627
'rpi/tonemap.cpp',
2728
])
2829

src/ipa/rpi/controller/rpi/sync.cpp

Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
/* SPDX-License-Identifier: BSD-2-Clause */
2+
/*
3+
* Copyright (C) 2024, Raspberry Pi Ltd
4+
*
5+
* sync.cpp - sync algorithm
6+
*/
7+
#include "sync.h"
8+
9+
#include <chrono>
10+
#include <ctype.h>
11+
#include <fcntl.h>
12+
#include <strings.h>
13+
#include <unistd.h>
14+
15+
#include <libcamera/base/log.h>
16+
17+
#include <arpa/inet.h>
18+
19+
#include "sync_status.h"
20+
21+
using namespace std;
22+
using namespace std::chrono_literals;
23+
using namespace RPiController;
24+
using namespace libcamera;
25+
26+
LOG_DEFINE_CATEGORY(RPiSync)
27+
28+
#define NAME "rpi.sync"
29+
30+
Sync::Sync(Controller *controller)
31+
: SyncAlgorithm(controller), mode_(Mode::Off), socket_(-1), frameDuration_(0s), frameCount_(0)
32+
{
33+
}
34+
35+
Sync::~Sync()
36+
{
37+
if (socket_ >= 0)
38+
close(socket_);
39+
}
40+
41+
char const *Sync::name() const
42+
{
43+
return NAME;
44+
}
45+
46+
/* This reads from json file and intitiaises server and client */
47+
int Sync::read(const libcamera::YamlObject &params)
48+
{
49+
const char *kDefaultGroup = "239.255.255.250";
50+
constexpr unsigned int kDefaultPort = 10000;
51+
constexpr unsigned int kDefaultSyncPeriod = 30;
52+
constexpr unsigned int kDefaultReadyFrame = 100;
53+
constexpr unsigned int kDefaultMinAdjustment = 50;
54+
55+
/* Socket on which to communicate. */
56+
group_ = params["group"].get<std::string>(kDefaultGroup);
57+
port_ = params["port"].get<uint16_t>(kDefaultPort);
58+
/* Send a sync message every this many frames. */
59+
syncPeriod_ = params["sync_period"].get<uint32_t>(kDefaultSyncPeriod);
60+
/* Application will be told we're ready after this many frames. */
61+
readyFrame_ = params["ready_frame"].get<uint32_t>(kDefaultReadyFrame);
62+
/* Don't change client frame length unless the change exceeds this amount (microseconds). */
63+
minAdjustment_ = params["min_adjustment"].get<uint32_t>(kDefaultMinAdjustment);
64+
65+
return 0;
66+
}
67+
68+
void Sync::initialiseSocket()
69+
{
70+
socket_ = socket(AF_INET, SOCK_DGRAM, 0);
71+
if (socket_ < 0) {
72+
LOG(RPiSync, Error) << "Unable to create socket";
73+
return;
74+
}
75+
76+
memset(&addr_, 0, sizeof(addr_));
77+
addr_.sin_family = AF_INET;
78+
addr_.sin_addr.s_addr = mode_ == Mode::Client ? htonl(INADDR_ANY) : inet_addr(group_.c_str());
79+
addr_.sin_port = htons(port_);
80+
81+
if (mode_ == Mode::Client) {
82+
/* Set to non-blocking. */
83+
int flags = fcntl(socket_, F_GETFL, 0);
84+
fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
85+
86+
unsigned int en = 1;
87+
if (setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &en, sizeof(en)) < 0) {
88+
LOG(RPiSync, Error) << "Unable to set socket options";
89+
goto err;
90+
}
91+
92+
struct ip_mreq mreq {};
93+
mreq.imr_multiaddr.s_addr = inet_addr(group_.c_str());
94+
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
95+
if (setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
96+
LOG(RPiSync, Error) << "Unable to set socket options";
97+
goto err;
98+
}
99+
100+
if (bind(socket_, (struct sockaddr *)&addr_, sizeof(addr_)) < 0) {
101+
LOG(RPiSync, Error) << "Unable to bind client socket";
102+
goto err;
103+
}
104+
}
105+
106+
return;
107+
108+
err:
109+
close(socket_);
110+
socket_ = -1;
111+
}
112+
113+
void Sync::switchMode([[maybe_unused]] CameraMode const &cameraMode, [[maybe_unused]] Metadata *metadata)
114+
{
115+
/*
116+
* A mode switch means the camera has stopped, so synchronisation will be lost.
117+
* Reset all the internal state so that we start over.
118+
*/
119+
reset();
120+
}
121+
122+
/*
123+
* Camera sync algorithm.
124+
* Server - there is a single server that sends framerate timing information over the network to any
125+
* clients that are listening. It also signals when it will send a "everything is synchronised, now go"
126+
* message back to the algorithm.
127+
* Client - there may be many clients, either on the same Pi or different ones. They match their
128+
* framerates to the server, and indicate when to "go" at the same instant as the server.
129+
*/
130+
void Sync::process([[maybe_unused]] StatisticsPtr &stats, Metadata *imageMetadata)
131+
{
132+
SyncPayload payload;
133+
SyncParams local{};
134+
SyncStatus status{};
135+
bool timerKnown = true;
136+
137+
if (mode_ == Mode::Off)
138+
return;
139+
140+
if (!frameDuration_) {
141+
LOG(RPiSync, Error) << "Sync frame duration not set!";
142+
return;
143+
}
144+
145+
if (socket_ < 0) {
146+
initialiseSocket();
147+
148+
if (socket_ < 0)
149+
return;
150+
151+
/*
152+
* For the client, flush anything in the socket. It might be stale from a previous sync run,
153+
* or we might get another packet in a frame to two before the adjustment caused by this (old)
154+
* packet, although correct, had taken effect. So this keeps things simpler.
155+
*/
156+
if (mode_ == Mode::Client) {
157+
socklen_t addrlen = sizeof(addr_);
158+
int ret = 0;
159+
while (ret >= 0)
160+
ret = recvfrom(socket_, &payload, sizeof(payload), 0, (struct sockaddr *)&addr_, &addrlen);
161+
}
162+
}
163+
164+
imageMetadata->get("sync.params", local);
165+
166+
/* The wallclock has already been de-jittered for us. */
167+
uint64_t wallClockFrameTimestamp = local.wallClock;
168+
169+
/*
170+
* This is the headline frame duration in microseconds as programmed into the sensor. Strictly,
171+
* the sensor might not quite match the system clock, but this shouldn't matter for the calculations
172+
* we'll do with it, unless it's a very very long way out!
173+
*/
174+
uint32_t frameDuration = frameDuration_.get<std::micro>();
175+
176+
/* Timestamps tell us if we've dropped any frames, but we still want to count them. */
177+
int droppedFrames = 0;
178+
if (frameCount_) {
179+
/*
180+
* Round down here, because frameCount_ gets incremented at the end of the function. Also
181+
* ensure droppedFrames can't go negative. It shouldn't, but things would go badly wrong
182+
* if it did.
183+
*/
184+
wallClockFrameTimestamp = std::max<uint64_t>(wallClockFrameTimestamp, lastWallClockFrameTimestamp_ + frameDuration / 2);
185+
droppedFrames = (wallClockFrameTimestamp - lastWallClockFrameTimestamp_ - frameDuration / 2) / frameDuration;
186+
frameCount_ += droppedFrames;
187+
}
188+
189+
if (mode_ == Mode::Server) {
190+
/*
191+
* Server sends a packet every syncPeriod_ frames, or as soon after as possible (if any
192+
* frames were dropped).
193+
*/
194+
serverFrameCountPeriod_ += droppedFrames;
195+
196+
/*
197+
* The client may want a better idea of the true frame duration. Any error would feed straight
198+
* into the correction term because of how it uses it to get the "nearest" frame.
199+
*/
200+
if (frameCount_ == 0)
201+
frameDurationEstimated_ = frameDuration;
202+
else {
203+
double diff = (wallClockFrameTimestamp - lastWallClockFrameTimestamp_) / (1 + droppedFrames);
204+
int N = std::min(frameCount_, 99U);
205+
frameDurationEstimated_ = frameCount_ == 1 ? diff : (N * frameDurationEstimated_ + diff) / (N + 1);
206+
}
207+
208+
/* Calculate frames remaining, and therefore "time left until ready". */
209+
int framesRemaining = readyFrame_ - frameCount_;
210+
uint64_t wallClockReadyTime = wallClockFrameTimestamp + (int64_t)framesRemaining * frameDurationEstimated_;
211+
212+
if (serverFrameCountPeriod_ >= syncPeriod_) {
213+
serverFrameCountPeriod_ = 0;
214+
215+
payload.frameDuration = frameDurationEstimated_ + .5; /* round to nearest */
216+
payload.wallClockFrameTimestamp = wallClockFrameTimestamp;
217+
payload.wallClockReadyTime = wallClockReadyTime;
218+
219+
LOG(RPiSync, Debug) << "Send packet (frameNumber " << frameCount_ << "):";
220+
LOG(RPiSync, Debug) << " frameDuration " << payload.frameDuration;
221+
LOG(RPiSync, Debug) << " wallClockFrameTimestamp " << wallClockFrameTimestamp
222+
<< " (" << wallClockFrameTimestamp - lastWallClockFrameTimestamp_ << ")";
223+
LOG(RPiSync, Debug) << " wallClockReadyTime " << wallClockReadyTime;
224+
225+
if (sendto(socket_, &payload, sizeof(payload), 0, (const sockaddr *)&addr_, sizeof(addr_)) < 0)
226+
LOG(RPiSync, Error) << "Send error! " << strerror(errno);
227+
}
228+
229+
timerValue_ = static_cast<int64_t>(wallClockReadyTime - wallClockFrameTimestamp);
230+
if (!syncReady_ && wallClockFrameTimestamp + frameDurationEstimated_ / 2 > wallClockReadyTime) {
231+
syncReady_ = true;
232+
LOG(RPiSync, Info) << "*** Sync achieved! Difference " << timerValue_ << "us";
233+
}
234+
235+
serverFrameCountPeriod_ += 1;
236+
237+
} else if (mode_ == Mode::Client) {
238+
uint64_t serverFrameTimestamp = 0;
239+
240+
bool packetReceived = false;
241+
while (true) {
242+
socklen_t addrlen = sizeof(addr_);
243+
int ret = recvfrom(socket_, &payload, sizeof(payload), 0, (struct sockaddr *)&addr_, &addrlen);
244+
245+
if (ret < 0)
246+
break;
247+
packetReceived = (ret > 0);
248+
clientSeenPacket_ = true;
249+
250+
frameDurationEstimated_ = payload.frameDuration;
251+
serverFrameTimestamp = payload.wallClockFrameTimestamp;
252+
serverReadyTime_ = payload.wallClockReadyTime;
253+
}
254+
255+
if (packetReceived) {
256+
uint64_t clientFrameTimestamp = wallClockFrameTimestamp;
257+
int64_t clientServerDelta = clientFrameTimestamp - serverFrameTimestamp;
258+
/* "A few frames ago" may have better matched the server's frame. Calculate when it was. */
259+
int framePeriodErrors = (clientServerDelta + frameDurationEstimated_ / 2) / frameDurationEstimated_;
260+
int64_t clientFrameTimestampNearest = clientFrameTimestamp - framePeriodErrors * frameDurationEstimated_;
261+
/* We must shorten a single client frame by this amount if it exceeds the minimum: */
262+
int32_t correction = clientFrameTimestampNearest - serverFrameTimestamp;
263+
if (std::abs(correction) < minAdjustment_)
264+
correction = 0;
265+
266+
LOG(RPiSync, Debug) << "Received packet (frameNumber " << frameCount_ << "):";
267+
LOG(RPiSync, Debug) << " serverFrameTimestamp " << serverFrameTimestamp;
268+
LOG(RPiSync, Debug) << " serverReadyTime " << serverReadyTime_;
269+
LOG(RPiSync, Debug) << " clientFrameTimestamp " << clientFrameTimestamp;
270+
LOG(RPiSync, Debug) << " clientFrameTimestampNearest " << clientFrameTimestampNearest
271+
<< " (" << framePeriodErrors << ")";
272+
LOG(RPiSync, Debug) << " correction " << correction;
273+
274+
status.frameDurationOffset = correction * 1us;
275+
}
276+
277+
timerValue_ = static_cast<int64_t>(serverReadyTime_ - wallClockFrameTimestamp);
278+
timerKnown = clientSeenPacket_; /* client must receive a packet before the timer value is correct */
279+
if (clientSeenPacket_ && !syncReady_ && wallClockFrameTimestamp + frameDurationEstimated_ / 2 > serverReadyTime_) {
280+
syncReady_ = true;
281+
LOG(RPiSync, Info) << "*** Sync achieved! Difference " << timerValue_ << "us";
282+
}
283+
}
284+
285+
lastWallClockFrameTimestamp_ = wallClockFrameTimestamp;
286+
287+
status.ready = syncReady_;
288+
status.timerValue = timerValue_;
289+
status.timerKnown = timerKnown;
290+
imageMetadata->set("sync.status", status);
291+
frameCount_++;
292+
}
293+
294+
void Sync::reset()
295+
{
296+
/* This resets the state so that the synchronisation procedure will start over. */
297+
syncReady_ = false;
298+
frameCount_ = 0;
299+
timerValue_ = 0;
300+
serverFrameCountPeriod_ = 0;
301+
serverReadyTime_ = 0;
302+
clientSeenPacket_ = false;
303+
}
304+
305+
void Sync::setMode(Mode mode)
306+
{
307+
mode_ = mode;
308+
309+
/* Another "sync session" can be started by turning it off and on again. */
310+
if (mode == Mode::Off)
311+
reset();
312+
}
313+
314+
void Sync::setFrameDuration(libcamera::utils::Duration frameDuration)
315+
{
316+
frameDuration_ = frameDuration;
317+
};
318+
319+
void Sync::setReadyFrame(unsigned int frame)
320+
{
321+
readyFrame_ = frame;
322+
};
323+
324+
/* Register algorithm with the system. */
325+
static Algorithm *create(Controller *controller)
326+
{
327+
return (Algorithm *)new Sync(controller);
328+
}
329+
static RegisterAlgorithm reg(NAME, &create);

0 commit comments

Comments
 (0)