Multithreaded Datenbankzugriff in C# und SQL2005 Server mit TransactionScope (Bug oder Fehler meinerseits?)

28/04/2008 - 17:21 von Michael Schöller | Report spam
Hallo Leute,
Ich hab hier ein verzwicktes Problem. Eigentlich will ich nur mehr wissen
warum sich .NET3.0 und vermutlich auch alle anderen .NET Versionen so
verhalten und ob das ev. ein Bug sein könnte.
Gleich vorweg der MSDTC ist konfiguriert und funktioniert.

Auf einem SQL2005-Server liegt eine Tabelle Test (bigint, varchar(20)). Die
ist befüllt mit den Werten
1, 'Test1'
2, 'Test2'
3, 'Test3'
4, 'Test4'
5, 'Test5'
6, 'Test6'

Führt man nachfolgenden Code aus Funktioniert das beim 1 vom 20 Mal
einwandfrei. Löscht man die 2 eingefügten Zeilen und führt den Code noch
einmal aus bekommt man auf jeden Fall diese Exception:
"System.Transactions.TransactionAbortedException: Die Transaktion wurde
abgebrochen. > System.Transactions.TransactionPromotionException: Fehler
beim Versuch, die Transaktion heraufzustufen. >
System.Data.SqlClient.SqlException: Diesem Befehl ist bereits ein geöffneter
DataReader zugeordnet, der zuerst geschlossen werden muss. bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(TransactionRequest
transactionRequest, String transactionName, IsolationLevel iso,
SqlInternalTransaction internalTransaction, Boolean
isDelegateControlRequest) bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransaction(TransactionRequest
transactionRequest, String name, IsolationLevel iso, SqlInternalTransaction
internalTransaction, Boolean isDelegateControlRequest) bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote() Ende der
internen Ausnahmestapelüberwachung bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote() bei
System.Transactions.Tr

ansactionStatePSPEOperation.PSPEPromote(InternalTransaction tx) bei
System.Transactions.TransactionStateDelegatedBase.EnterState(InternalTransaction
tx) Ende der internen Ausnahmestapelüberwachung bei
System.Transactions.TransactionStateAborted.CheckForFinishedTransaction(InternalTransaction
tx) bei
System.Transactions.TransactionStatePhase0.Promote(InternalTransaction
tx) bei System.Transactions.Transaction.Promote() bei
System.Transactions.TransactionInterop.ConvertToOletxTransaction(Transaction
transaction) bei
System.Transactions.TransactionInterop.GetExportCookie(Transaction
transaction, Byte[] whereabouts) bei
System.Data.SqlClient.SqlInternalConnection.EnlistNonNull(Transaction
tx) bei System.Data.SqlClient.SqlInternalConnection.Enlist(Transaction
tx) bei
System.Data.SqlClient.SqlInternalConnectionTds.Activate(Transaction
transaction) bei
System.Data.ProviderBase.DbConnectionInternal.ActivateConnection(Transaction
transaction)

bei System.Data.ProviderBase.DbConnectionPool.GetConnection(DbConnection
owningObject) bei
System.Data.ProviderBase.DbConnectionFactory.GetConnection(DbConnection
owningConnection) bei
System.Data.ProviderBase.DbConnectionClosed.OpenConnection(DbConnection
outerConnection, DbConnectionFactory connectionFactory) bei
System.Data.SqlClient.SqlConnection.Open() bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
Studio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
120. bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state) bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state) bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

nur kann ich das mit dem Datareader nicht recht glauben... hier der Code.

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion

{

class Program

{

static void Main(string[] args)

{

try

{

using (TransactionScope ts = new
TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))

{

Console.WriteLine("Check Transaction (RootStart): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Thread worker1 = new Thread(Program.worker1);

Thread worker2 = new Thread(Program.worker2);

worker1.IsBackground = true;

worker2.IsBackground = true;

worker1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

worker2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));


//ThreadPool.QueueUserWorkItem(worker1,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

//ThreadPool.QueueUserWorkItem(worker2,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

Console.WriteLine("Check Transaction (RootEnd): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the main thread");

ts.Complete();

}

Console.WriteLine("Transaction Completed");

}

catch (Exception ex)

{

Console.WriteLine("Top Catch");

Console.WriteLine(ex.ToString());

}

Console.WriteLine("Enter <Enter>");

Console.ReadLine();

}

static void worker1(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1Start): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

using (SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

{

conn.Open();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(8, 'Test8')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader1: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));

}

r.Close();

}

}

Thread.Sleep(9000);

//throw new Exception("Aufzah!");

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker9 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker1 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

static void worker2(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

using(SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

{

conn.Open();

Thread.Sleep(TimeSpan.FromSeconds(10)); //<- Wait till Main Thread reaches
ts.Commit();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(7, 'Test7')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader2: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));

}

r.Close();

}

}

Thread.Sleep(5000);

Console.WriteLine("Check Transaction (Worker2End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker2 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker2 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

}

}



Alternativ habe ich auch diesen Code hier versucht der eher dem enspricht
was wir Momentan verwenden. Allerdings liefert der wiederum folgende
Exception (fast immer oder eben die obere)

"System.Data.SqlClient.SqlException: Distributed transaction completed.
Either enlist this session in a new transaction or the NULL transaction.
bei System.Data.SqlClient.SqlConnection.OnError(SqlException exception,
Boolean breakConnection) bei
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception,
Boolean breakConnection) bei
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject
stateObj) bei System.Data.SqlClient.TdsParser.Run(RunBehavior
runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream,
BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
bei System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String
methodName, Boolean async) bei
System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult
result, String methodName, Boolean sendToPipe) bei
System.Data.SqlClient.SqlCommand.ExecuteNonQuery() bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
St

udio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
133. bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state) bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state) bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

Hier der Code

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion

{

class Program

{

static SqlConnection conn1;

static SqlConnection conn2;

static void Main(string[] args)

{

try

{

conn1 = new SqlConnection("Data Source=WKOEDEV01\\SQL2005;Initial
Catalog=WKOBASE_CLONE;Persist Security Info=True;User
ID=binreader;Password=readme2002");

conn2 = new SqlConnection("Data Source=WKOEDEV01\\SQL2005;Initial
Catalog=WKOBASE_CLONE;Persist Security Info=True;User
ID=binreader;Password=readme2002");

conn1.Open();

conn2.Open();

using (TransactionScope ts = new
TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))

{

conn1.EnlistTransaction(Transaction.Current);

conn2.EnlistTransaction(Transaction.Current);

Console.WriteLine("Check Transaction (RootStart): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Thread worker1 = new Thread(Program.worker1);

Thread worker2 = new Thread(Program.worker2);

worker1.IsBackground = true;

worker2.IsBackground = true;

worker1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

worker2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));


//ThreadPool.QueueUserWorkItem(worker1,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

//ThreadPool.QueueUserWorkItem(worker2,
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

Console.WriteLine("Check Transaction (RootEnd): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the main thread");

ts.Complete();

}

conn1.Close();

conn2.Close();

Console.WriteLine("Transaction Completed");

}

catch (Exception ex)

{

Console.WriteLine("Top Catch");

Console.WriteLine(ex.ToString());

}

Console.WriteLine("Enter <Enter>");

Console.ReadLine();

}

static void worker1(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1Start): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

//using (SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

SqlConnection conn = conn1;

{

//conn.Open();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(8, 'Test8')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader1: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(5));

}

r.Close();

}

}

Thread.Sleep(9000);

//throw new Exception("Aufzah!");

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker9 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker1 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

static void worker2(object ar)

{

try{

DependentTransaction dtx = (DependentTransaction)ar;

using (TransactionScope ts = new TransactionScope(dtx))

{

Console.WriteLine("Check Transaction (Worker1End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

//using(SqlConnection conn = new SqlConnection("Data
Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security
Info=True;User ID=binreader;Password=readme2002"))

SqlConnection conn = conn2;

{

//conn.Open();

Thread.Sleep(TimeSpan.FromSeconds(10)); //<- Wait till Main Thread reaches
ts.Commit();

SqlCommand co = new SqlCommand("INSERT INTO TEST VALUES(7, 'Test7')", conn);

co.ExecuteNonQuery();

co = new SqlCommand("SELECT * FROM TEST", conn);

{

SqlDataReader r = co.ExecuteReader();

while (r.Read())

{

Console.WriteLine("Reader2: {0}, {1}", r.GetInt64(0), r.GetString(1));

System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));

}

r.Close();

}

}

Thread.Sleep(5000);

Console.WriteLine("Check Transaction (Worker2End): L:{0} D:{1}",
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);

Console.WriteLine("About to complete the worker2 thread's transaction
scope");

ts.Complete();

}

Console.WriteLine("Completing the dependent clone");

dtx.Complete();

}

catch (Exception ex)

{

Console.WriteLine("Worker2 Catch");

Console.WriteLine(ex.ToString());

throw;

}

}

}

}



Für mich ergibt das aber recht wenig Sinn. Es funktioniert rein gar nichts
so wie ich mir das vorstellen würde.

Gibt es überhaupt eine vernünftige möglichkeit Multithreaded mit Datenbanken
zu arbeiten oder sollte man das lieber ganz lassen?

Wenn ja würde ich um ein Funktionierendes Beispiel bitten. Mir gehen die
Gründe aus warum das nicht geht. Vor allem sehe ich nicht ein warum ich
nicht Multithreadet auf die Datenbank losgehen können soll.



Mit der Bitte um Hilfe

Michael Schöller
 

Lesen sie die antworten

#1 Thomas Scheidegger
28/04/2008 - 21:06 | Warnen spam
Hallo Michael

zustàndig scheint eher:
microsoft.public.de.german.entwickler.dotnet.datenbank


DependentTransaction dtx = (DependentTransaction)ar;
using (TransactionScope ts = new TransactionScope(dtx))
Gibt es überhaupt eine vernünftige möglichkeit Multithreaded mit
Datenbanken zu arbeiten oder sollte man das lieber ganz lassen?




àhnlicher Fall
<URL:http://groups.google.com/group/micr...7ab12/>

= single threaded.



Thomas Scheidegger - 'NETMaster'
http://www.cetus-links.org/oo_dotnet.html - http://dnetmaster.net/

Ähnliche fragen