/** * Called when a stage completes successfully or fails, with information on the completed stage. */ defonStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit ... }
// 事件 @DeveloperApi @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event") traitSparkListenerEvent{ /* Whether output this event to the event log */ protected[spark] deflogEvent: Boolean = true } ... @DeveloperApi caseclassSparkListenerStageCompleted(stageInfo: StageInfo) extendsSparkListenerEvent ...
/** * An asynchronous queue for events. All events posted to this queue will be delivered to the child * listeners in a separate thread. * * Delivery will only begin when the `start()` method is called. The `stop()` method should be * called when no more events need to be delivered. */ privateclassAsyncEventQueue( val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics, bus: LiveListenerBus) extendsSparkListenerBus withLogging { importAsyncEventQueue._
defpost(event: SparkListenerEvent): Unit = { if (stopped.get()) { return }
eventCount.incrementAndGet() if (eventQueue.offer(event)) { return }
eventCount.decrementAndGet() droppedEvents.inc() droppedEventsCounter.incrementAndGet() if (logDroppedEvent.compareAndSet(false, true)) { // Only log the following message once to avoid duplicated annoying logs. logError(s"Dropping event from queue $name. " + "This likely means one of the listeners is too slow and cannot keep up with " + "the rate at which tasks are being started by the scheduler.") } logTrace(s"Dropping event $event")
val droppedEventsCount = droppedEventsCounter.get val droppedCountIncreased = droppedEventsCount - lastDroppedEventsCounter val lastReportTime = lastReportTimestamp.get val curTime = System.currentTimeMillis() // Don't log too frequently if (droppedCountIncreased > 0 && curTime - lastReportTime >= LOGGING_INTERVAL) { // There may be multiple threads trying to logging dropped events, // Use 'compareAndSet' to make sure only one thread can win. if (lastReportTimestamp.compareAndSet(lastReportTime, curTime)) { val previous = new java.util.Date(lastReportTime) lastDroppedEventsCounter = droppedEventsCount logWarning(s"Dropped $droppedCountIncreased events from $name since " + s"${if (lastReportTime == 0) "the application started" else s"$previous"}.") } } }