// Each job can only have one user name and this name must be consistent across restarts.
// We cannot use job id as commit user name here because user may change job id by creating
// a savepoint, stop the job and then resume from savepoint.
String commitUser =
        StateUtils.getSingleValueFromState(
                context, "commit_user_state", String.class, initialCommitUser);
// Initialize the bucket assigner; HashBucketAssigner is thus the entry point
// class for dynamic bucket assignment.
this.assigner =
        new HashBucketAssigner(
                table.snapshotManager(),
                commitUser,
                table.store().newIndexFileHandler(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                getRuntimeContext().getIndexOfThisSubtask(),
                table.coreOptions().dynamicBucketTargetRowNum());
this.extractor = extractorFunction.apply(table.schema());
}
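For reference, here is a minimal sketch of what StateUtils.getSingleValueFromState plausibly does on top of Flink's operator state API. This is an assumption for illustration, not the actual Paimon implementation: the union-list-state approach and the simplified first-start fallback are guesses.

import java.util.Iterator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;

// Hedged sketch (an assumption, not the actual Paimon implementation): restore
// a single value from union list state so every subtask sees the same commit
// user, falling back to the provided default on the very first start. A real
// implementation would also write the default back into state so that it
// survives the next checkpoint.
static <T> T getSingleValueFromState(
        StateInitializationContext context, String stateName, Class<T> clazz, T defaultValue)
        throws Exception {
    ListState<T> state =
            context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<>(stateName, clazz));
    Iterator<T> iterator = state.get().iterator();
    return iterator.hasNext() ? iterator.next() : defaultValue;
}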
/** Assign a bucket for key hash of a record. */
public int assign(BinaryRow partition, int hash) {
    // hash: the hashcode of the record's primary key, which identifies the record
    // (uniquely, up to hash collisions).
    int recordAssignId = computeAssignId(hash);
    // Presumably this holds because the upstream Flink DAG has already shuffled
    // records by primary-key hashcode % channels, so every record is guaranteed
    // to arrive at its own assigner subtask.
    checkArgument(
            recordAssignId == assignId,
            "This is a bug, record assign id %s should equal to assign id %s.",
            recordAssignId,
            assignId);
    // PartitionIndex: bucket index per partition.
    // Lazily load the bucket index for each partition on first access.
    PartitionIndex index = partitionIndex.computeIfAbsent(partition, this::loadIndex);
    return index.assign(hash, (bucket) -> computeAssignId(bucket) == assignId);
}
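Both the assign-id check and the bucket filter passed to index.assign go through computeAssignId. A minimal sketch of the idea, assuming a plain modulo distribution (the exact implementation in Paimon may differ):

// Hedged sketch: map a record's key hash, or a bucket id, onto one of the
// parallel assigner subtasks. Because the upstream shuffle applies the same
// function to the key hash, the checkArgument above always holds; and because
// it is also applied to bucket ids, subtask assignId owns exactly the buckets
// b with Math.abs(b % numberOfChannels) == assignId, which is what the filter
// (bucket) -> computeAssignId(bucket) == assignId selects.
private int computeAssignId(int hash) {
    return Math.abs(hash % numberOfChannels);
}

For example, with numberOfChannels = 2, subtask 0 only ever creates and grows even-numbered buckets and subtask 1 odd-numbered ones, so no two subtasks ever grow the same bucket.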
// 1. is it a key that has appeared before
// Note: on a hash collision, two different primary keys share the same hashcode,
// but there is no way to detect the collision here. Strictly speaking the second
// key's row should also be counted via bucketInformation.put(bucket, number + 1),
// so the per-bucket row counts behind dynamic-bucket.target-row-num end up
// slightly inaccurate. As long as hash collisions are rare, this is harmless.
if (hash2Bucket.containsKey(hash)) {
    return hash2Bucket.get(hash);
}
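To make the collision caveat concrete: in Java, "Aa" and "BB" are two distinct strings with the same hashCode, so as primary keys they would share one hash2Bucket entry, and the early return above would hand the second key the first key's bucket without counting its row (a toy demonstration, not Paimon code):

public class HashCollisionDemo {
    public static void main(String[] args) {
        // Two distinct keys with an identical hashCode: both print 2112.
        System.out.println("Aa".hashCode());
        System.out.println("BB".hashCode());
        // In PartitionIndex.assign, the second key would take the hash2Bucket
        // fast path above, and its row would never be counted against
        // targetBucketRowNumber.
    }
}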
// 2. find bucket from existing buckets
for (Integer bucket : bucketInformation.keySet()) {
    if (bucketFilterFunc.test(bucket)) {
        // it is my bucket
        Long number = bucketInformation.get(bucket);
        if (number < targetBucketRowNumber) {
            bucketInformation.put(bucket, number + 1);
            hash2Bucket.put(hash, bucket.shortValue());
            return bucket;
        }
    }
}
// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
    if (bucketFilterFunc.test(i) && !bucketInformation.containsKey(i)) {
        hash2Bucket.put(hash, (short) i);
        bucketInformation.put(i, 1L);
        return i;
    }
}
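Putting steps 1-3 together, the following self-contained toy simulation (a hypothetical class, not Paimon code) mirrors the logic above with a single assigner that owns every bucket and a deliberately tiny row target, so the bucket growth is easy to observe:

import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;

public class ToyPartitionIndex {
    private final Map<Integer, Short> hash2Bucket = new HashMap<>();
    private final Map<Integer, Long> bucketInformation = new HashMap<>();
    private final long targetBucketRowNumber = 2; // tiny on purpose

    public int assign(int hash, IntPredicate bucketFilterFunc) {
        // 1. seen key: reuse its bucket
        if (hash2Bucket.containsKey(hash)) {
            return hash2Bucket.get(hash);
        }
        // 2. reuse an owned bucket that still has capacity
        for (Integer bucket : bucketInformation.keySet()) {
            long number = bucketInformation.get(bucket);
            if (bucketFilterFunc.test(bucket) && number < targetBucketRowNumber) {
                bucketInformation.put(bucket, number + 1);
                hash2Bucket.put(hash, bucket.shortValue());
                return bucket;
            }
        }
        // 3. open a new owned bucket
        for (int i = 0; i < Short.MAX_VALUE; i++) {
            if (bucketFilterFunc.test(i) && !bucketInformation.containsKey(i)) {
                hash2Bucket.put(hash, (short) i);
                bucketInformation.put(i, 1L);
                return i;
            }
        }
        throw new RuntimeException("all owned buckets are full");
    }

    public static void main(String[] args) {
        ToyPartitionIndex index = new ToyPartitionIndex();
        // Five distinct hashes print buckets 0, 0, 1, 1, 2:
        // a new bucket is opened every targetBucketRowNumber rows.
        for (int hash = 100; hash < 105; hash++) {
            System.out.println(index.assign(hash, bucket -> true));
        }
    }
}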
@SuppressWarnings("OptionalGetWithoutIsPresent")
int maxBucket =
        bucketInformation.keySet().stream().mapToInt(Integer::intValue).max().getAsInt();
throw new RuntimeException(
        String.format(
                "To more bucket %s, you should increase target bucket row number %s.",
                maxBucket, targetBucketRowNumber));
}
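This exception means every bucket id this assigner owns, up to Short.MAX_VALUE, has already reached targetBucketRowNumber rows. That cap comes from table.coreOptions().dynamicBucketTargetRowNum(), i.e. the dynamic-bucket.target-row-num option mentioned in the comments above; raising it yields fewer, larger buckets. A hypothetical configuration sketch (the option keys are Paimon's; the plain map stands in for however your table builder accepts options):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: bucket = -1 selects dynamic bucket mode, and
// dynamic-bucket.target-row-num raises the per-bucket row cap so the
// assigner runs out of bucket ids much later.
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("bucket", "-1");
tableOptions.put("dynamic-bucket.target-row-num", "5000000");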