[FLINK-38783][runtime] Improved TieredStorageResourceRegistry Thread-Safety/Concurrency #27459
base: master
…Test Cases to Verify Potential Issue
…rrency Tests to Avoid Redundancy
…uctures to Improve Concurrency Support
…gistry Concurrency Tests
CC: @TanYuxin-tyx for review (please feel free to tag someone else). You seemed to be the most appropriate given the history.
7c8a412 to 7715029
…rceRegistry Concurrency Tests
7715029 to 448adde
@flinkbot run azure
-    private final Map<TieredStorageDataIdentifier, List<TieredStorageResource>>
-            registeredResources = new HashMap<>();
+    private final ConcurrentHashMap<
+            TieredStorageDataIdentifier, CopyOnWriteArrayList<TieredStorageResource>>
I am curious, why do we need the CopyOnWriteArrayList, is the introduction of ConcurrentHashMap not enough to solve this?
That's a good question!
I thought the same when I initially applied the fix (i.e., I only swapped the outer map for its thread-safe counterpart), but then found that the added tests would still fail.
The ConcurrentHashMap makes the map operations themselves thread-safe, but it does nothing for the values stored in the map. Multiple threads can therefore still mutate the same non-thread-safe list at once, which can corrupt or drop entries:
registeredResources
.computeIfAbsent(owner, (ignore) -> new ArrayList<>())
// Concurrent callers could be working with the same thread-safe map, but
// the underlying list is not thread-safe
.add(tieredStorageResource);
Without the extra thread-safety on the list, many of the existing tests can fail with ConcurrentModificationException, NullPointerException, and lost entries (which testConcurrentRegisterWithSameIdentifier specifically checks for). Making the swap to the CopyOnWriteArrayList (or some other thread-safe collection like Collections.synchronizedList()) makes the behavior consistent.
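As a minimal sketch of the point above, the following stand-alone class registers resources for a single owner from 10 threads using a ConcurrentHashMap whose values are CopyOnWriteArrayList instances. RegistrySketch, its String owner key, and the Runnable stand-in for a resource are illustrative assumptions and not Flink's actual types.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the registry; not Flink's TieredStorageResourceRegistry.
class RegistrySketch {
    // Thread-safe map whose values are also thread-safe lists.
    private final Map<String, List<Runnable>> registeredResources = new ConcurrentHashMap<>();

    void registerResource(String owner, Runnable resource) {
        registeredResources
                // Atomic per key: all callers for one owner see the same list.
                .computeIfAbsent(owner, ignore -> new CopyOnWriteArrayList<>())
                // Safe under concurrent callers because the list itself is thread-safe.
                .add(resource);
    }

    int resourceCount(String owner) {
        List<Runnable> resources = registeredResources.get(owner);
        return resources == null ? 0 : resources.size();
    }

    // Registers one resource per thread for the same owner and returns how many were kept.
    static int registerConcurrently(int threads) {
        RegistrySketch registry = new RegistrySketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all workers up so they register at the same time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                registry.registerResource("owner", () -> {});
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return registry.resourceCount("owner");
    }

    public static void main(String[] args) {
        System.out.println(registerConcurrently(10));
    }
}
```

Swapping the CopyOnWriteArrayList for a plain ArrayList in this sketch reintroduces the unsynchronized add on a shared list, so the returned count may then fall short of the number of threads.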
What is the purpose of the change
This pull request addresses FLINK-38783, which describes how the registration process within TieredStorageResourceRegistry did not properly handle concurrent operations and could therefore throw a ConcurrentModificationException under concurrent load.
Brief change log
- Replaced the registeredResources general-purpose hash map (Map<..., List<...>>) with corresponding thread-safe structures (ConcurrentHashMap<..., CopyOnWriteArrayList<...>>) to better handle concurrent operations.
Verifying this change
This change added a series of tests in TieredStorageResourceRegistryTest, first to reproduce the issue and later to confirm that the fix works as expected, including:
- testConcurrentRegisterResource to test concurrent resource registration across separate threads (10 total with same owner/identifier)
- testConcurrentRegisterResourceWithDifferentOwners to test concurrent registration across separate threads (10 total with different owners/identifiers)
- testConcurrentRegisterAndClear to test concurrent registration and clearing across separate threads
Example Tests
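The pull request's own example tests are not shown here. As a rough, hypothetical sketch of the kind of check that testConcurrentRegisterAndClear describes, the class below runs registrations and clears for one owner from separate threads and counts any exception that escapes. RegisterAndClearSketch, clearResourceFor, the String owner key, and the Runnable resource are all assumptions made for illustration and are not taken from Flink's code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the registry under test; not Flink code.
class RegisterAndClearSketch {
    private final Map<String, List<Runnable>> registeredResources = new ConcurrentHashMap<>();

    void registerResource(String owner, Runnable resource) {
        registeredResources
                .computeIfAbsent(owner, ignore -> new CopyOnWriteArrayList<>())
                .add(resource);
    }

    // Assumed clearing behavior: detach the owner's list, then release each resource.
    void clearResourceFor(String owner) {
        List<Runnable> removed = registeredResources.remove(owner);
        if (removed != null) {
            removed.forEach(Runnable::run); // iterates over a stable snapshot
        }
    }

    // Alternates register and clear calls across threads; returns the number of failures.
    static int countFailures(int threads) {
        RegisterAndClearSketch registry = new RegisterAndClearSketch();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            boolean register = i % 2 == 0;
            pool.execute(() -> {
                try {
                    start.await(); // release all workers together
                    if (register) {
                        registry.registerResource("owner", () -> {});
                    } else {
                        registry.clearResourceFor("owner");
                    }
                } catch (Throwable t) {
                    failures.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }
}
```

This sketch only checks that no exception escapes while registering and clearing overlap. It makes no claim that every registered resource ends up released, since a registration may still land on a list that a concurrent clear has already detached.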
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation