raftexample ialah contoh yang disediakan oleh etcd yang menunjukkan penggunaan perpustakaan algoritma konsensus rakit etcd. raftexample akhirnya melaksanakan perkhidmatan storan nilai kunci teragih yang menyediakan API REST.
Artikel ini akan membaca dan menganalisis kod raftexample, dengan harapan dapat membantu pembaca lebih memahami cara menggunakan perpustakaan rakit etcd dan logik pelaksanaan perpustakaan rakit.
Seni bina raftexample sangat mudah, dengan fail utama seperti berikut:
Permintaan tulis tiba dalam kaedah ServeHTTP modul httpapi melalui permintaan HTTP PUT.
curl -L http://127.0.0.1:12380/key -XPUT -d value
Selepas memadankan kaedah permintaan HTTP melalui suis, ia memasuki aliran pemprosesan kaedah PUT:
Cadangan diserahkan kepada perpustakaan algoritma rakit melalui kaedah Cadangan yang disediakan oleh perpustakaan algoritma rakit.
Kandungan cadangan boleh menambah pasangan nilai kunci baharu, mengemas kini pasangan nilai kunci sedia ada, dsb.
// httpapi.go v, err := io.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on PUT (%v)\n", err) http.Error(w, "Failed on PUT", http.StatusBadRequest) return } h.store.Propose(key, string(v)) w.WriteHeader(http.StatusNoContent)
Seterusnya, mari kita lihat kaedah Cadangan modul kvstore untuk melihat cara cadangan dibina dan diproses.
Dalam kaedah Propose, kami mula-mula mengekod pasangan nilai kunci untuk ditulis menggunakan gob, dan kemudian menghantar kandungan yang dikodkan kepada proposeC, saluran yang bertanggungjawab untuk menghantar cadangan yang dibina oleh modul kvstore ke modul rakit.
// kvstore.go func (s *kvstore) Propose(k string, v string) { var buf strings.Builder if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil { log.Fatal(err) } s.proposeC <- buf.String() }
Cadangan yang dibina oleh kvstore dan diserahkan kepada proposeC diterima dan diproses dengan kaedah serveChannels dalam modul rakit.
Selepas mengesahkan bahawa proposeC belum ditutup, modul rakit menyerahkan cadangan kepada perpustakaan algoritma rakit untuk diproses menggunakan kaedah Cadangan yang disediakan oleh perpustakaan algoritma rakit.
// raft.go select { case prop, ok := <-rc.proposeC: if !ok { rc.proposeC = nil } else { rc.node.Propose(context.TODO(), []byte(prop)) }
Selepas cadangan dikemukakan, ia mengikut proses algoritma rakit. Cadangan akhirnya akan dimajukan ke nod ketua (jika nod semasa bukan ketua dan anda membenarkan pengikut memajukan cadangan, dikawal oleh konfigurasi DisableProposalForwarding). Pemimpin akan menambah cadangan sebagai entri log pada log rakitnya dan menyegerakkannya dengan nod pengikut lain. Selepas dianggap komited, ia akan digunakan pada mesin keadaan dan hasilnya akan dikembalikan kepada pengguna.
Walau bagaimanapun, memandangkan perpustakaan rakit etcd itu sendiri tidak mengendalikan komunikasi antara nod, menambahkan pada log rakit, memohon pada mesin keadaan, dsb., perpustakaan rakit hanya menyediakan data yang diperlukan untuk operasi ini. Operasi sebenar mesti dilakukan oleh kami.
Oleh itu, kami perlu menerima data ini daripada perpustakaan rakit dan memprosesnya dengan sewajarnya berdasarkan jenisnya. Kaedah Sedia mengembalikan saluran baca sahaja yang melaluinya kami boleh menerima data yang perlu diproses.
Perlu diambil perhatian bahawa data yang diterima termasuk berbilang medan, seperti syot kilat untuk digunakan, entri log untuk dilampirkan pada log rakit, mesej untuk dihantar melalui rangkaian, dll.
Meneruskan contoh permintaan tulis kami (nod pemimpin), selepas menerima data yang sepadan, kami perlu menyimpan syot kilat, HardState dan Entri secara berterusan untuk menangani isu yang disebabkan oleh ranap pelayan (cth., pengikut mengundi untuk berbilang calon). HardState dan Entri bersama-sama terdiri daripada keadaan Persistent pada semua pelayan seperti yang dinyatakan dalam kertas. Selepas menyimpannya secara berterusan, kami boleh menggunakan syot kilat dan menambah pada log rakit.
Since we are currently the leader node, the raft library will return MsgApp type messages to us (corresponding to AppendEntries RPC in the paper). We need to send these messages to the follower nodes. Here, we use the rafthttp provided by etcd for node communication and send the messages to follower nodes using the Send method.
// raft.go case rd := <-rc.node.Ready(): if !raft.IsEmptySnap(rd.Snapshot) { rc.saveSnap(rd.Snapshot) } rc.wal.Save(rd.HardState, rd.Entries) if !raft.IsEmptySnap(rd.Snapshot) { rc.raftStorage.ApplySnapshot(rd.Snapshot) rc.publishSnapshot(rd.Snapshot) } rc.raftStorage.Append(rd.Entries) rc.transport.Send(rc.processMessages(rd.Messages)) applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)) if !ok { rc.stop() return } rc.maybeTriggerSnapshot(applyDoneC) rc.node.Advance()
Next, we use the publishEntries method to apply the committed raft log entries to the state machine. As mentioned earlier, in raftexample, the kvstore module acts as the state machine. In the publishEntries method, we pass the log entries that need to be applied to the state machine to commitC. Similar to the earlier proposeC, commitC is responsible for transmitting the log entries that the raft module has deemed committed to the kvstore module for application to the state machine.
// raft.go rc.commitC <- &commit{data, applyDoneC}
In the readCommits method of the kvstore module, messages read from commitC are gob-decoded to retrieve the original key-value pairs, which are then stored in a map structure within the kvstore module.
// kvstore.go for commit := range commitC { ... for _, data := range commit.data { var dataKv kv dec := gob.NewDecoder(bytes.NewBufferString(data)) if err := dec.Decode(&dataKv); err != nil { log.Fatalf("raftexample: could not decode message (%v)", err) } s.mu.Lock() s.kvStore[dataKv.Key] = dataKv.Val s.mu.Unlock() } close(commit.applyDoneC) }
Returning to the raft module, we use the Advance method to notify the raft library that we have finished processing the data read from the Ready channel and are ready to process the next batch of data.
Earlier, on the leader node, we sent MsgApp type messages to the follower nodes using the Send method. The follower node's rafthttp listens on the corresponding port to receive requests and return responses. Whether it's a request received by a follower node or a response received by a leader node, it will be submitted to the raft library for processing through the Step method.
raftNode implements the Raft interface in rafthttp, and the Process method of the Raft interface is called to handle the received request content (such as MsgApp messages).
// raft.go func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { return rc.node.Step(ctx, m) }
The above describes the complete processing flow of a write request in raftexample.
This concludes the content of this article. By outlining the structure of raftexample and detailing the processing flow of a write request, I hope to help you better understand how to use the etcd raft library to build your own distributed KV storage service.
If there are any mistakes or issues, please feel free to comment or message me directly. Thank you.
https://github.com/etcd-io/etcd/tree/main/contrib/raftexample
https://github.com/etcd-io/raft
https://raft.github.io/raft.pdf
Atas ialah kandungan terperinci Cara Membina Sistem Storan KV Teragih Anda Sendiri Menggunakan Perpustakaan Rakit etcd. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!