Multithrreaded Datenbankzugriff in C# und SQL2005 Server mit TransactionScope (Bug oder Fehler meinerseits?)

29/04/2008 - 10:06 von Michael Schöller | Report spam
Hallo Leute,
Ich habe dies zwar schon in einer anderen Newsgroup gepoostet wurde aber
darauf verwiesend as das hier besser passt ^^. also...
Ich hab hier ein verzwicktes Problem. Eigentlich will ich nur mehr wissen
warum sich .NET3.0 und vermutlich auch alle anderen .NET Versionen so
verhalten und ob das ev. ein Bug sein könnte.
Gleich vorweg der MSDTC ist konfiguriert und funktioniert.

Auf einem SQL2005-Server liegt eine Tabelle Test (bigint, varchar(20)). Die
ist befüllt mit den Werten
1, 'Test1'
2, 'Test2'
3, 'Test3'
4, 'Test4'
5, 'Test5'
6, 'Test6'

Führt man nachfolgenden Code aus Funktioniert das beim 1 vom 20 Mal
einwandfrei. Löscht man die 2 eingefügten Zeilen und führt den Code noch
einmal aus bekommt man auf jeden Fall diese Exception:
"System.Transactions.TransactionAbortedException: Die Transaktion wurde
abgebrochen. > System.Transactions.TransactionPromotionException: Fehler
beim Versuch, die Transaktion heraufzustufen. >
System.Data.SqlClient.SqlException: Diesem Befehl ist bereits ein geöffneter
DataReader zugeordnet, der zuerst geschlossen werden muss. bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(TransactionRequest
transactionRequest, String transactionName, IsolationLevel iso,
SqlInternalTransaction internalTransaction, Boolean
isDelegateControlRequest) bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransaction(TransactionRequest
transactionRequest, String name, IsolationLevel iso, SqlInternalTransaction
internalTransaction, Boolean isDelegateControlRequest) bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote() Ende der
internen Ausnahmestapelüberwachung bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote() bei
System.Transactions.Tr

ansactionStatePSPEOperation.PSPEPromote(InternalTransaction tx) bei
System.Transactions.TransactionStateDelegatedBase.EnterState(InternalTransaction
tx) Ende der internen Ausnahmestapelüberwachung bei
System.Transactions.TransactionStateAborted.CheckForFinishedTransaction(InternalTransaction
tx) bei
System.Transactions.TransactionStatePhase0.Promote(InternalTransaction
tx) bei System.Transactions.Transaction.Promote() bei
System.Transactions.TransactionInterop.ConvertToOletxTransaction(Transaction
transaction) bei
System.Transactions.TransactionInterop.GetExportCookie(Transaction
transaction, Byte[] whereabouts) bei
System.Data.SqlClient.SqlInternalConnection.EnlistNonNull(Transaction
tx) bei System.Data.SqlClient.SqlInternalConnection.Enlist(Transaction
tx) bei
System.Data.SqlClient.SqlInternalConnectionTds.Activate(Transaction
transaction) bei
System.Data.ProviderBase.DbConnectionInternal.ActivateConnection(Transaction
transaction)

bei System.Data.ProviderBase.DbConnectionPool.GetConnection(DbConnection
owningObject) bei
System.Data.ProviderBase.DbConnectionFactory.GetConnection(DbConnection
owningConnection) bei
System.Data.ProviderBase.DbConnectionClosed.OpenConnection(DbConnection
outerConnection, DbConnectionFactory connectionFactory) bei
System.Data.SqlClient.SqlConnection.Open() bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
Studio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
120. bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state) bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state) bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

nur kann ich das mit dem Datareader nicht recht glauben... hier der Code.

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion

{

class Program

{

static void Main(string[] args)

{

try

{

using (TransactionScope ts = new
TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))

{

Console.WriteLine("Check Transaction (RootStart): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Thread worker1 = new Thread(Program.worker1);

Thread worker2 = new Thread(Program.worker2);

worker1.IsBackground = true;

worker2.IsBackground = true;

worker1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

worker2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));


//ThreadPool.QueueUserWorkItem(worker1,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

//ThreadPool.QueueUserWorkItem(worker2,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

Console.WriteLine("Check Transaction (RootEnd): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the main thread");

ts.Complete();

}

Console.WriteLine("Transaction Completed");

}

catch (Exception ex)

{

Console.WriteLine("Top Catch");

Console.WriteLine(ex.ToString());

}

Console.WriteLine("Enter <Enter>");

Console.ReadLine();

}

static void worker1(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1Start): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

using (SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

{

conn.Open();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(8, 'Test8')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader1: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));

}

r.Close();

}

}

Thread.Sleep(9000);

//throw new Exception("Aufzah!");

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker9 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker1 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

static void worker2(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

using(SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

{

conn.Open();

Thread.Sleep(TimeSpan.FromSeconds(10)); //<- Wait till Main Thread reaches
ts.Commit();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(7, 'Test7')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader2: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));

}

r.Close();

}

}

Thread.Sleep(5000);

Console.WriteLine("Check Transaction (Worker2End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker2 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker2 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

}

}



Alternativ habe ich auch diesen Code hier versucht der eher dem enspricht
was wir Momentan verwenden. Allerdings liefert der wiederum folgende
Exception (fast immer oder eben die obere)

"System.Data.SqlClient.SqlException: Distributed transaction completed.
Either enlist this session in a new transaction or the NULL transaction.
bei System.Data.SqlClient.SqlConnection.OnError(SqlException exception,
Boolean breakConnection) bei
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception,
Boolean breakConnection) bei
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject
stateObj) bei System.Data.SqlClient.TdsParser.Run(RunBehavior
runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream,
BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
bei System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String
methodName, Boolean async) bei
System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult
result, String methodName, Boolean sendToPipe) bei
System.Data.SqlClient.SqlCommand.ExecuteNonQuery() bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
St

udio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
133. bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state) bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state) bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

Hier der Code

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion

{

class Program

{

static SqlConnection conn1;

static SqlConnection conn2;

static void Main(string[] args)

{

try

{

conn1 = new SqlConnection("Data Source=WKOEDEV01\\SQL2005;Initial
Catalog=WKOBASE_CLONE;Persist Security Info=True;User
ID=binreader;Password=readme2002");

conn2 = new SqlConnection("Data Source=WKOEDEV01\\SQL2005;Initial
Catalog=WKOBASE_CLONE;Persist Security Info=True;User
ID=binreader;Password=readme2002");

conn1.Open();

conn2.Open();

using (TransactionScope ts = new
TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))

{

conn1.EnlistTransaction(Transaction.Current);

conn2.EnlistTransaction(Transaction.Current);

Console.WriteLine("Check Transaction (RootStart): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Thread worker1 = new Thread(Program.worker1);

Thread worker2 = new Thread(Program.worker2);

worker1.IsBackground = true;

worker2.IsBackground = true;

worker1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

worker2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));


//ThreadPool.QueueUserWorkItem(worker1,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

//ThreadPool.QueueUserWorkItem(worker2,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

Console.WriteLine("Check Transaction (RootEnd): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the main thread");

ts.Complete();

}

conn1.Close();

conn2.Close();

Console.WriteLine("Transaction Completed");

}

catch (Exception ex)

{

Console.WriteLine("Top Catch");

Console.WriteLine(ex.ToString());

}

Console.WriteLine("Enter <Enter>");

Console.ReadLine();

}

static void worker1(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1Start): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

//using (SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

SqlConnection conn = conn1;

{

//conn.Open();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(8, 'Test8')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader1: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));

}

r.Close();

}

}

Thread.Sleep(9000);

//throw new Exception("Aufzah!");

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker9 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker1 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

static void worker2(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

//using(SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

SqlConnection conn = conn2;

{

//conn.Open();

Thread.Sleep(TimeSpan.FromSeconds(10)); //<- Wait till Main Thread reaches
ts.Commit();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(7, 'Test7')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader2: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));

}

r.Close();

}

}

Thread.Sleep(5000);

Console.WriteLine("Check Transaction (Worker2End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker2 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker2 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

}

}



Für mich ergibt das aber recht wenig Sinn. Es funktioniert rein gar nichts
so wie ich mir das vorstellen würde.

Gibt es überhaupt eine vernünftige möglichkeit Multithreaded mit Datenbanken
zu arbeiten oder sollte man das lieber ganz lassen?

Wenn ja würde ich um ein Funktionierendes Beispiel bitten. Mir gehen die
Gründe aus warum das nicht geht. Vor allem sehe ich nicht ein warum ich
nicht Multithreadet auf die Datenbank losgehen können soll.



Mit der Bitte um Hilfe

Michael Schöller
 

Lesen sie die antworten

#1 Elmar Boye
30/04/2008 - 12:55 | Warnen spam
Hallo Michael,

Michael Schöller schrieb:
Ich habe dies zwar schon in einer anderen Newsgroup gepoostet wurde aber
darauf verwiesend as das hier besser passt ^^. also...
Ich hab hier ein verzwicktes Problem. Eigentlich will ich nur mehr wissen
warum sich .NET3.0 und vermutlich auch alle anderen .NET Versionen so
verhalten und ob das ev. ein Bug sein könnte.



Vorneweg: Das ist kein Bug.

Führt man nachfolgenden Code aus Funktioniert das beim 1 vom 20 Mal
einwandfrei.



Ein bissel Glück hast Du da schon gehabt, was an den großzügigen
Schlafeinheiten (Sleep) liegen mag.
Ich habe mal Deinen Code insofern umgeàndert, als ich AutoResetEvent
verwendet habe, wegen Nachvollziehbarkeit (und wegen meiner Ungeduld ;-)
- siehe am Ende.

Löscht man die 2 eingefügten Zeilen und führt den Code noch
einmal aus bekommt man auf jeden Fall diese Exception:
"System.Transactions.TransactionAbortedException: Die Transaktion wurde
abgebrochen. > System.Transactions.TransactionPromotionException: Fehler
beim Versuch, die Transaktion heraufzustufen. >



Nun zum eigentlichen Problem:
In dem Moment, wo Du zwei Verbindungen verwendest, muß die Transaktion
in eine verteilte Transaktion umgewandelt werden.
Das funktioniert aber nicht, wenn bereits eine Verbindung Daten abruft -
also einen DataReader geöffnet hat. Und es sollte verstàndlich werden,
wenn man über die Konsequenzen nachdenkt: Es ist dann unmöglich, die
Transaktionssicherheit herzustellen (ein Teil mag gelesen sein, ein
anderer nicht).

Eine Alternative (unten im ConnectionString auskommentiert), ist das
verwenden von MARS (ab SQL Server 2005).
Dabei wird der bereits eröffnete Reader außen vorgelassen und die
SqlConnection teilt sich intern quasi auf.
Ich würde aber davon abraten, da das Konsequenzen hat, denn dabei kann
es u. U. zur Blockierung/Deadlocks und anderen Seiteneffekten kommen.
Und das làsst sich am Ende noch schwerer nachvollziehen.

Alternativ habe ich auch diesen Code hier versucht der eher dem enspricht
was wir Momentan verwenden. Allerdings liefert der wiederum folgende
Exception (fast immer oder eben die obere)

"System.Data.SqlClient.SqlException: Distributed transaction completed.
Either enlist this session in a new transaction or the NULL transaction.
bei System.Data.SqlClient.SqlConnection.OnError(SqlException exception,
Boolean breakConnection)



Da ist die àußere (verteilte) Transaktion abgeschlossen.

Von (statischen) Connection Objekten die über mehrere Threads
verwendet werden, würde ich in einer Multithreaded Umgebung grundsàtzlich
abraten. Da verlasse Dich besser auf das ConnectionPooling
und gib die Connection im jeweiligen Worker sobald als möglich
frei (using...).

Gibt es überhaupt eine vernünftige möglichkeit Multithreaded mit Datenbanken
zu arbeiten oder sollte man das lieber ganz lassen?



Gibt es schon.
Nur mußt Du Dir sehr bewußt sein, wie Transaktionen funktionieren.
Denn mehrere Threads können Probleme verschàrfen, die man mit
Transaktionen schon hat, wenn man "nur" mehrere Prozesse auf
eine Datenbank loslàßt.
Wenn Du das nicht sehr gut planst, handelst Du Dir am Ende
Blockierungen und Deadlocks ein.

Gruß Elmar

using System;
using System.Collections.Generic;
using System.Text;

using System.Data;
using System.Data.SqlClient;
using System.Transactions;
using System.Threading;

namespace CS2008
{
internal static class TransactionPromoting
{
internal static void Execute()
{
SetupTable(6);
Test1.Execute();
}

#region Helper

internal static string SqlConnectionString
{
[System.Diagnostics.DebuggerStepThrough]
get
{
return Properties.Settings.Default.TempDbConnectionString;
//+ ";Enlist=True"
//+ ";MultipleActiveResultSets=true";
}
}

internal static void SetupTable(int rows)
{
using (SqlConnection connection = new SqlConnection(SqlConnectionString))
{
connection.Open();

using (SqlCommand createCommand = new SqlCommand(null, connection))
{
createCommand.CommandText = "IF OBJECT_ID(N'dbo.Test', 'U') IS NOT NULL DROP TABLE dbo.Test";
createCommand.ExecuteNonQuery();

createCommand.CommandText = "CREATE TABLE dbo.Test( "
+ "id bigint NOT NULL PRIMARY KEY, "
+ "daten varchar(20))";
createCommand.ExecuteNonQuery();
}

using (SqlCommand insertCommand = new SqlCommand(
"INSERT INTO dbo.Test(id, daten) VALUES(@id, @daten)",
connection))
{
insertCommand.Parameters.Add(new SqlParameter("@id", SqlDbType.BigInt));
insertCommand.Parameters.Add(new SqlParameter("@daten", SqlDbType.VarChar));

for (int id = 1; id <= rows; id++)
{
insertCommand.Parameters["@id"].Value = id;
insertCommand.Parameters["@daten"].Value = String.Format(null, "Zeile {0}", id);
insertCommand.ExecuteNonQuery();
}
}
connection.Close();
}
}
#endregion Helper

#region Test1
internal static class Test1
{
private static AutoResetEvent worker1ReaderEvent = new AutoResetEvent(false);

// Alternativ Test
private static AutoResetEvent worker1Event = new AutoResetEvent(false);
private static AutoResetEvent worker2Event = new AutoResetEvent(false);

internal static void Execute()
{
try
{
using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))
{
Console.WriteLine("Check Transaction (RootStart): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Thread worker1 = new Thread(Test1.Worker1);
Thread worker2 = new Thread(Test1.Worker2);
worker1.IsBackground = true;
worker2.IsBackground = true;

worker1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
worker2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
//ThreadPool.QueueUserWorkItem(worker1,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
//ThreadPool.QueueUserWorkItem(worker2,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

Console.WriteLine("Check Transaction (RootEnd): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);
Console.WriteLine("About to complete the main thread");
ts.Complete();
}
Console.WriteLine("Transaction Completed");
}
catch (Exception ex)
{
Console.WriteLine("Top Catch");
Console.WriteLine(ex.ToString());
}
}

#region Worker1
private static void Worker1(object ar)
{
try
{
DependentTransaction dtx = (DependentTransaction)ar;
using (TransactionScope ts = new TransactionScope(dtx))
{
Console.WriteLine("Check Transaction (Worker1Start): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
worker1Event.Set();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(8, 'Test8')", conn);
co.ExecuteNonQuery();

// Aktivieren um vorher keinen Reader zu erzeugen...
//worker1Event.Set();
//worker2Event.WaitOne();

co = new SqlCommand("SELECT * FROM TEST", conn);
{
SqlDataReader r = co.ExecuteReader();

// Für Test Reader Open
worker1ReaderEvent.Set();
worker2Event.WaitOne();

while (r.Read())
{
Console.WriteLine("Reader1: {0}, {1}", r.GetInt64(0), r.GetString(1));
//System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));
}
r.Close();
}
}

//Thread.Sleep(9000);
Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker1 thread's transaction scope");
ts.Complete();
}
Console.WriteLine("Completing the dependent clone");
dtx.Complete();
}
catch (Exception ex)
{
Console.WriteLine("Worker1 Catch");
Console.WriteLine(ex.ToString());
throw;
}
}
#endregion Worker1

#region Worker2
private static void Worker2(object ar)
{
try
{
DependentTransaction dtx = (DependentTransaction)ar;
using (TransactionScope ts = new TransactionScope(dtx))
{
Console.WriteLine("Check Transaction (Worker2Start): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
// Wenn aktiviert: Promoting schlàgt fehl
worker1ReaderEvent.WaitOne();

conn.Open();
worker2Event.Set();
//Thread.Sleep(TimeSpan.FromSeconds(10)); //<- Wait till Main Thread reaches

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(7, 'Test7')", conn);
co.ExecuteNonQuery();

// Aktivieren um vorher keinen Reader zu erzeugen...
//worker2Event.Set();
//worker1Event.WaitOne();

co = new SqlCommand("SELECT * FROM TEST", conn);
{
SqlDataReader r = co.ExecuteReader();
while (r.Read())
{
Console.WriteLine("Reader2: {0}, {1}", r.GetInt64(0), r.GetString(1));
//System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));
}
r.Close();
}
}
//Thread.Sleep(5000);

Console.WriteLine("Check Transaction (Worker2End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker2 thread's transaction scope");
ts.Complete();
}
Console.WriteLine("Completing the dependent clone");
dtx.Complete();
}
catch (Exception ex)
{
Console.WriteLine("Worker2 Catch");
Console.WriteLine(ex.ToString());
throw;
}
}
#endregion Worker2
}
#endregion Test1
}
}

Ähnliche fragen