🐛 Fixes of videos and more

This commit is contained in:
2025-08-01 23:21:43 +08:00
parent 855072dfea
commit 71b67fd22d
15 changed files with 836 additions and 40 deletions

View File

@@ -0,0 +1,37 @@
//
// Atomic.swift
// Broadcast Extension
//
// Created by Maksym Shcheglov.
// https://www.onswiftwings.com/posts/atomic-property-wrapper/
//
import Foundation
@propertyWrapper
struct Atomic<Value> {
private var value: Value
private let lock = NSLock()
init(wrappedValue value: Value) {
self.value = value
}
var wrappedValue: Value {
get { load() }
set { store(newValue: newValue) }
}
func load() -> Value {
lock.lock()
defer { lock.unlock() }
return value
}
mutating func store(newValue: Value) {
lock.lock()
defer { lock.unlock() }
value = newValue
}
}

View File

@@ -0,0 +1,29 @@
//
// DarwinNotificationCenter.swift
// Broadcast Extension
//
// Created by Alex-Dan Bumbu on 23/03/2021.
// Copyright © 2021 8x8, Inc. All rights reserved.
//
import Foundation
enum DarwinNotification: String {
case broadcastStarted = "iOS_BroadcastStarted"
case broadcastStopped = "iOS_BroadcastStopped"
}
class DarwinNotificationCenter {
static let shared = DarwinNotificationCenter()
private let notificationCenter: CFNotificationCenter
init() {
notificationCenter = CFNotificationCenterGetDarwinNotifyCenter()
}
func postNotification(_ name: DarwinNotification) {
CFNotificationCenterPostNotification(notificationCenter, CFNotificationName(rawValue: name.rawValue as CFString), nil, nil, true)
}
}

View File

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>NSExtension</key>
<dict>
<key>NSExtensionPointIdentifier</key>
<string>com.apple.broadcast-services-upload</string>
<key>NSExtensionPrincipalClass</key>
<string>$(PRODUCT_MODULE_NAME).SampleHandler</string>
<key>RPBroadcastProcessMode</key>
<string>RPBroadcastProcessModeSampleBuffer</string>
</dict>
</dict>
</plist>

View File

@@ -0,0 +1,103 @@
//
// SampleHandler.swift
// Broadcast Extension
//
// Created by Alex-Dan Bumbu on 04.06.2021.
//
import ReplayKit
import OSLog
let broadcastLogger = OSLog(subsystem: "dev.solsynth.solian", category: "Broadcast")
private enum Constants {
// the App Group ID value that the app and the broadcast extension targets are setup with. It differs for each app.
static let appGroupIdentifier = "group.solsynth.solian"
}
class SampleHandler: RPBroadcastSampleHandler {
private var clientConnection: SocketConnection?
private var uploader: SampleUploader?
private var frameCount: Int = 0
var socketFilePath: String {
let sharedContainer = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: Constants.appGroupIdentifier)
return sharedContainer?.appendingPathComponent("rtc_SSFD").path ?? ""
}
override init() {
super.init()
if let connection = SocketConnection(filePath: socketFilePath) {
clientConnection = connection
setupConnection()
uploader = SampleUploader(connection: connection)
}
os_log(.debug, log: broadcastLogger, "%{public}s", socketFilePath)
}
override func broadcastStarted(withSetupInfo setupInfo: [String: NSObject]?) {
// User has requested to start the broadcast. Setup info from the UI extension can be supplied but optional.
frameCount = 0
DarwinNotificationCenter.shared.postNotification(.broadcastStarted)
openConnection()
}
override func broadcastPaused() {
// User has requested to pause the broadcast. Samples will stop being delivered.
}
override func broadcastResumed() {
// User has requested to resume the broadcast. Samples delivery will resume.
}
override func broadcastFinished() {
// User has requested to finish the broadcast.
DarwinNotificationCenter.shared.postNotification(.broadcastStopped)
clientConnection?.close()
}
override func processSampleBuffer(_ sampleBuffer: CMSampleBuffer, with sampleBufferType: RPSampleBufferType) {
switch sampleBufferType {
case RPSampleBufferType.video:
uploader?.send(sample: sampleBuffer)
default:
break
}
}
}
private extension SampleHandler {
func setupConnection() {
clientConnection?.didClose = { [weak self] error in
os_log(.debug, log: broadcastLogger, "client connection did close \(String(describing: error))")
if let error = error {
self?.finishBroadcastWithError(error)
} else {
// the displayed failure message is more user friendly when using NSError instead of Error
let JMScreenSharingStopped = 10001
let customError = NSError(domain: RPRecordingErrorDomain, code: JMScreenSharingStopped, userInfo: [NSLocalizedDescriptionKey: "Screen sharing stopped"])
self?.finishBroadcastWithError(customError)
}
}
}
func openConnection() {
let queue = DispatchQueue(label: "broadcast.connectTimer")
let timer = DispatchSource.makeTimerSource(queue: queue)
timer.schedule(deadline: .now(), repeating: .milliseconds(100), leeway: .milliseconds(500))
timer.setEventHandler { [weak self] in
guard self?.clientConnection?.open() == true else {
return
}
timer.cancel()
}
timer.resume()
}
}

View File

@@ -0,0 +1,147 @@
//
// SampleUploader.swift
// Broadcast Extension
//
// Created by Alex-Dan Bumbu on 22/03/2021.
// Copyright © 2021 8x8, Inc. All rights reserved.
//
import Foundation
import ReplayKit
import OSLog
private enum Constants {
static let bufferMaxLength = 10240
}
class SampleUploader {
private static var imageContext = CIContext(options: nil)
@Atomic private var isReady = false
private var connection: SocketConnection
private var dataToSend: Data?
private var byteIndex = 0
private let serialQueue: DispatchQueue
init(connection: SocketConnection) {
self.connection = connection
self.serialQueue = DispatchQueue(label: "org.jitsi.meet.broadcast.sampleUploader")
setupConnection()
}
@discardableResult func send(sample buffer: CMSampleBuffer) -> Bool {
guard isReady else {
return false
}
isReady = false
dataToSend = prepare(sample: buffer)
byteIndex = 0
serialQueue.async { [weak self] in
self?.sendDataChunk()
}
return true
}
}
private extension SampleUploader {
func setupConnection() {
connection.didOpen = { [weak self] in
self?.isReady = true
}
connection.streamHasSpaceAvailable = { [weak self] in
self?.serialQueue.async {
if let success = self?.sendDataChunk() {
self?.isReady = !success
}
}
}
}
@discardableResult func sendDataChunk() -> Bool {
guard let dataToSend = dataToSend else {
return false
}
var bytesLeft = dataToSend.count - byteIndex
var length = bytesLeft > Constants.bufferMaxLength ? Constants.bufferMaxLength : bytesLeft
length = dataToSend[byteIndex..<(byteIndex + length)].withUnsafeBytes {
guard let ptr = $0.bindMemory(to: UInt8.self).baseAddress else {
return 0
}
return connection.writeToStream(buffer: ptr, maxLength: length)
}
if length > 0 {
byteIndex += length
bytesLeft -= length
if bytesLeft == 0 {
self.dataToSend = nil
byteIndex = 0
}
} else {
os_log(.debug, log: broadcastLogger, "writeBufferToStream failure")
}
return true
}
func prepare(sample buffer: CMSampleBuffer) -> Data? {
guard let imageBuffer = CMSampleBufferGetImageBuffer(buffer) else {
os_log(.debug, log: broadcastLogger, "image buffer not available")
return nil
}
CVPixelBufferLockBaseAddress(imageBuffer, .readOnly)
let scaleFactor = 1.0
let width = CVPixelBufferGetWidth(imageBuffer)/Int(scaleFactor)
let height = CVPixelBufferGetHeight(imageBuffer)/Int(scaleFactor)
let orientation = CMGetAttachment(buffer, key: RPVideoSampleOrientationKey as CFString, attachmentModeOut: nil)?.uintValue ?? 0
let scaleTransform = CGAffineTransform(scaleX: CGFloat(1.0/scaleFactor), y: CGFloat(1.0/scaleFactor))
let bufferData = self.jpegData(from: imageBuffer, scale: scaleTransform)
CVPixelBufferUnlockBaseAddress(imageBuffer, .readOnly)
guard let messageData = bufferData else {
os_log(.debug, log: broadcastLogger, "corrupted image buffer")
return nil
}
let httpResponse = CFHTTPMessageCreateResponse(nil, 200, nil, kCFHTTPVersion1_1).takeRetainedValue()
CFHTTPMessageSetHeaderFieldValue(httpResponse, "Content-Length" as CFString, String(messageData.count) as CFString)
CFHTTPMessageSetHeaderFieldValue(httpResponse, "Buffer-Width" as CFString, String(width) as CFString)
CFHTTPMessageSetHeaderFieldValue(httpResponse, "Buffer-Height" as CFString, String(height) as CFString)
CFHTTPMessageSetHeaderFieldValue(httpResponse, "Buffer-Orientation" as CFString, String(orientation) as CFString)
CFHTTPMessageSetBody(httpResponse, messageData as CFData)
let serializedMessage = CFHTTPMessageCopySerializedMessage(httpResponse)?.takeRetainedValue() as Data?
return serializedMessage
}
func jpegData(from buffer: CVPixelBuffer, scale scaleTransform: CGAffineTransform) -> Data? {
let image = CIImage(cvPixelBuffer: buffer).transformed(by: scaleTransform)
guard let colorSpace = image.colorSpace else {
return nil
}
let options: [CIImageRepresentationOption: Float] = [kCGImageDestinationLossyCompressionQuality as CIImageRepresentationOption: 1.0]
return SampleUploader.imageContext.jpegRepresentation(of: image, colorSpace: colorSpace, options: options)
}
}

View File

@@ -0,0 +1,199 @@
//
// SocketConnection.swift
// Broadcast Extension
//
// Created by Alex-Dan Bumbu on 22/03/2021.
// Copyright © 2021 Atlassian Inc. All rights reserved.
//
import Foundation
import OSLog
class SocketConnection: NSObject {
var didOpen: (() -> Void)?
var didClose: ((Error?) -> Void)?
var streamHasSpaceAvailable: (() -> Void)?
private let filePath: String
private var socketHandle: Int32 = -1
private var address: sockaddr_un?
private var inputStream: InputStream?
private var outputStream: OutputStream?
private var networkQueue: DispatchQueue?
private var shouldKeepRunning = false
init?(filePath path: String) {
filePath = path
socketHandle = Darwin.socket(AF_UNIX, SOCK_STREAM, 0)
guard socketHandle != -1 else {
os_log(.debug, log: broadcastLogger, "failure: create socket")
return nil
}
}
func open() -> Bool {
os_log(.debug, log: broadcastLogger, "open socket connection")
guard FileManager.default.fileExists(atPath: filePath) else {
os_log(.debug, log: broadcastLogger, "failure: socket file missing")
return false
}
guard setupAddress() == true else {
return false
}
guard connectSocket() == true else {
return false
}
setupStreams()
inputStream?.open()
outputStream?.open()
return true
}
func close() {
unscheduleStreams()
inputStream?.delegate = nil
outputStream?.delegate = nil
inputStream?.close()
outputStream?.close()
inputStream = nil
outputStream = nil
}
func writeToStream(buffer: UnsafePointer<UInt8>, maxLength length: Int) -> Int {
outputStream?.write(buffer, maxLength: length) ?? 0
}
}
extension SocketConnection: StreamDelegate {
func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
switch eventCode {
case .openCompleted:
os_log(.debug, log: broadcastLogger, "client stream open completed")
if aStream == outputStream {
didOpen?()
}
case .hasBytesAvailable:
if aStream == inputStream {
var buffer: UInt8 = 0
let numberOfBytesRead = inputStream?.read(&buffer, maxLength: 1)
if numberOfBytesRead == 0 && aStream.streamStatus == .atEnd {
os_log(.debug, log: broadcastLogger, "server socket closed")
close()
notifyDidClose(error: nil)
}
}
case .hasSpaceAvailable:
if aStream == outputStream {
streamHasSpaceAvailable?()
}
case .errorOccurred:
os_log(.debug, log: broadcastLogger, "client stream error occured: \(String(describing: aStream.streamError))")
close()
notifyDidClose(error: aStream.streamError)
default:
break
}
}
}
private extension SocketConnection {
func setupAddress() -> Bool {
var addr = sockaddr_un()
guard filePath.count < MemoryLayout.size(ofValue: addr.sun_path) else {
os_log(.debug, log: broadcastLogger, "failure: fd path is too long")
return false
}
_ = withUnsafeMutablePointer(to: &addr.sun_path.0) { ptr in
filePath.withCString {
strncpy(ptr, $0, filePath.count)
}
}
address = addr
return true
}
func connectSocket() -> Bool {
guard var addr = address else {
return false
}
let status = withUnsafePointer(to: &addr) { ptr in
ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
Darwin.connect(socketHandle, $0, socklen_t(MemoryLayout<sockaddr_un>.size))
}
}
guard status == noErr else {
os_log(.debug, log: broadcastLogger, "failure: \(status)")
return false
}
return true
}
func setupStreams() {
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
CFStreamCreatePairWithSocket(kCFAllocatorDefault, socketHandle, &readStream, &writeStream)
inputStream = readStream?.takeRetainedValue()
inputStream?.delegate = self
inputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))
outputStream = writeStream?.takeRetainedValue()
outputStream?.delegate = self
outputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))
scheduleStreams()
}
func scheduleStreams() {
shouldKeepRunning = true
networkQueue = DispatchQueue.global(qos: .userInitiated)
networkQueue?.async { [weak self] in
self?.inputStream?.schedule(in: .current, forMode: .common)
self?.outputStream?.schedule(in: .current, forMode: .common)
RunLoop.current.run()
var isRunning = false
repeat {
isRunning = self?.shouldKeepRunning ?? false && RunLoop.current.run(mode: .default, before: .distantFuture)
} while (isRunning)
}
}
func unscheduleStreams() {
networkQueue?.sync { [weak self] in
self?.inputStream?.remove(from: .current, forMode: .common)
self?.outputStream?.remove(from: .current, forMode: .common)
}
shouldKeepRunning = false
}
func notifyDidClose(error: Error?) {
if didClose != nil {
didClose?(error)
}
}
}

View File

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>com.apple.security.application-groups</key>
<array>
<string>group.solsynth.solian</string>
</array>
</dict>
</plist>