


Melaksanakan Sistem Pemprosesan Pesanan: Bahagian Aliran Kerja Temporal Lanjutan
1. Pengenalan dan Matlamat
Selamat datang kembali ke siri kami untuk melaksanakan sistem pemprosesan pesanan yang canggih! Dalam catatan kami sebelum ini, kami meletakkan asas untuk projek kami, menyediakan API CRUD asas, menyepadukan dengan pangkalan data Postgres dan melaksanakan aliran kerja Temporal yang mudah. Hari ini, kami menyelam lebih dalam ke dalam dunia aliran kerja Temporal untuk mencipta sistem pemprosesan pesanan yang teguh dan berskala.
Imbasan Catatan Sebelumnya
Dalam Bahagian 1, kami:
- Sediakan struktur projek kami
- Melaksanakan API CRUD asas menggunakan Golang dan Gin
- Bersepadu dengan pangkalan data Postgres
- Mencipta aliran kerja Temporal yang ringkas
- Menyangkut aplikasi kami
Matlamat untuk Siaran Ini
Dalam siaran ini, kami akan mengembangkan penggunaan Temporal kami dengan ketara, meneroka konsep lanjutan dan melaksanakan aliran kerja yang kompleks. Pada penghujung artikel ini, anda akan dapat:
- Reka bentuk dan laksana aliran kerja pemprosesan pesanan berbilang langkah
- Kendalikan proses yang berjalan lama dengan berkesan
- Laksanakan mekanisme pengendalian ralat dan cuba semula yang mantap
- Aliran kerja versi untuk kemas kini selamat dalam pengeluaran
- Laksanakan corak saga untuk transaksi yang diedarkan
- Sediakan pemantauan dan pemerhatian untuk aliran kerja Temporal
Jom selami!
2. Latar Belakang Teori dan Konsep
Sebelum kita memulakan pengekodan, mari semak beberapa konsep Temporal utama yang akan menjadi penting untuk pelaksanaan lanjutan kita.
Aliran Kerja dan Aktiviti Sementara
Dalam Temporal, Aliran Kerja ialah fungsi tahan lama yang mengatur logik perniagaan yang berjalan lama. Aliran kerja adalah tahan terhadap kesalahan dan boleh bertahan dalam proses dan kegagalan mesin. Ia boleh dianggap sebagai mekanisme penyelarasan yang boleh dipercayai untuk peralihan keadaan aplikasi anda.
Aktiviti, sebaliknya, adalah bahan binaan aliran kerja. Mereka mewakili satu tindakan atau tugas yang jelas dan jelas, seperti membuat panggilan API, menulis ke pangkalan data atau menghantar e-mel. Aktiviti boleh dicuba semula secara bebas daripada aliran kerja yang memanggilnya.
Pelaksanaan Aliran Kerja, Sejarah dan Pengurusan Negeri
Apabila aliran kerja dilaksanakan, Temporal mengekalkan sejarah semua peristiwa yang berlaku sepanjang hayatnya. Sejarah ini adalah sumber kebenaran untuk keadaan aliran kerja. Jika pekerja aliran kerja gagal dan dimulakan semula, ia boleh membina semula keadaan aliran kerja dengan memainkan semula sejarah ini.
Pendekatan penyumberan acara ini membolehkan Temporal memberikan jaminan ketekalan yang kukuh dan membolehkan ciri seperti versi aliran kerja dan diteruskan seperti baharu.
Mengendalikan Proses Berlangsung Lama
Temporal direka untuk mengendalikan proses yang boleh berjalan untuk tempoh yang panjang - dari minit ke hari atau bahkan bulan. Ia menyediakan mekanisme seperti degupan jantung untuk aktiviti yang berjalan lama dan berterusan seperti baharu untuk aliran kerja yang menjana sejarah yang besar.
Versi Aliran Kerja
Apabila sistem anda berkembang, anda mungkin perlu mengemas kini definisi aliran kerja. Temporal menyediakan keupayaan versi yang membolehkan anda membuat perubahan tanpa putus pada aliran kerja tanpa menjejaskan kejadian yang sedang berjalan.
Corak Saga untuk Transaksi Teragih
Corak Saga ialah satu cara untuk mengurus ketekalan data merentas perkhidmatan mikro dalam senario transaksi yang diedarkan. Ia amat berguna apabila anda perlu mengekalkan konsistensi merentas pelbagai perkhidmatan tanpa menggunakan transaksi ACID teragih. Temporal menyediakan rangka kerja yang sangat baik untuk melaksanakan saga.
Sekarang kami telah membincangkan konsep ini, mari mula melaksanakan aliran kerja pemprosesan pesanan lanjutan kami.
3. Melaksanakan Aliran Kerja Pemprosesan Pesanan Kompleks
Mari mereka bentuk aliran kerja pemprosesan pesanan berbilang langkah yang merangkumi pengesahan pesanan, pemprosesan pembayaran, pengurusan inventori dan pengaturan penghantaran. Kami akan melaksanakan setiap langkah ini sebagai aktiviti berasingan yang diselaraskan oleh aliran kerja.
Pertama, mari kita tentukan aktiviti kita:
// internal/workflow/activities.go package workflow import ( "context" "errors" "go.temporal.io/sdk/activity" "github.com/yourusername/order-processing-system/internal/db" ) type OrderActivities struct { queries *db.Queries } func NewOrderActivities(queries *db.Queries) *OrderActivities { return &OrderActivities{queries: queries} } func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error { // Implement order validation logic if order.TotalAmount <= 0 { return errors.New("invalid order amount") } // Add more validation as needed return nil } func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error { // Implement payment processing logic // This could involve calling a payment gateway API activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount) // Simulate payment processing // In a real scenario, you'd integrate with a payment gateway here return nil } func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error { // Implement inventory update logic // This could involve updating stock levels in the database activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID) // Simulate inventory update // In a real scenario, you'd update your inventory management system here return nil } func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error { // Implement shipping arrangement logic // This could involve calling a shipping provider's API activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID) // Simulate shipping arrangement // In a real scenario, you'd integrate with a shipping provider here return nil }
Sekarang, mari laksanakan aliran kerja pemprosesan pesanan kami yang kompleks:
// internal/workflow/order_workflow.go package workflow import ( "time" "go.temporal.io/sdk/workflow" "github.com/yourusername/order-processing-system/internal/db" ) func OrderWorkflow(ctx workflow.Context, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Info("OrderWorkflow started", "OrderID", order.ID) // Activity options activityOptions := workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Minute, MaximumAttempts: 5, }, } ctx = workflow.WithActivityOptions(ctx, activityOptions) // Step 1: Validate Order err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil) if err != nil { logger.Error("Order validation failed", "OrderID", order.ID, "Error", err) return err } // Step 2: Process Payment err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil) if err != nil { logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err) return err } // Step 3: Update Inventory err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil) if err != nil { logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err) // In case of inventory update failure, we might need to refund the payment // This is where the saga pattern becomes useful, which we'll cover later return err } // Step 4: Arrange Shipping err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil) if err != nil { logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err) // If shipping fails, we might need to revert inventory and refund payment return err } logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID) return nil }
Aliran kerja ini menyelaraskan berbilang aktiviti, setiap satu mewakili satu langkah dalam pemprosesan pesanan kami. Perhatikan cara kami menggunakan aliran kerja.ExecuteActivity untuk menjalankan setiap aktiviti, menghantar data pesanan mengikut keperluan.
Kami juga telah menyediakan pilihan aktiviti dengan dasar cuba semula. Ini bermakna jika aktiviti gagal (cth., disebabkan isu rangkaian sementara), Temporal akan mencuba semula secara automatik berdasarkan dasar kami yang ditentukan.
Dalam bahagian seterusnya, kami akan meneroka cara mengendalikan proses yang berjalan lama dalam struktur aliran kerja ini.
4. Handling Long-Running Processes with Temporal
In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.
Heartbeats for Long-Running Activities
For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the expected interval, Temporal can mark it as failed and potentially retry it.
Let’s modify our ArrangeShipping activity to include heartbeats:
func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error { logger := activity.GetLogger(ctx) logger.Info("Arranging shipping", "orderId", order.ID) // Simulate a long-running process for i := 0; i < 10; i++ { // Simulate work time.Sleep(time.Second) // Record heartbeat activity.RecordHeartbeat(ctx, i) // Check if we need to cancel if activity.GetInfo(ctx).Attempt > 1 { logger.Info("Cancelling shipping arrangement due to retry", "orderId", order.ID) return nil } } logger.Info("Shipping arranged", "orderId", order.ID) return nil }
In this example, we’re simulating a long-running process with a loop. We record a heartbeat in each iteration, allowing Temporal to track the activity’s progress.
Using Continue-As-New for Very Long-Running Workflows
For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.
Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:
func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error { logger := workflow.GetLogger(ctx) // Set up a timer for how long we want this workflow execution to run timerFired := workflow.NewTimer(ctx, 24*time.Hour) // Set up a selector to wait for either the timer to fire or the order to be delivered selector := workflow.NewSelector(ctx) var orderDelivered bool selector.AddFuture(timerFired, func(f workflow.Future) { // Timer fired, we'll continue-as-new logger.Info("24 hours passed, continuing as new", "orderID", orderID) workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID) }) selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) { c.Receive(ctx, &orderDelivered) logger.Info("Order delivered signal received", "orderID", orderID) }) selector.Select(ctx) if orderDelivered { logger.Info("Order tracking completed, order delivered", "orderID", orderID) return nil } // If we reach here, it means we're continuing as new return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID) }
In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.
By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.
In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.
5. Implementing Retry Logic and Error Handling
Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.
Configuring Retry Policies for Activities
Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:
func OrderWorkflow(ctx workflow.Context, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Info("OrderWorkflow started", "OrderID", order.ID) // Define a retry policy retryPolicy := &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Minute, MaximumAttempts: 5, NonRetryableErrorTypes: []string{"InvalidOrderError"}, } // Activity options with retry policy activityOptions := workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, RetryPolicy: retryPolicy, } ctx = workflow.WithActivityOptions(ctx, activityOptions) // Execute activities with retry policy err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil) if err != nil { return handleOrderError(ctx, "ValidateOrder", err, order) } // ... (other activities) return nil }
In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.
Implementing Custom Retry Logic
While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:
func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error { logger := activity.GetLogger(ctx) var err error for attempt := 1; attempt <= 3; attempt++ { err = a.processPayment(ctx, order) if err == nil { return nil } if _, ok := err.(*PaymentDeclinedError); ok { // Payment was declined, no point in retrying return err } logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err) time.Sleep(time.Duration(attempt) * time.Second) } return err } func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error { // Actual payment processing logic here // ... }
In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case.
Handling and Propagating Errors
Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:
func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err) // Depending on the activity and error type, we might want to compensate switch activityName { case "ProcessPayment": // If payment processing failed, we might need to cancel the order _ = workflow.ExecuteActivity(ctx, CancelOrder, order).Get(ctx, nil) case "UpdateInventory": // If inventory update failed after payment, we might need to refund _ = workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil) } // Create a customer-facing error message return workflow.NewCustomError("OrderProcessingFailed", "Failed to process order due to: "+err.Error()) }
This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.
6. Versioning Workflows for Safe Updates
As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.
Implementing Versioned Workflows
Here’s an example of how to implement versioning in our order processing workflow:
func OrderWorkflow(ctx workflow.Context, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Info("OrderWorkflow started", "OrderID", order.ID) // Use GetVersion to handle workflow versioning v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1) if v == workflow.DefaultVersion { // Old version: process payment before updating inventory err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil) if err != nil { return handleOrderError(ctx, "ProcessPayment", err, order) } err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil) if err != nil { return handleOrderError(ctx, "UpdateInventory", err, order) } } else { // New version: update inventory before processing payment err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil) if err != nil { return handleOrderError(ctx, "UpdateInventory", err, order) } err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil) if err != nil { return handleOrderError(ctx, "ProcessPayment", err, order) } } // ... rest of the workflow return nil }
In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.
Strategies for Updating Workflows in Production
When updating workflows in a production environment, consider the following strategies:
Incremental Changes : Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.
Compatibility Periods : Maintain compatibility with older versions for a certain period to allow running workflows to complete.
Feature Flags : Use feature flags in conjunction with workflow versions to control the rollout of new features.
Monitoring and Alerting : Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.
Rollback Plan : Always have a plan to roll back to the previous version if issues are detected with the new version.
By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.
In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.
7. Implementing Saga Patterns for Distributed Transactions
The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.
Designing a Saga for Our Order Processing System
Let’s design a saga for our order processing system that includes the following steps:
- Reserve Inventory
- Process Payment
- Update Inventory
- Arrange Shipping
If any of these steps fail, we need to execute compensating actions for the steps that have already completed.
Here’s how we can implement this saga using Temporal:
func OrderSaga(ctx workflow.Context, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Info("OrderSaga started", "OrderID", order.ID) // Saga compensations var compensations []func(context.Context) error // Step 1: Reserve Inventory err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil) if err != nil { return fmt.Errorf("failed to reserve inventory: %w", err) } compensations = append(compensations, func(ctx context.Context) error { return a.ReleaseInventoryReservation(ctx, order) }) // Step 2: Process Payment err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil) if err != nil { return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err)) } compensations = append(compensations, func(ctx context.Context) error { return a.RefundPayment(ctx, order) }) // Step 3: Update Inventory err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil) if err != nil { return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err)) } // No compensation needed for this step, as we've already updated the inventory // Step 4: Arrange Shipping err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil) if err != nil { return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err)) } logger.Info("OrderSaga completed successfully", "OrderID", order.ID) return nil } func compensate(ctx workflow.Context, compensations []func(context.Context) error, err error) error { logger := workflow.GetLogger(ctx) logger.Error("Saga failed, executing compensations", "error", err) for i := len(compensations) - 1; i >= 0; i-- { compensationErr := workflow.ExecuteActivity(ctx, compensations[i]).Get(ctx, nil) if compensationErr != nil { logger.Error("Compensation failed", "error", compensationErr) // In a real-world scenario, you might want to implement more sophisticated // error handling for failed compensations, such as retrying or alerting } } return err }
In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.
This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.
8. Monitoring and Observability for Temporal Workflows
Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.
Implementing Custom Metrics
Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:
func OrderWorkflow(ctx workflow.Context, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Info("OrderWorkflow started", "OrderID", order.ID) // Define metric orderProcessingTime := workflow.NewTimer(ctx, 0) defer func() { duration := orderProcessingTime.ElapsedTime() workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration) }() // ... rest of the workflow implementation return nil }
In this example, we’re recording the total time taken to process an order.
Integrating with Prometheus
To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:
package main import ( "net/http" "github.com/prometheus/client_golang/prometheus/promhttp" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" ) func main() { // ... Temporal client setup // Create a worker w := worker.New(c, "order-processing-task-queue", worker.Options{}) // Register workflows and activities w.RegisterWorkflow(OrderWorkflow) w.RegisterActivity(a.ValidateOrder) // ... register other activities // Start the worker go func() { err := w.Run(worker.InterruptCh()) if err != nil { logger.Fatal("Unable to start worker", err) } }() // Expose Prometheus metrics http.Handle("/metrics", promhttp.Handler()) go func() { err := http.ListenAndServe(":2112", nil) if err != nil { logger.Fatal("Unable to start metrics server", err) } }() // ... rest of your application }
This sets up a /metrics endpoint that Prometheus can scrape to collect our custom metrics along with the built-in Temporal metrics.
Implementing Structured Logging
Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:
func OrderWorkflow(ctx workflow.Context, order db.Order) error { logger := workflow.GetLogger(ctx) logger.Info("OrderWorkflow started", "OrderID", order.ID, "CustomerID", order.CustomerID, "TotalAmount", order.TotalAmount, ) // ... workflow implementation logger.Info("OrderWorkflow completed", "OrderID", order.ID, "Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime), ) return nil }
This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.
Setting Up Distributed Tracing
Distributed tracing can provide valuable insights into the flow of requests through our system. While Temporal doesn’t natively support distributed tracing, we can implement it in our activities:
import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" ) func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error { _, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment") defer span.End() span.SetAttributes( attribute.Int64("order.id", order.ID), attribute.Float64("order.amount", order.TotalAmount), ) // ... payment processing logic return nil }
By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.
9. Testing and Validation
Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.
Unit Testing Workflows
Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:
func TestOrderWorkflow(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() // Mock activities env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil) env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil) env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil) env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil) // Execute workflow env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99}) require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) }
This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.
Testing Saga Compensations
It’s important to test that our saga compensations work correctly. Here’s an example test:
func TestOrderSagaCompensation(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() // Mock activities env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil) env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed")) env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil) // Execute workflow env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99}) require.True(t, env.IsWorkflowCompleted()) require.Error(t, env.GetWorkflowError()) // Verify that compensation was called env.AssertExpectations(t) }
This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.
10. Défis et considérations
Lorsque nous mettons en œuvre et exploitons notre système avancé de traitement des commandes, il y a plusieurs défis et considérations à garder à l'esprit :
Complexité des flux de travail : À mesure que les flux de travail deviennent plus complexes, ils peuvent devenir difficiles à comprendre et à maintenir. Une refactorisation régulière et une bonne documentation sont cruciales.
Test des workflows de longue durée : tester des workflows qui peuvent s'exécuter pendant des jours ou des semaines peut être difficile. Pensez à mettre en place des mécanismes pour accélérer le temps de vos tests.
Gestion des dépendances externes : Les services externes peuvent échouer ou devenir indisponibles. Mettez en œuvre des disjoncteurs et des mécanismes de secours pour gérer ces scénarios.
Surveillance et alertes : configurez une surveillance et des alertes complètes pour identifier et répondre rapidement aux problèmes dans vos flux de travail.
Cohérence des données : assurez-vous que vos implémentations de saga maintiennent la cohérence des données entre les services, même en cas de pannes.
Réglage des performances : à mesure que votre système évolue, vous devrez peut-être ajuster les paramètres de performances de Temporal, tels que le nombre de travailleurs de flux de travail et d'activité.
Gestion des versions du workflow : gérez soigneusement les versions du workflow pour garantir des mises à jour fluides sans interrompre les instances en cours d'exécution.
11. Prochaines étapes et aperçu de la partie 3
Dans cet article, nous avons approfondi les concepts avancés de flux de travail temporel, en mettant en œuvre une logique de traitement de commande complexe, des modèles de saga et une gestion robuste des erreurs. Nous avons également abordé les stratégies de surveillance, d'observabilité et de test pour nos flux de travail.
Dans la prochaine partie de notre série, nous nous concentrerons sur les opérations avancées de base de données avec sqlc. Nous couvrirons :
- Mise en œuvre de requêtes et de transactions de bases de données complexes
- Optimisation des performances de la base de données
- Mise en œuvre des opérations par lots
- Gestion des migrations de bases de données dans un environnement de production
- Mise en œuvre du partitionnement de base de données pour l'évolutivité
- Assurer la cohérence des données dans un système distribué
Restez à l'écoute pendant que nous continuons à développer notre système sophistiqué de traitement des commandes !
Besoin d'aide ?
Êtes-vous confronté à des problèmes difficiles ou avez-vous besoin d'un point de vue externe sur une nouvelle idée ou un nouveau projet ? Je peux aider ! Que vous cherchiez à établir une preuve de concept technologique avant de réaliser un investissement plus important ou que vous ayez besoin de conseils sur des problèmes difficiles, je suis là pour vous aider.
Services offerts :
- Résolution de problèmes : S'attaquer à des problèmes complexes avec des solutions innovantes.
- Consultation : Apporter des conseils d'experts et des points de vue neufs sur vos projets.
- Preuve de concept : Développer des modèles préliminaires pour tester et valider vos idées.
Si vous souhaitez travailler avec moi, veuillez nous contacter par e-mail à hungaikevin@gmail.com.
Transformons vos défis en opportunités !
Atas ialah kandungan terperinci Melaksanakan Sistem Pemprosesan Pesanan: Bahagian Aliran Kerja Temporal Lanjutan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undress AI Tool
Gambar buka pakaian secara percuma

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Clothoff.io
Penyingkiran pakaian AI

Video Face Swap
Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Ia tidak sukar untuk membina pelayan web yang ditulis dalam Go. Inti terletak pada menggunakan pakej NET/HTTP untuk melaksanakan perkhidmatan asas. 1. Gunakan NET/HTTP untuk memulakan pelayan yang paling mudah: fungsi pemprosesan mendaftar dan mendengar port melalui beberapa baris kod; 2. 3. Amalan Umum: Routing Kumpulan oleh Modul Fungsional, dan gunakan perpustakaan pihak ketiga untuk menyokong padanan kompleks; 4. Perkhidmatan Fail Statik: Sediakan fail HTML, CSS dan JS melalui http.fileserver; 5. Prestasi dan Keselamatan: Aktifkan HTTPS, hadkan saiz badan permintaan, dan tetapkan masa tamat untuk meningkatkan keselamatan dan prestasi. Selepas menguasai perkara -perkara utama ini, lebih mudah untuk mengembangkan fungsi.

Inti pemprosesan audio dan video terletak pada pemahaman proses asas dan kaedah pengoptimuman. 1. Proses asas termasuk pengambilalihan, pengekodan, penghantaran, penyahkodan dan main balik, dan setiap pautan mempunyai kesukaran teknikal; 2. Masalah biasa seperti penyimpangan audio dan video, kelewatan lag, bunyi bunyi, gambar kabur, dan lain -lain boleh diselesaikan melalui pelarasan segerak, pengoptimuman pengekodan, modul pengurangan hingar, pelarasan parameter, dan sebagainya; 3. Adalah disyorkan untuk menggunakan FFMPEG, OpenCV, WebRTC, GSTREAMER dan alat lain untuk mencapai fungsi; 4. Dari segi pengurusan prestasi, kita harus memberi perhatian kepada pecutan perkakasan, penetapan kadar bingkai resolusi yang munasabah, masalah konvensyen dan masalah kebocoran memori. Menguasai perkara utama ini akan membantu meningkatkan kecekapan pembangunan dan pengalaman pengguna.

Tujuan Select Plus Default adalah untuk membolehkan Pilih untuk melakukan tingkah laku lalai apabila tiada cawangan lain yang bersedia untuk mengelakkan penyekatan program. 1. Apabila menerima data dari saluran tanpa menyekat, jika saluran kosong, ia akan terus memasuki cawangan lalai; 2. Dalam kombinasi dengan masa. Selepas atau ticker, cuba hantar data dengan kerap. Jika saluran penuh, ia tidak akan menyekat dan melangkau; 3. Mencegah kebuntuan, elakkan program terperangkap apabila tidak pasti sama ada saluran ditutup; Apabila menggunakannya, sila ambil perhatian bahawa cawangan lalai akan dilaksanakan dengan serta -merta dan tidak boleh disalahgunakan, dan lalai dan kes saling eksklusif dan tidak akan dilaksanakan pada masa yang sama.

Cara yang paling berkesan untuk menulis Kubernetesoperator adalah untuk menggunakan Go untuk menggabungkan Kubebuilder dan pengawal-runtime. 1. Memahami corak pengendali: Tentukan sumber tersuai melalui CRD, tulis pengawal untuk mendengar perubahan sumber dan lakukan gelung perdamaian untuk mengekalkan keadaan yang diharapkan. 2. Gunakan Kubebuilder untuk memulakan projek dan membuat API untuk menghasilkan CRD, pengawal dan konfigurasi secara automatik. 3. Tentukan spec dan struktur status CRD dalam API/V1/MYAPP_TYPES.GO, dan menjalankan makeManifests untuk menjana cRDYAML. 4. Daftar masuk dalam pengawal

Bagaimana dengan cepat melaksanakan contoh Restapi yang ditulis dalam GO? Jawapannya adalah menggunakan perpustakaan standard Net/HTTP, yang boleh diselesaikan mengikut tiga langkah berikut: 1. Sediakan struktur projek dan memulakan modul; 2. Tentukan struktur data dan fungsi pemprosesan, termasuk mendapatkan semua data, mendapatkan data tunggal berdasarkan ID, dan membuat data baru; 3. Daftar laluan dalam fungsi utama dan mulakan pelayan. Seluruh proses tidak memerlukan perpustakaan pihak ketiga. Fungsi Restapi Asas dapat direalisasikan melalui perpustakaan standard dan boleh diuji melalui penyemak imbas atau pos.

Gunakan perintah terbina dalam getest untuk menjana data liputan: Run Getest-Cover./... untuk memaparkan peratusan liputan setiap pakej, atau gunakan getest-coverprofile = liputan.out. Mengintegrasikan Laporan Liputan di CI: Buat fail liputan.out, dan muat naik analisis melalui alat pihak ketiga seperti codecov atau coveralls, sebagai contoh, menggunakan curl-data-binary@coverage.o

Langkah-langkah pemasangan Golangci-lint adalah: 1. Pasang menggunakan pemasangan binari atau perintah goinstall; 2. Sahkan sama ada pemasangan berjaya; Kaedah konfigurasi termasuk: 3. Buat fail .golangci.yml untuk membolehkan/melumpuhkan linter, menetapkan laluan pengecualian, dan sebagainya; Kaedah integrasi adalah: 4. Tambah langkah -langkah lint dalam CI/CD (seperti githubactions) untuk memastikan bahawa pemeriksaan LINT secara automatik dijalankan untuk setiap penyerahan dan PR.

Untuk mengurangkan peruntukan timbunan fungsi laluan kritikal di GO, empat kaedah boleh diambil: 1. Gunakan pembolehubah timbunan untuk mengelakkan melarikan diri; 2. Pra-memperuntukkan dan menggunakan semula objek; 3. Elakkan penulisan melarikan diri tersirat; 4. Gunakan alat untuk mengesahkan melarikan diri. Khususnya, ia termasuk mengelakkan penunjuk pembolehubah tempatan yang kembali, menggunakan jenis nilai untuk mengurangkan pelarian, pra-memperuntukkan kapasiti kepingan, menggunakan sync.pool ke objek cache, mengelakkan penutupan untuk menangkap struktur besar, tidak menetapkan jenis antara muka, dan memeriksa titik melarikan diri melalui -gcflags = -m, dengan itu mengurangkan tekanan GC dan meningkatkan prestasi.
